reviewable/pr8842/r3
Craig Tiller 8 years ago
parent e9d67009e9
commit 255edaa32e
  1. 10
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 6
      src/core/lib/channel/deadline_filter.c
  3. 3
      src/core/lib/iomgr/error.c
  4. 49
      src/core/lib/surface/call.c
  5. 15
      src/core/lib/transport/transport.h
  6. 8
      src/core/lib/transport/transport_op_string.c
  7. 4
      test/core/end2end/tests/filter_causes_close.c

@ -1018,10 +1018,6 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
grpc_chttp2_cancel_stream(exec_ctx, t, s, GRPC_ERROR_REF(op->cancel_error));
}
if (op->close_error != GRPC_ERROR_NONE) {
close_from_api(exec_ctx, t, s, GRPC_ERROR_REF(op->close_error));
}
if (op->send_initial_metadata != NULL) {
GPR_ASSERT(s->send_initial_metadata_finished == NULL);
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
@ -1461,6 +1457,12 @@ static void status_codes_from_error(grpc_error *error, gpr_timespec deadline,
void grpc_chttp2_cancel_stream(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_error *due_to_error) {
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_get_int(due_to_error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
close_from_api(exec_ctx, t, s, due_to_error);
return;
}
if (!s->read_closed || !s->write_closed) {
grpc_status_code grpc_status;
grpc_chttp2_error_code http_error;

@ -197,8 +197,7 @@ void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_deadline_state* deadline_state = elem->call_data;
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
if (op->cancel_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
// Make sure we know when the call is complete, so that we can cancel
@ -286,8 +285,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
server_call_data* calld = elem->call_data;
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
if (op->cancel_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline

@ -261,7 +261,8 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
grpc_error *out;
if (is_special(in)) {
if (in == GRPC_ERROR_NONE)
out = GRPC_ERROR_CREATE("no error");
out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
else if (in == GRPC_ERROR_OOM)
out = GRPC_ERROR_CREATE("oom");
else if (in == GRPC_ERROR_CANCELLED)

@ -222,9 +222,8 @@ static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
grpc_error *error);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
@ -339,7 +338,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
grpc_status_code status;
const char *error_str;
grpc_error_get_status(error, &status, &error_str);
close_with_status(exec_ctx, call, status, error_str);
cancel_with_status(exec_ctx, call, status, error_str);
}
if (args->cq != NULL) {
GPR_ASSERT(
@ -528,13 +527,10 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return r;
}
typedef enum { TC_CANCEL, TC_CLOSE } termination_closure_type;
typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
termination_closure_type type;
grpc_transport_stream_op op;
} termination_closure;
@ -550,14 +546,7 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp,
grpc_error *error) {
termination_closure *tc = tcp;
memset(&tc->op, 0, sizeof(tc->op));
switch (tc->type) {
case TC_CANCEL:
tc->op.cancel_error = tc->error;
break;
case TC_CLOSE:
tc->op.close_error = tc->error;
break;
}
tc->op.cancel_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc,
grpc_schedule_on_exec_ctx);
@ -577,17 +566,19 @@ static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx,
}
static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx,
grpc_call *c,
termination_closure_type tc_type,
grpc_error *error) {
grpc_call *c, grpc_error *error) {
termination_closure *tc = gpr_malloc(sizeof(*tc));
memset(tc, 0, sizeof(*tc));
tc->type = tc_type;
tc->call = c;
tc->error = error;
return terminate_with_status(exec_ctx, tc);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_error *error) {
terminate_with_error(exec_ctx, c, error);
}
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
return grpc_error_set_int(
@ -599,14 +590,7 @@ static grpc_error *error_from_status(grpc_status_code status,
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
return terminate_with_error(exec_ctx, c, TC_CANCEL,
error_from_status(status, description));
}
static grpc_call_error close_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description) {
return terminate_with_error(exec_ctx, c, TC_CLOSE,
return terminate_with_error(exec_ctx, c,
error_from_status(status, description));
}
@ -927,7 +911,7 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
return CALL_STACK_FROM_CALL(call);
}
/*
/*******************************************************************************
* BATCH API IMPLEMENTATION
*/
@ -1141,10 +1125,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *msg;
grpc_error_get_status(error, &status, &msg);
close_with_status(exec_ctx, call, status, msg);
cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error));
}
gpr_mu_lock(&bctl->call->mu);
if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
@ -1172,7 +1153,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
algo);
gpr_log(GPR_ERROR, "%s", error_msg);
close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
@ -1181,7 +1162,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
gpr_log(GPR_ERROR, "%s", error_msg);
close_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
} else {
call->incoming_compression_algorithm = algo;
}

@ -150,13 +150,18 @@ typedef struct grpc_transport_stream_op {
/** Collect any stats into provided buffer, zero internal stat counters */
grpc_transport_stream_stats *collect_stats;
/** If != GRPC_ERROR_NONE, cancel this stream */
/** If != GRPC_ERROR_NONE, forcefully close this stream.
The HTTP2 semantics should be:
- server side: if cancel_error has GRPC_ERROR_INT_GRPC_STATUS, and
trailing metadata has not been sent, send trailing metadata with status
and message from cancel_error (use grpc_error_get_status) followed by
a RST_STREAM with error=GRPC_CHTTP2_NO_ERROR to force a full close
- at all other times: use grpc_error_get_status to get a status code, and
convert to a HTTP2 error code using
grpc_chttp2_grpc_status_to_http2_error. Send a RST_STREAM with this
error. */
grpc_error *cancel_error;
/** If != GRPC_ERROR_NONE, send grpc-status, grpc-message, and close this
stream for both reading and writing */
grpc_error *close_error;
/* Indexes correspond to grpc_context_index enum values */
grpc_call_context_element *context;

@ -125,14 +125,6 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec_add(&b, tmp);
}
if (op->close_error != GRPC_ERROR_NONE) {
gpr_strvec_add(&b, gpr_strdup(" "));
const char *msg = grpc_error_string(op->close_error);
gpr_asprintf(&tmp, "CLOSE:%s", msg);
grpc_error_free_string(msg);
gpr_strvec_add(&b, tmp);
}
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);

@ -210,7 +210,9 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = elem->call_data;
grpc_closure_sched(
exec_ctx, calld->recv_im_ready,
GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1));
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_PERMISSION_DENIED));
}
static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,

Loading…
Cancel
Save