Merge pull request #2221 from yang-g/no_new_bbq_after_flush

Bug fix. Do not push to incoming_queue after it is flushed.
pull/2226/merge
Craig Tiller 10 years ago
commit 40238fd032
  1. 18
      src/core/surface/call.c
  2. 39
      test/cpp/end2end/end2end_test.cc

@ -156,6 +156,9 @@ struct grpc_call {
gpr_uint8 reading_message;
/* have we bound a pollset yet? */
gpr_uint8 bound_pollset;
/* is an error status set */
gpr_uint8 error_status_set;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@ -214,7 +217,7 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
/** Compression level for the call */
/* Compression level for the call */
grpc_compression_level compression_level;
/* Contexts for various subsystems (security, tracing, ...). */
@ -409,6 +412,7 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = status;
call->error_status_set = status != GRPC_STATUS_OK;
if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
@ -686,13 +690,13 @@ static void call_on_done_send(void *pc, int success) {
}
static void finish_message(grpc_call *call) {
/* TODO(ctiller): this could be a lot faster if coded directly */
grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
call->incoming_message.slices, call->incoming_message.count);
if (call->error_status_set == 0) {
/* TODO(ctiller): this could be a lot faster if coded directly */
grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
call->incoming_message.slices, call->incoming_message.count);
grpc_bbq_push(&call->incoming_queue, byte_buffer);
}
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
grpc_bbq_push(&call->incoming_queue, byte_buffer);
GPR_ASSERT(call->incoming_message.count == 0);
call->reading_message = 0;
}

@ -68,6 +68,8 @@ namespace testing {
namespace {
const char* kServerCancelAfterReads = "cancel_after_reads";
// When echo_deadline is requested, deadline seen in the ServerContext is set in
// the response in seconds.
void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
@ -131,7 +133,23 @@ class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
EchoResponse* response) GRPC_OVERRIDE {
EchoRequest request;
response->set_message("");
int cancel_after_reads = 0;
const std::multimap<grpc::string, grpc::string> client_initial_metadata =
context->client_metadata();
if (client_initial_metadata.find(kServerCancelAfterReads) !=
client_initial_metadata.end()) {
std::istringstream iss(
client_initial_metadata.find(kServerCancelAfterReads)->second);
iss >> cancel_after_reads;
gpr_log(GPR_INFO, "cancel_after_reads %d", cancel_after_reads);
}
while (reader->Read(&request)) {
if (cancel_after_reads == 1) {
gpr_log(GPR_INFO, "return cancel status");
return Status::CANCELLED;
} else if (cancel_after_reads > 0) {
cancel_after_reads--;
}
response->mutable_message()->append(request.message());
}
return Status::OK;
@ -687,6 +705,27 @@ TEST_F(End2endTest, OverridePerCallCredentials) {
EXPECT_TRUE(s.ok());
}
// Client sends 20 requests and the server returns CANCELLED status after
// reading 10 requests.
TEST_F(End2endTest, RequestStreamServerEarlyCancelTest) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
context.AddMetadata(kServerCancelAfterReads, "10");
auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello");
int send_messages = 20;
while (send_messages > 0) {
EXPECT_TRUE(stream->Write(request));
send_messages--;
}
stream->WritesDone();
Status s = stream->Finish();
EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
}
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save