When there is an error from filters (such as max recv message size too

large), add an error to the batch to avoid emitting new rpc on the
server side in C++.
pull/9846/head
yang-g 8 years ago
parent 63852c933b
commit 23f777df08
  1. 17
      src/core/lib/surface/call.c
  2. 3
      src/core/lib/surface/server.c
  3. 1
      src/proto/grpc/testing/echo_messages.proto
  4. 3
      test/cpp/end2end/end2end_test.cc
  5. 4
      test/cpp/end2end/test_service_impl.cc

@ -112,7 +112,7 @@ static received_status unpack_received_status(gpr_atm atm) {
.error = (grpc_error *)(atm & ~(gpr_atm)1)};
}
#define MAX_ERRORS_PER_BATCH 3
#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
grpc_call *call;
@ -254,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error);
grpc_error *error, bool has_cancelled);
static void add_init_error(grpc_error **composite, grpc_error *new) {
if (new == GRPC_ERROR_NONE) return;
@ -1223,6 +1223,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != NULL) {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
}
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
@ -1289,10 +1294,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
grpc_error *error) {
grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
if (idx == 0) {
if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
@ -1306,7 +1311,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@ -1343,7 +1348,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}

@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
GRPC_ERROR_REF(error));
return;
}

@ -50,6 +50,7 @@ message RequestParams {
bool skip_cancelled_check = 9;
string expected_transport_security_type = 10;
DebugInfo debug_info = 11;
bool server_die = 12; // Server should not see a request with this set.
}
message EchoRequest {

@ -901,8 +901,7 @@ TEST_P(End2endTest, RpcMaxMessageSize) {
EchoRequest request;
EchoResponse response;
request.set_message(string(kMaxMessageSize_ * 2, 'a'));
// cancelled is not guaranteed to appear before the end of the service handler
request.mutable_param()->set_skip_cancelled_check(true);
request.mutable_param()->set_server_die(true);
ClientContext context;
Status s = stub_->Echo(&context, request, &response);

@ -88,6 +88,10 @@ void CheckServerAuthContext(
Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
if (request->has_param() && request->param().server_die()) {
gpr_log(GPR_ERROR, "The request should not reach application handler.");
GPR_ASSERT(0);
}
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
if (server_try_cancel > DO_NOT_CANCEL) {

Loading…
Cancel
Save