Conversion progress

reviewable/pr9949/r1
Craig Tiller 8 years ago
parent 8c09d6795a
commit 72920cc08a
  1. 2
      src/core/ext/client_channel/client_channel.c
  2. 3
      src/core/lib/channel/compress_filter.c
  3. 17
      src/core/lib/channel/message_size_filter.c
  4. 26
      src/core/lib/security/transport/client_auth_filter.c
  5. 18
      src/core/lib/security/transport/server_auth_filter.c
  6. 10
      src/core/lib/surface/lame_client.c
  7. 34
      src/core/lib/surface/server.c
  8. 26
      src/core/lib/transport/transport_op_string.c
  9. 7
      test/core/end2end/tests/filter_causes_close.c

@ -449,7 +449,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error_ignored) {
grpc_transport_op *op = arg;
grpc_channel_element *elem = op->transport_private.args[0];
grpc_channel_element *elem = op->transport_private.extra_arg;
channel_data *chand = elem->channel_data;
if (op->on_connectivity_state_change != NULL) {

@ -210,7 +210,8 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
calld->send_op->send_message = &calld->replacement_stream.base;
calld->send_op->payload->send_message.send_message =
&calld->replacement_stream.base;
calld->post_send = calld->send_op->on_complete;
calld->send_op->on_complete = &calld->send_done;

@ -141,11 +141,13 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_transport_stream_op* op) {
call_data* calld = elem->call_data;
// Check max send message size.
if (op->send_message != NULL && calld->max_send_size >= 0 &&
op->send_message->length > (size_t)calld->max_send_size) {
if (op->send_message && calld->max_send_size >= 0 &&
op->payload->send_message.send_message->length >
(size_t)calld->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->send_message->length, calld->max_send_size);
op->payload->send_message.send_message->length,
calld->max_send_size);
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, grpc_error_set_int(GRPC_ERROR_CREATE(message_string),
GRPC_ERROR_INT_GRPC_STATUS,
@ -154,10 +156,11 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
return;
}
// Inject callback for receiving a message.
if (op->recv_message_ready != NULL) {
calld->next_recv_message_ready = op->recv_message_ready;
calld->recv_message = op->recv_message;
op->recv_message_ready = &calld->recv_message_ready;
if (op->payload->recv_message.recv_message_ready != NULL) {
calld->next_recv_message_ready =
op->payload->recv_message.recv_message_ready;
calld->recv_message = op->payload->recv_message.recv_message;
op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
}
// Chain to the next filter.
grpc_call_next_op(exec_ctx, elem, op);

@ -120,8 +120,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data,
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED);
} else {
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
GPR_ASSERT(op->send_initial_metadata != NULL);
mdb = op->send_initial_metadata;
GPR_ASSERT(op->send_initial_metadata);
mdb = op->payload->send_initial_metadata.send_initial_metadata;
for (i = 0; i < num_md; i++) {
add_error(&error,
grpc_metadata_batch_add_tail(
@ -174,7 +174,9 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_client_security_context *ctx =
(grpc_client_security_context *)op->context[GRPC_CONTEXT_SECURITY].value;
(grpc_client_security_context *)op->payload
->context[GRPC_CONTEXT_SECURITY]
.value;
grpc_call_credentials *channel_call_creds =
chand->security_connector->request_metadata_creds;
int call_creds_has_md = (ctx != NULL) && (ctx->creds != NULL);
@ -248,23 +250,25 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *l;
grpc_client_security_context *sec_ctx = NULL;
if (calld->security_context_set == 0 && op->cancel_error == GRPC_ERROR_NONE) {
if (calld->security_context_set == 0 && !op->cancel_stream) {
calld->security_context_set = 1;
GPR_ASSERT(op->context);
if (op->context[GRPC_CONTEXT_SECURITY].value == NULL) {
op->context[GRPC_CONTEXT_SECURITY].value =
GPR_ASSERT(op->payload->context != NULL);
if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
op->payload->context[GRPC_CONTEXT_SECURITY].value =
grpc_client_security_context_create();
op->context[GRPC_CONTEXT_SECURITY].destroy =
op->payload->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_client_security_context_destroy;
}
sec_ctx = op->context[GRPC_CONTEXT_SECURITY].value;
sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value;
GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
sec_ctx->auth_context =
GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (op->send_initial_metadata != NULL) {
for (l = op->send_initial_metadata->list.head; l != NULL; l = l->next) {
if (op->send_initial_metadata) {
for (l = op->payload->send_initial_metadata.send_initial_metadata->list
.head;
l != NULL; l = l->next) {
grpc_mdelem md = l->md;
/* Pointer comparison is OK for md_elems created from the same context.
*/

@ -139,9 +139,10 @@ static void on_md_processing_done(
? error_details
: "Authentication metadata processing failed.";
calld->transport_op->send_initial_metadata = NULL;
if (calld->transport_op->send_message != NULL) {
grpc_byte_stream_destroy(&exec_ctx, calld->transport_op->send_message);
calld->transport_op->send_message = NULL;
if (calld->transport_op->send_message) {
grpc_byte_stream_destroy(
&exec_ctx, calld->transport_op->payload->send_message.send_message);
calld->transport_op->send_message = false;
}
calld->transport_op->send_trailing_metadata = NULL;
grpc_closure_sched(&exec_ctx, calld->on_done_recv,
@ -173,11 +174,14 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata != NULL) {
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->auth_on_recv;
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
calld->on_done_recv =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->auth_on_recv;
calld->transport_op = op;
}
}

@ -84,10 +84,12 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->recv_initial_metadata != NULL) {
fill_metadata(exec_ctx, elem, op->recv_initial_metadata);
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(exec_ctx, elem, op->recv_trailing_metadata);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
} else if (op->recv_trailing_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, GRPC_ERROR_CREATE("lame client channel"));

@ -154,8 +154,7 @@ struct call_data {
grpc_completion_queue *cq_new;
grpc_metadata_batch *recv_initial_metadata;
bool recv_idempotent_request;
bool recv_cacheable_request;
uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata;
request_matcher *request_matcher;
@ -498,13 +497,7 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
rc->data.batch.details->deadline = calld->deadline;
rc->data.batch.details->flags =
(calld->recv_idempotent_request
? GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST
: 0) |
(calld->recv_cacheable_request
? GRPC_INITIAL_METADATA_CACHEABLE_REQUEST
: 0);
rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
break;
case REGISTERED_CALL:
*rc->data.registered.deadline = calld->deadline;
@ -632,7 +625,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (!grpc_slice_eq(rm->host, calld->host)) continue;
if (!grpc_slice_eq(rm->method, calld->path)) continue;
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request) {
0 == (calld->recv_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
@ -649,7 +643,8 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
if (rm->has_host) continue;
if (!grpc_slice_eq(rm->method, calld->path)) continue;
if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
!calld->recv_idempotent_request) {
0 == (calld->recv_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
continue;
}
finish_start_new_rpc(exec_ctx, server, elem,
@ -783,13 +778,16 @@ static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata != NULL) {
GPR_ASSERT(op->recv_idempotent_request == NULL);
calld->recv_initial_metadata = op->recv_initial_metadata;
calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
op->recv_idempotent_request = &calld->recv_idempotent_request;
op->recv_cacheable_request = &calld->recv_cacheable_request;
if (op->recv_initial_metadata) {
GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == NULL);
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
calld->on_done_recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->server_on_recv_initial_metadata;
op->payload->recv_initial_metadata.recv_flags =
&calld->recv_initial_metadata_flags;
}
}

@ -81,45 +81,49 @@ char *grpc_transport_stream_op_string(grpc_transport_stream_op *op) {
gpr_strvec_add(
&b, gpr_strdup(op->covered_by_poller ? "[COVERED]" : "[UNCOVERED]"));
if (op->send_initial_metadata != NULL) {
if (op->send_initial_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("SEND_INITIAL_METADATA{"));
put_metadata_list(&b, *op->send_initial_metadata);
put_metadata_list(
&b, *op->payload->send_initial_metadata.send_initial_metadata);
gpr_strvec_add(&b, gpr_strdup("}"));
}
if (op->send_message != NULL) {
if (op->send_message) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
op->send_message->flags, op->send_message->length);
op->payload->send_message.send_message->flags,
op->payload->send_message.send_message->length);
gpr_strvec_add(&b, tmp);
}
if (op->send_trailing_metadata != NULL) {
if (op->send_trailing_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("SEND_TRAILING_METADATA{"));
put_metadata_list(&b, *op->send_trailing_metadata);
put_metadata_list(
&b, *op->payload->send_trailing_metadata.send_trailing_metadata);
gpr_strvec_add(&b, gpr_strdup("}"));
}
if (op->recv_initial_metadata != NULL) {
if (op->recv_initial_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("RECV_INITIAL_METADATA"));
}
if (op->recv_message != NULL) {
if (op->recv_message) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
}
if (op->recv_trailing_metadata != NULL) {
if (op->recv_trailing_metadata) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_strvec_add(&b, gpr_strdup("RECV_TRAILING_METADATA"));
}
if (op->cancel_error != GRPC_ERROR_NONE) {
if (op->cancel_stream) {
gpr_strvec_add(&b, gpr_strdup(" "));
const char *msg = grpc_error_string(op->cancel_error);
const char *msg =
grpc_error_string(op->payload->cancel_stream.cancel_error);
gpr_asprintf(&tmp, "CANCEL:%s", msg);
gpr_strvec_add(&b, tmp);

@ -220,9 +220,10 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
if (op->recv_initial_metadata != NULL) {
calld->recv_im_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready =
if (op->recv_initial_metadata) {
calld->recv_im_ready =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
grpc_closure_create(recv_im_ready, elem, grpc_schedule_on_exec_ctx);
}
grpc_call_next_op(exec_ctx, elem, op);

Loading…
Cancel
Save