diff --git a/BUILD b/BUILD index a4649d5024e..97a85555115 100644 --- a/BUILD +++ b/BUILD @@ -145,7 +145,6 @@ cc_library( "src/core/channel/client_setup.h", "src/core/channel/connected_channel.h", "src/core/channel/http_client_filter.h", - "src/core/channel/http_filter.h", "src/core/channel/http_server_filter.h", "src/core/channel/noop_filter.h", "src/core/compression/algorithm.h", @@ -245,7 +244,6 @@ cc_library( "src/core/tsi/fake_transport_security.c", "src/core/tsi/ssl_transport_security.c", "src/core/tsi/transport_security.c", - "src/core/channel/call_op_string.c", "src/core/channel/census_filter.c", "src/core/channel/channel_args.c", "src/core/channel/channel_stack.c", @@ -254,7 +252,6 @@ cc_library( "src/core/channel/client_setup.c", "src/core/channel/connected_channel.c", "src/core/channel/http_client_filter.c", - "src/core/channel/http_filter.c", "src/core/channel/http_server_filter.c", "src/core/channel/noop_filter.c", "src/core/compression/algorithm.c", @@ -342,6 +339,7 @@ cc_library( "src/core/transport/metadata.c", "src/core/transport/stream_op.c", "src/core/transport/transport.c", + "src/core/transport/transport_op_string.c", ], hdrs = [ "include/grpc/grpc_security.h", @@ -373,7 +371,6 @@ cc_library( "src/core/channel/client_setup.h", "src/core/channel/connected_channel.h", "src/core/channel/http_client_filter.h", - "src/core/channel/http_filter.h", "src/core/channel/http_server_filter.h", "src/core/channel/noop_filter.h", "src/core/compression/algorithm.h", @@ -454,7 +451,6 @@ cc_library( "src/core/transport/transport.h", "src/core/transport/transport_impl.h", "src/core/surface/init_unsecure.c", - "src/core/channel/call_op_string.c", "src/core/channel/census_filter.c", "src/core/channel/channel_args.c", "src/core/channel/channel_stack.c", @@ -463,7 +459,6 @@ cc_library( "src/core/channel/client_setup.c", "src/core/channel/connected_channel.c", "src/core/channel/http_client_filter.c", - "src/core/channel/http_filter.c", "src/core/channel/http_server_filter.c", "src/core/channel/noop_filter.c", "src/core/compression/algorithm.c", @@ -551,6 +546,7 @@ cc_library( "src/core/transport/metadata.c", "src/core/transport/stream_op.c", "src/core/transport/transport.c", + "src/core/transport/transport_op_string.c", ], hdrs = [ "include/grpc/byte_buffer.h", diff --git a/Makefile b/Makefile index 218d68f733e..b09a63fef0c 100644 --- a/Makefile +++ b/Makefile @@ -2603,7 +2603,6 @@ LIBGRPC_SRC = \ src/core/tsi/fake_transport_security.c \ src/core/tsi/ssl_transport_security.c \ src/core/tsi/transport_security.c \ - src/core/channel/call_op_string.c \ src/core/channel/census_filter.c \ src/core/channel/channel_args.c \ src/core/channel/channel_stack.c \ @@ -2612,7 +2611,6 @@ LIBGRPC_SRC = \ src/core/channel/client_setup.c \ src/core/channel/connected_channel.c \ src/core/channel/http_client_filter.c \ - src/core/channel/http_filter.c \ src/core/channel/http_server_filter.c \ src/core/channel/noop_filter.c \ src/core/compression/algorithm.c \ @@ -2700,6 +2698,7 @@ LIBGRPC_SRC = \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ + src/core/transport/transport_op_string.c \ PUBLIC_HEADERS_C += \ include/grpc/grpc_security.h \ @@ -2750,7 +2749,6 @@ src/core/surface/secure_channel_create.c: $(OPENSSL_DEP) src/core/tsi/fake_transport_security.c: $(OPENSSL_DEP) src/core/tsi/ssl_transport_security.c: $(OPENSSL_DEP) src/core/tsi/transport_security.c: $(OPENSSL_DEP) -src/core/channel/call_op_string.c: $(OPENSSL_DEP) src/core/channel/census_filter.c: $(OPENSSL_DEP) src/core/channel/channel_args.c: $(OPENSSL_DEP) src/core/channel/channel_stack.c: $(OPENSSL_DEP) @@ -2759,7 +2757,6 @@ src/core/channel/client_channel.c: $(OPENSSL_DEP) src/core/channel/client_setup.c: $(OPENSSL_DEP) src/core/channel/connected_channel.c: $(OPENSSL_DEP) src/core/channel/http_client_filter.c: $(OPENSSL_DEP) -src/core/channel/http_filter.c: $(OPENSSL_DEP) src/core/channel/http_server_filter.c: $(OPENSSL_DEP) src/core/channel/noop_filter.c: $(OPENSSL_DEP) src/core/compression/algorithm.c: $(OPENSSL_DEP) @@ -2847,6 +2844,7 @@ src/core/transport/chttp2_transport.c: $(OPENSSL_DEP) src/core/transport/metadata.c: $(OPENSSL_DEP) src/core/transport/stream_op.c: $(OPENSSL_DEP) src/core/transport/transport.c: $(OPENSSL_DEP) +src/core/transport/transport_op_string.c: $(OPENSSL_DEP) endif $(LIBDIR)/$(CONFIG)/libgrpc.a: $(ZLIB_DEP) $(OPENSSL_DEP) $(LIBGRPC_OBJS) @@ -2913,7 +2911,6 @@ $(OBJDIR)/$(CONFIG)/src/core/surface/secure_channel_create.o: $(OBJDIR)/$(CONFIG)/src/core/tsi/fake_transport_security.o: $(OBJDIR)/$(CONFIG)/src/core/tsi/ssl_transport_security.o: $(OBJDIR)/$(CONFIG)/src/core/tsi/transport_security.o: -$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o: $(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o: $(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o: @@ -2922,7 +2919,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o: $(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o: $(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o: $(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o: -$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o: $(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o: @@ -3010,6 +3006,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o: $(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o: $(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o: $(OBJDIR)/$(CONFIG)/src/core/transport/transport.o: +$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o: LIBGRPC_TEST_UTIL_SRC = \ @@ -3090,7 +3087,6 @@ $(OBJDIR)/$(CONFIG)/test/core/util/slice_splitter.o: LIBGRPC_UNSECURE_SRC = \ src/core/surface/init_unsecure.c \ - src/core/channel/call_op_string.c \ src/core/channel/census_filter.c \ src/core/channel/channel_args.c \ src/core/channel/channel_stack.c \ @@ -3099,7 +3095,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/channel/client_setup.c \ src/core/channel/connected_channel.c \ src/core/channel/http_client_filter.c \ - src/core/channel/http_filter.c \ src/core/channel/http_server_filter.c \ src/core/channel/noop_filter.c \ src/core/compression/algorithm.c \ @@ -3187,6 +3182,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/transport/metadata.c \ src/core/transport/stream_op.c \ src/core/transport/transport.c \ + src/core/transport/transport_op_string.c \ PUBLIC_HEADERS_C += \ include/grpc/byte_buffer.h \ @@ -3231,7 +3227,6 @@ ifneq ($(NO_DEPS),true) endif $(OBJDIR)/$(CONFIG)/src/core/surface/init_unsecure.o: -$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o: $(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o: $(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o: @@ -3240,7 +3235,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o: $(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o: $(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o: $(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o: -$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o: $(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o: $(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o: @@ -3328,6 +3322,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o: $(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o: $(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o: $(OBJDIR)/$(CONFIG)/src/core/transport/transport.o: +$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o: LIBGRPC++_SRC = \ diff --git a/build.json b/build.json index 89cf8de3a62..43a81ecb7cb 100644 --- a/build.json +++ b/build.json @@ -96,7 +96,6 @@ "src/core/channel/client_setup.h", "src/core/channel/connected_channel.h", "src/core/channel/http_client_filter.h", - "src/core/channel/http_filter.h", "src/core/channel/http_server_filter.h", "src/core/channel/noop_filter.h", "src/core/compression/algorithm.h", @@ -178,7 +177,6 @@ "src/core/transport/transport_impl.h" ], "src": [ - "src/core/channel/call_op_string.c", "src/core/channel/census_filter.c", "src/core/channel/channel_args.c", "src/core/channel/channel_stack.c", @@ -187,7 +185,6 @@ "src/core/channel/client_setup.c", "src/core/channel/connected_channel.c", "src/core/channel/http_client_filter.c", - "src/core/channel/http_filter.c", "src/core/channel/http_server_filter.c", "src/core/channel/noop_filter.c", "src/core/compression/algorithm.c", @@ -274,7 +271,8 @@ "src/core/transport/chttp2_transport.c", "src/core/transport/metadata.c", "src/core/transport/stream_op.c", - "src/core/transport/transport.c" + "src/core/transport/transport.c", + "src/core/transport/transport_op_string.c" ] } ], diff --git a/src/core/channel/census_filter.c b/src/core/channel/census_filter.c index 9c0c20af221..3e0fc39fc9a 100644 --- a/src/core/channel/census_filter.c +++ b/src/core/channel/census_filter.c @@ -49,6 +49,11 @@ typedef struct call_data { census_op_id op_id; census_rpc_stats stats; gpr_timespec start_ts; + + /* recv callback */ + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -60,55 +65,58 @@ static void init_rpc_stats(census_rpc_stats* stats) { stats->cnt = 1; } -static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld, +static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld, channel_data* chand) { grpc_linked_mdelem* m; - for (m = op->data.metadata.list.head; m != NULL; m = m->next) { - if (m->md->key == chand->path_str) { - gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); - census_add_method_tag( - calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + size_t i; + for (i = 0; i < sopb->nops; i++) { + grpc_stream_op * op = &sopb->ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + for (m = op->data.metadata.list.head; m != NULL; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + census_add_method_tag( + calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice)); + } } } } -static void client_call_op(grpc_call_element* elem, - grpc_call_element* from_elem, grpc_call_op* op) { +static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); - switch (op->type) { - case GRPC_SEND_METADATA: - extract_and_annotate_method_tag(op, calld, chand); - break; - case GRPC_RECV_FINISH: - /* Should we stop timing the rpc here? */ - break; - default: - break; + if (op->send_ops) { + extract_and_annotate_method_tag(op->send_ops, calld, chand); } - /* Always pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); } -static void server_call_op(grpc_call_element* elem, - grpc_call_element* from_elem, grpc_call_op* op) { +static void server_on_done_recv(void *ptr, int success) { + grpc_call_element *elem = ptr; + call_data* calld = elem->call_data; + channel_data* chand = elem->channel_data; + if (success) { + extract_and_annotate_method_tag(calld->recv_ops, calld, chand); + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) { call_data* calld = elem->call_data; channel_data* chand = elem->channel_data; GPR_ASSERT(calld != NULL); GPR_ASSERT(chand != NULL); GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0)); - switch (op->type) { - case GRPC_RECV_METADATA: - extract_and_annotate_method_tag(op, calld, chand); - break; - case GRPC_SEND_FINISH: - /* Should we stop timing the rpc here? */ - break; - default: - break; + if (op->recv_ops) { + /* substitute our callback for the op callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = server_on_done_recv; + op->recv_user_data = elem; } /* Always pass control up or down the stack depending on op->dir */ grpc_call_next_op(elem, op); @@ -180,11 +188,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) { } const grpc_channel_filter grpc_client_census_filter = { - client_call_op, channel_op, sizeof(call_data), client_init_call_elem, + client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem, client_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_call_op, channel_op, sizeof(call_data), server_init_call_elem, + server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem, server_destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "census-server"}; diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index 3a3a3a75b70..c121e270056 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -35,6 +35,7 @@ #include #include +#include int grpc_trace_channel = 0; @@ -181,12 +182,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) { } } -void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) { - grpc_call_element *next_elem = elem + op->dir; - if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) { - grpc_metadata_batch_assert_ok(&op->data.metadata); - } - next_elem->filter->call_op(next_elem, elem, op); +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) { + grpc_call_element *next_elem = elem + 1; + next_elem->filter->start_transport_op(next_elem, op); } void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) { @@ -205,39 +203,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { sizeof(grpc_call_stack))); } -static void do_nothing(void *user_data, grpc_op_error error) {} - void grpc_call_element_send_cancel(grpc_call_element *cur_elem) { - grpc_call_op cancel_op; - cancel_op.type = GRPC_CANCEL_OP; - cancel_op.dir = GRPC_CALL_DOWN; - cancel_op.done_cb = do_nothing; - cancel_op.user_data = NULL; - cancel_op.flags = 0; - cancel_op.bind_pollset = NULL; - grpc_call_next_op(cur_elem, &cancel_op); -} - -void grpc_call_element_send_finish(grpc_call_element *cur_elem) { - grpc_call_op finish_op; - finish_op.type = GRPC_SEND_FINISH; - finish_op.dir = GRPC_CALL_DOWN; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - finish_op.flags = 0; - finish_op.bind_pollset = NULL; - grpc_call_next_op(cur_elem, &finish_op); + grpc_transport_op op; + memset(&op, 0, sizeof(op)); + op.cancel_with_status = GRPC_STATUS_CANCELLED; + grpc_call_next_op(cur_elem, &op); } void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message) { - grpc_call_op op; - op.type = GRPC_RECV_SYNTHETIC_STATUS; - op.dir = GRPC_CALL_UP; - op.done_cb = do_nothing; - op.user_data = NULL; - op.data.synthetic_status.status = status; - op.data.synthetic_status.message = message; - grpc_call_next_op(cur_elem, &op); + abort(); } diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index addc92b2727..75897ff6516 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -51,78 +51,11 @@ typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; -/* Call operations - things that can be sent and received. - - Threading: - SEND, RECV, and CANCEL ops can be active on a call at the same time, but - only one SEND, one RECV, and one CANCEL can be active at a time. - - If state is shared between send/receive/cancel operations, it is up to - filters to provide their own protection around that. */ -typedef enum { - /* send metadata to the channels peer */ - GRPC_SEND_METADATA, - /* send a message to the channels peer */ - GRPC_SEND_MESSAGE, - /* send a pre-formatted message to the channels peer */ - GRPC_SEND_PREFORMATTED_MESSAGE, - /* send half-close to the channels peer */ - GRPC_SEND_FINISH, - /* request that more data be allowed through flow control */ - GRPC_REQUEST_DATA, - /* metadata was received from the channels peer */ - GRPC_RECV_METADATA, - /* a message was received from the channels peer */ - GRPC_RECV_MESSAGE, - /* half-close was received from the channels peer */ - GRPC_RECV_HALF_CLOSE, - /* full close was received from the channels peer */ - GRPC_RECV_FINISH, - /* a status has been sythesized locally */ - GRPC_RECV_SYNTHETIC_STATUS, - /* the call has been abnormally terminated */ - GRPC_CANCEL_OP -} grpc_call_op_type; - /* The direction of the call. The values of the enums (1, -1) matter here - they are used to increment or decrement a pointer to find the next element to call */ typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir; -/* A single filterable operation to be performed on a call */ -typedef struct { - /* The type of operation we're performing */ - grpc_call_op_type type; - /* The directionality of this call - does the operation begin at the bottom - of the stack and flow up, or does the operation start at the top of the - stack and flow down through the filters. */ - grpc_call_dir dir; - - /* Flags associated with this call: see GRPC_WRITE_* in grpc.h */ - gpr_uint32 flags; - - /* Argument data, matching up with grpc_call_op_type names */ - union { - grpc_byte_buffer *message; - grpc_metadata_batch metadata; - struct { - grpc_status_code status; - const char *message; - } synthetic_status; - } data; - - grpc_pollset *bind_pollset; - - /* Must be called when processing of this call-op is complete. - Signature chosen to match transport flow control callbacks */ - void (*done_cb)(void *user_data, grpc_op_error error); - /* User data to be passed into done_cb */ - void *user_data; -} grpc_call_op; - -/* returns a string representation of op, that can be destroyed with gpr_free */ -char *grpc_call_op_string(grpc_call_op *op); - typedef enum { /* send a goaway message to remote channels indicating that we are going to disconnect in the future */ @@ -170,8 +103,7 @@ typedef struct { typedef struct { /* Called to eg. send/receive data on a call. See grpc_call_next_op on how to call the next element in the stack */ - void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op); + void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op); /* Called to handle channel level operations - e.g. new calls, or transport closure. See grpc_channel_next_op on how to call the next element in the stack */ @@ -272,8 +204,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_call_stack *stack); -/* Call the next operation (depending on call directionality) in a call stack */ -void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op); +/* Call the next operation in a call stack */ +void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op); /* Call the next operation (depending on call directionality) in a channel stack */ void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op); @@ -285,10 +217,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element( grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem); void grpc_call_log_op(char *file, int line, gpr_log_severity severity, - grpc_call_element *elem, grpc_call_op *op); + grpc_call_element *elem, grpc_transport_op *op); void grpc_call_element_send_cancel(grpc_call_element *cur_elem); -void grpc_call_element_send_finish(grpc_call_element *cur_elem); void grpc_call_element_recv_status(grpc_call_element *cur_elem, grpc_status_code status, const char *message); diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c index 2cb03829c79..244417384a7 100644 --- a/src/core/channel/child_channel.c +++ b/src/core/channel/child_channel.c @@ -61,22 +61,11 @@ typedef struct { } lb_channel_data; typedef struct { - grpc_call_element *back; grpc_child_channel *channel; } lb_call_data; -static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - lb_call_data *calld = elem->call_data; - - switch (op->dir) { - case GRPC_CALL_UP: - calld->back->filter->call_op(calld->back, elem, op); - break; - case GRPC_CALL_DOWN: - grpc_call_next_op(elem, op); - break; - } +static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { + grpc_call_next_op(elem, op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -165,7 +154,7 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_child_channel_top_filter = { - lb_call_op, lb_channel_op, sizeof(lb_call_data), + lb_start_transport_op, lb_channel_op, sizeof(lb_call_data), lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data), lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", }; @@ -282,7 +271,6 @@ grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel, lbelem = LINK_BACK_ELEM_FROM_CALL(stk); lbchand = lbelem->channel_data; lbcalld = lbelem->call_data; - lbcalld->back = parent; lbcalld->channel = channel; gpr_mu_lock(&lbchand->mu); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index bc481e59ca5..6ad50cb9448 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -82,7 +82,7 @@ struct call_data { /* owning element */ grpc_call_element *elem; - gpr_uint8 got_first_send; + gpr_uint8 got_first_op; call_state state; gpr_timespec deadline; @@ -91,7 +91,7 @@ struct call_data { /* our child call stack */ grpc_child_call *child_call; } active; - grpc_call_op waiting_op; + grpc_transport_op waiting_op; } s; }; @@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem, return 1; } -static void do_nothing(void *ignored, grpc_op_error error) {} - -static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { +static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; grpc_call_element *child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); @@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) { /* continue the start call down the stack, this nees to happen after metadata are flushed*/ - child_elem->filter->call_op(child_elem, elem, op); + child_elem->filter->start_transport_op(child_elem, op); } -static void start_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu); if (calld->state == CALL_CANCELLED) { gpr_mu_unlock(&chand->mu); - grpc_metadata_batch_destroy(&op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(op); return; } GPR_ASSERT(calld->state == CALL_CREATED); @@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) { } static void send_up_cancelled_ops(grpc_call_element *elem) { - grpc_call_op finish_op; - /* send up a synthesized status */ - grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled"); - /* send up a finish */ - finish_op.type = GRPC_RECV_FINISH; - finish_op.dir = GRPC_CALL_UP; - finish_op.flags = 0; - finish_op.done_cb = do_nothing; - finish_op.user_data = NULL; - grpc_call_next_op(elem, &finish_op); + abort(); } -static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { +static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_call_element *child_elem; @@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { case CALL_ACTIVE: child_elem = grpc_child_call_get_top_element(calld->s.active.child_call); gpr_mu_unlock(&chand->mu); - child_elem->filter->call_op(child_elem, elem, op); + child_elem->filter->start_transport_op(child_elem, op); return; /* early out */ case CALL_WAITING: - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); remove_waiting_child(chand, calld); calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu); send_up_cancelled_ops(elem); - calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR); return; /* early out */ case CALL_CREATED: calld->state = CALL_CANCELLED; @@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) { abort(); } -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void cc_start_transport_op(grpc_call_element *elem, + grpc_transport_op *op) { call_data *calld = elem->call_data; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_SEND_METADATA: - if (!calld->got_first_send) { - /* filter out the start event to find which child to send on */ - calld->got_first_send = 1; - start_rpc(elem, op); - } else { - grpc_call_next_op(elem, op); - } - break; - case GRPC_CANCEL_OP: - cancel_rpc(elem, op); - break; - case GRPC_SEND_MESSAGE: - case GRPC_SEND_FINISH: - case GRPC_REQUEST_DATA: - if (calld->state == CALL_ACTIVE) { - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - child_elem->filter->call_op(child_elem, elem, op); - } else { - op->done_cb(op->user_data, GRPC_OP_ERROR); - } - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; + if (op->cancel_with_status != GRPC_STATUS_OK) { + GPR_ASSERT(op->send_ops == NULL); + GPR_ASSERT(op->recv_ops == NULL); + + cancel_rpc(elem, op); + return; + } + + if (!calld->got_first_op) { + calld->got_first_op = 1; + start_rpc(elem, op); + } else { + grpc_call_element *child_elem = + grpc_child_call_get_top_element(calld->s.active.child_call); + child_elem->filter->start_transport_op(child_elem, op); } } @@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem, calld->elem = elem; calld->state = CALL_CREATED; calld->deadline = gpr_inf_future; - calld->got_first_send = 0; + calld->got_first_op = 0; } /* Destructor for call_data */ @@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) { if (calld->state == CALL_ACTIVE) { grpc_child_call_destroy(calld->s.active.child_call); } - if (calld->state == CALL_WAITING) { - grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata); - } + GPR_ASSERT(calld->state != CALL_WAITING); } /* Constructor for channel_data */ @@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_client_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "client-channel", }; @@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( call_data **waiting_children; size_t waiting_child_count; size_t i; - grpc_call_op *call_ops; + grpc_transport_op *call_ops; /* build the child filter stack */ child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters); @@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete( chand->waiting_child_count = 0; chand->waiting_child_capacity = 0; - call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count); + call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count); for (i = 0; i < waiting_child_count; i++) { call_ops[i] = waiting_children[i]->s.waiting_op; if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) { waiting_children[i] = NULL; - call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR); + grpc_transport_op_finish_with_failure(&call_ops[i]); } } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 711274bfe1b..d0b834a10a1 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -50,19 +50,10 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; - gpr_uint32 max_message_length; } channel_data; typedef struct connected_channel_call_data { - grpc_call_element *elem; - grpc_stream_op_buffer outgoing_sopb; - - gpr_uint32 max_message_length; - gpr_uint32 incoming_message_length; - gpr_uint8 reading_message; - gpr_uint8 got_read_close; - gpr_slice_buffer incoming_message; - gpr_uint32 outgoing_buffer_length_estimate; + void *unused; } call_data; /* We perform a small hack to locate transport data alongside the connected @@ -72,6 +63,7 @@ typedef struct connected_channel_call_data { #define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \ (((call_data *)(transport_stream)) - 1) +#if 0 /* Copy the contents of a byte buffer into stream ops */ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, grpc_stream_op_buffer *sopb) { @@ -87,76 +79,17 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer, break; } } - -/* Flush queued stream operations onto the transport */ -static void end_bufferable_op(grpc_call_op *op, channel_data *chand, - call_data *calld, int is_last) { - size_t nops; - - if (op->flags & GRPC_WRITE_BUFFER_HINT) { - if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) { - op->done_cb(op->user_data, GRPC_OP_OK); - return; - } - } - - calld->outgoing_buffer_length_estimate = 0; - grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data); - - nops = calld->outgoing_sopb.nops; - calld->outgoing_sopb.nops = 0; - grpc_transport_send_batch(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - calld->outgoing_sopb.ops, nops, is_last); -} +#endif /* Intercept a call operation and either push it directly up or translate it into transport stream operations */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - if (op->bind_pollset) { - grpc_transport_add_to_pollset(chand->transport, op->bind_pollset); - } - - switch (op->type) { - case GRPC_SEND_METADATA: - grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_MESSAGE: - grpc_sopb_add_begin_message(&calld->outgoing_sopb, - grpc_byte_buffer_length(op->data.message), - op->flags); - /* fall-through */ - case GRPC_SEND_PREFORMATTED_MESSAGE: - copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb); - calld->outgoing_buffer_length_estimate += - (5 + grpc_byte_buffer_length(op->data.message)); - end_bufferable_op(op, chand, calld, 0); - break; - case GRPC_SEND_FINISH: - end_bufferable_op(op, chand, calld, 1); - break; - case GRPC_REQUEST_DATA: - /* re-arm window updates if they were disarmed by finish_message */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1); - break; - case GRPC_CANCEL_OP: - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_CANCELLED); - break; - default: - GPR_ASSERT(op->dir == GRPC_CALL_UP); - grpc_call_next_op(elem, op); - break; - } + grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); } /* Currently we assume all channel operations should just be pushed up. */ @@ -188,14 +121,6 @@ static void init_call_elem(grpc_call_element *elem, int r; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - calld->elem = elem; - grpc_sopb_init(&calld->outgoing_sopb); - - calld->reading_message = 0; - calld->got_read_close = 0; - calld->outgoing_buffer_length_estimate = 0; - calld->max_message_length = chand->max_message_length; - gpr_slice_buffer_init(&calld->incoming_message); r = grpc_transport_init_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), server_transport_data); @@ -207,8 +132,6 @@ static void destroy_call_elem(grpc_call_element *elem) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - grpc_sopb_destroy(&calld->outgoing_sopb); - gpr_slice_buffer_destroy(&calld->incoming_message); grpc_transport_destroy_stream(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld)); } @@ -218,12 +141,12 @@ static void init_channel_elem(grpc_channel_element *elem, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; - size_t i; GPR_ASSERT(!is_first); GPR_ASSERT(is_last); GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); cd->transport = NULL; +#if 0 cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH; if (args) { for (i = 0; i < args->num_args; i++) { @@ -240,6 +163,7 @@ static void init_channel_elem(grpc_channel_element *elem, } } } +#endif } /* Destructor for channel_data */ @@ -250,15 +174,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_connected_channel_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected", }; -static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport, - grpc_stream *stream, size_t size_hint) { - return gpr_slice_malloc(size_hint); -} - /* Transport callback to accept a new stream... calls up to handle it */ static void accept_stream(void *user_data, grpc_transport *transport, const void *transport_server_data) { @@ -276,168 +195,6 @@ static void accept_stream(void *user_data, grpc_transport *transport, channel_op(elem, NULL, &op); } -static void recv_error(channel_data *chand, call_data *calld, int line, - const char *message) { - gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message); - - if (chand->transport) { - grpc_transport_abort_stream(chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), - GRPC_STATUS_INVALID_ARGUMENT); - } -} - -static void do_nothing(void *calldata, grpc_op_error error) {} - -static void finish_message(channel_data *chand, call_data *calld) { - grpc_call_element *elem = calld->elem; - grpc_call_op call_op; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - /* if we got all the bytes for this message, call up the stack */ - call_op.type = GRPC_RECV_MESSAGE; - call_op.done_cb = do_nothing; - /* TODO(ctiller): this could be a lot faster if coded directly */ - call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices, - calld->incoming_message.count); - gpr_slice_buffer_reset_and_unref(&calld->incoming_message); - - /* disable window updates until we get a request more from above */ - grpc_transport_set_allow_window_updates( - chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0); - - GPR_ASSERT(calld->incoming_message.count == 0); - calld->reading_message = 0; - grpc_call_next_op(elem, &call_op); -} - -static void got_metadata(grpc_call_element *elem, - grpc_metadata_batch metadata) { - grpc_call_op op; - op.type = GRPC_RECV_METADATA; - op.dir = GRPC_CALL_UP; - op.flags = 0; - op.data.metadata = metadata; - op.done_cb = do_nothing; - op.user_data = NULL; - - grpc_call_next_op(elem, &op); -} - -/* Handle incoming stream ops from the transport, translating them into - call_ops to pass up the call stack */ -static void recv_batch(void *user_data, grpc_transport *transport, - grpc_stream *stream, grpc_stream_op *ops, - size_t ops_count, grpc_stream_state final_state) { - call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream); - grpc_call_element *elem = calld->elem; - channel_data *chand = elem->channel_data; - grpc_stream_op *stream_op; - grpc_call_op call_op; - size_t i; - gpr_uint32 length; - - GPR_ASSERT(elem->filter == &grpc_connected_channel_filter); - - for (i = 0; i < ops_count; i++) { - stream_op = ops + i; - switch (stream_op->type) { - case GRPC_OP_FLOW_CTL_CB: - stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1); - break; - case GRPC_NO_OP: - break; - case GRPC_OP_METADATA: - got_metadata(elem, stream_op->data.metadata); - break; - case GRPC_OP_BEGIN_MESSAGE: - /* can't begin a message when we're still reading a message */ - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Message terminated early; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } - /* stash away parameters, and prepare for incoming slices */ - length = stream_op->data.begin_message.length; - if (length > calld->max_message_length) { - char *message = NULL; - gpr_asprintf( - &message, - "Maximum message length of %d exceeded by a message of length %d", - calld->max_message_length, length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } else if (length > 0) { - calld->reading_message = 1; - calld->incoming_message_length = length; - } else { - finish_message(chand, calld); - } - break; - case GRPC_OP_SLICE: - if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) { - gpr_slice_unref(stream_op->data.slice); - break; - } - /* we have to be reading a message to know what to do here */ - if (!calld->reading_message) { - recv_error(chand, calld, __LINE__, - "Received payload data while not reading a message"); - return; - } - /* append the slice to the incoming buffer */ - gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice); - if (calld->incoming_message.length > calld->incoming_message_length) { - /* if we got too many bytes, complain */ - char *message = NULL; - gpr_asprintf(&message, - "Receiving message overflow; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - return; - } else if (calld->incoming_message.length == - calld->incoming_message_length) { - finish_message(chand, calld); - } - } - } - /* if the stream closed, then call up the stack to let it know */ - if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED || - final_state == GRPC_STREAM_CLOSED)) { - calld->got_read_close = 1; - if (calld->reading_message) { - char *message = NULL; - gpr_asprintf(&message, - "Last message truncated; read %d bytes, expected %d", - (int)calld->incoming_message.length, - (int)calld->incoming_message_length); - recv_error(chand, calld, __LINE__, message); - gpr_free(message); - } - call_op.type = GRPC_RECV_HALF_CLOSE; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } - if (final_state == GRPC_STREAM_CLOSED) { - call_op.type = GRPC_RECV_FINISH; - call_op.dir = GRPC_CALL_UP; - call_op.flags = 0; - call_op.done_cb = do_nothing; - call_op.user_data = NULL; - grpc_call_next_op(elem, &call_op); - } -} - static void transport_goaway(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug) { /* transport got goaway ==> call up and handle it */ @@ -470,7 +227,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) { } const grpc_transport_callbacks connected_channel_transport_callbacks = { - alloc_recv_buffer, accept_stream, recv_batch, + accept_stream, transport_goaway, transport_closed, }; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index bc014b15fff..fb2ce4d34ad 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -39,6 +39,12 @@ typedef struct call_data { grpc_linked_mdelem scheme; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; + int sent_initial_metadata; + + int got_initial_metadata; + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -64,22 +70,39 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) { return md; } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void hc_on_recv(void *user_data, int success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + if (success) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); + } + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + size_t i; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_SEND_METADATA: + if (op->send_ops && !calld->sent_initial_metadata) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_initial_metadata = 1; /* Send : prefixed headers, which have to be before any application - * layer headers. */ + layer headers. */ grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, grpc_mdelem_ref(channeld->method)); grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, @@ -88,17 +111,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, grpc_mdelem_ref(channeld->te_trailers)); grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_mdelem_ref(channeld->content_type)); - grpc_call_next_op(elem, op); - break; - case GRPC_RECV_METADATA: - grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem); - grpc_call_next_op(elem, op); - break; - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); break; + } } + + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = hc_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -120,7 +146,11 @@ static void channel_op(grpc_channel_element *elem, /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) {} + const void *server_transport_data) { + call_data *calld = elem->call_data; + calld->sent_initial_metadata = 0; + calld->got_initial_metadata = 0; +} /* Destructor for call_data */ static void destroy_call_elem(grpc_call_element *elem) { @@ -181,6 +211,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_client_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "http-client"}; diff --git a/src/core/channel/http_filter.c b/src/core/channel/http_filter.c deleted file mode 100644 index 453a0422d85..00000000000 --- a/src/core/channel/http_filter.c +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/channel/http_filter.h" -#include - -typedef struct call_data { - int unused; /* C89 requires at least one struct element */ -} call_data; - -typedef struct channel_data { - int unused; /* C89 requires at least one struct element */ -} channel_data; - -/* used to silence 'variable not used' warnings */ -static void ignore_unused(void *ignored) {} - -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - - ignore_unused(calld); - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); - break; - } -} - -/* Called on special channel events, such as disconnection or new incoming - calls on the server */ -static void channel_op(grpc_channel_element *elem, - grpc_channel_element *from_elem, grpc_channel_op *op) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); - - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_channel_next_op(elem, op); - break; - } -} - -/* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, - const void *server_transport_data) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - /* initialize members */ - calld->unused = channeld->unused; -} - -/* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(calld); - ignore_unused(channeld); -} - -/* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - /* The first and the last filters tend to be implemented differently to - handle the case that there's no 'next' filter to call on the up or down - path */ - GPR_ASSERT(!is_first); - GPR_ASSERT(!is_last); - - /* initialize members */ - channeld->unused = 0; -} - -/* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ - channel_data *channeld = elem->channel_data; - - ignore_unused(channeld); -} - -const grpc_channel_filter grpc_http_filter = { - call_op, channel_op, sizeof(call_data), - init_call_elem, destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, "http"}; diff --git a/src/core/channel/http_filter.h b/src/core/channel/http_filter.h deleted file mode 100644 index 1b116ad61f4..00000000000 --- a/src/core/channel/http_filter.h +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H -#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H - -#include "src/core/channel/channel_stack.h" - -/* Processes metadata that is common to both client and server for HTTP2 - transports. */ -extern const grpc_channel_filter grpc_http_filter; - -#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index 85224f28e5b..d18c38d93c8 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -52,6 +52,10 @@ typedef struct call_data { gpr_uint8 seen_scheme; gpr_uint8 seen_te_trailers; grpc_linked_mdelem status; + + grpc_stream_op_buffer *recv_ops; + void (*on_done_recv)(void *user_data, int success); + void *recv_user_data; } call_data; typedef struct channel_data { @@ -143,63 +147,73 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void hs_on_recv(void *user_data, int success) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + if (success) { + size_t i; + size_t nops = calld->recv_ops->nops; + grpc_stream_op *ops = calld->recv_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->got_initial_metadata = 1; + grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); + /* Have we seen the required http2 transport headers? + (:method, :scheme, content-type, with :path and :authority covered + at the channel level right now) */ + if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && + calld->seen_path) { + /* do nothing */ + } else { + if (!calld->seen_post) { + gpr_log(GPR_ERROR, "Missing :method header"); + } + if (!calld->seen_scheme) { + gpr_log(GPR_ERROR, "Missing :scheme header"); + } + if (!calld->seen_te_trailers) { + gpr_log(GPR_ERROR, "Missing te trailers header"); + } + /* Error this call out */ + success = 0; + grpc_call_element_send_cancel(elem); + } + } + } + calld->on_done_recv(calld->recv_user_data, success); +} + +static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + size_t i; GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - switch (op->type) { - case GRPC_RECV_METADATA: - grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem); - if (!calld->got_initial_metadata) { - calld->got_initial_metadata = 1; - /* Have we seen the required http2 transport headers? - (:method, :scheme, content-type, with :path and :authority covered - at the channel level right now) */ - if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers && - calld->seen_path) { - grpc_call_next_op(elem, op); - } else { - if (!calld->seen_post) { - gpr_log(GPR_ERROR, "Missing :method header"); - } - if (!calld->seen_scheme) { - gpr_log(GPR_ERROR, "Missing :scheme header"); - } - if (!calld->seen_te_trailers) { - gpr_log(GPR_ERROR, "Missing te trailers header"); - } - /* Error this call out */ - grpc_metadata_batch_destroy(&op->data.metadata); - op->done_cb(op->user_data, GRPC_OP_OK); - grpc_call_element_send_cancel(elem); - } - } else { - grpc_call_next_op(elem, op); - } - break; - case GRPC_SEND_METADATA: - /* If we haven't sent status 200 yet, we need to so so because it needs to - come before any non : prefixed metadata. */ - if (!calld->sent_status) { - calld->sent_status = 1; - grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, - grpc_mdelem_ref(channeld->status_ok)); - } - grpc_call_next_op(elem, op); - break; - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); + if (op->send_ops && !calld->sent_status) { + size_t nops = op->send_ops->nops; + grpc_stream_op *ops = op->send_ops->ops; + for (i = 0; i < nops; i++) { + grpc_stream_op *op = &ops[i]; + if (op->type != GRPC_OP_METADATA) continue; + calld->sent_status = 1; + grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, + grpc_mdelem_ref(channeld->status_ok)); break; + } } + + if (op->recv_ops && !calld->got_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_ops = op->recv_ops; + calld->on_done_recv = op->on_done_recv; + calld->recv_user_data = op->recv_user_data; + op->on_done_recv = hs_on_recv; + op->recv_user_data = elem; + } + + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -321,6 +335,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_http_server_filter = { - call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, + hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "http-server"}; diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index d987fa2bc10..403b60901bf 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -50,8 +50,7 @@ static void ignore_unused(void *ignored) {} - a network event (or similar) from below, to receive something op contains type and call direction information, in addition to the data that is being sent or received. */ -static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, - grpc_call_op *op) { +static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; @@ -59,12 +58,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem, ignore_unused(calld); ignore_unused(channeld); - switch (op->type) { - default: - /* pass control up or down the stack depending on op->dir */ - grpc_call_next_op(elem, op); - break; - } + /* pass control down the stack */ + grpc_call_next_op(elem, op); } /* Called on special channel events, such as disconnection or new incoming @@ -131,6 +126,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) { } const grpc_channel_filter grpc_no_op_filter = { - call_op, channel_op, sizeof(call_data), + noop_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem, destroy_channel_elem, "no-op"}; diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 608442c0089..709ca0b3974 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -150,6 +150,9 @@ struct grpc_call { gpr_uint8 num_completed_requests; /* flag that we need to request more data */ gpr_uint8 need_more_data; + /* flags with bits corresponding to write states allowing us to determine + what was sent */ + gpr_uint8 last_send_contains; /* Active ioreqs. request_set and request_data contain one element per active ioreq @@ -214,6 +217,10 @@ struct grpc_call { size_t send_initial_metadata_count; gpr_timespec send_deadline; + grpc_stream_op_buffer send_ops; + grpc_stream_op_buffer recv_ops; + grpc_stream_state recv_state; + /* Data that the legacy api needs to track. To be deleted at some point soon */ legacy_state *legacy_state; @@ -234,9 +241,11 @@ struct grpc_call { } while (0) static void do_nothing(void *ignored, grpc_op_error also_ignored) {} -static send_action choose_send_action(grpc_call *call); -static void enact_send_action(grpc_call *call, send_action sa); static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); +static void call_on_done_recv(void *call, int success); +static void call_on_done_send(void *call, int success); +static int fill_send_ops(grpc_call *call, grpc_transport_op *op); +static void execute_op(grpc_call *call, grpc_transport_op *op); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, const void *server_transport_data, @@ -359,20 +368,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { return GRPC_CALL_OK; } -static void request_more_data(grpc_call *call) { - grpc_call_op op; - - /* call down */ - op.type = GRPC_REQUEST_DATA; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = do_nothing; - op.user_data = NULL; - op.bind_pollset = NULL; - - grpc_call_execute_op(call, &op); -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -384,16 +379,31 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static void unlock(grpc_call *call) { - send_action sa = SEND_NOTHING; + grpc_transport_op op; completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; int completing_requests = 0; - int need_more_data = - call->need_more_data && - (call->write_state >= WRITE_STATE_STARTED || !call->is_client); + int start_op = 0; int i; - if (need_more_data) { + memset(&op, 0, sizeof(op)); + + if (call->need_more_data && + (call->write_state >= WRITE_STATE_STARTED || !call->is_client)) { + op.recv_ops = &call->recv_ops; + op.recv_state = &call->recv_state; + op.on_done_recv = call_on_done_recv; + op.recv_user_data = call; call->need_more_data = 0; + grpc_call_internal_ref(call); + start_op = 1; + } + + if (!call->sending) { + if (fill_send_ops(call, &op)) { + call->sending = 1; + grpc_call_internal_ref(call); + start_op = 1; + } } if (!call->completing && call->num_completed_requests != 0) { @@ -405,22 +415,10 @@ static void unlock(grpc_call *call) { grpc_call_internal_ref(call); } - if (!call->sending) { - sa = choose_send_action(call); - if (sa != SEND_NOTHING) { - call->sending = 1; - grpc_call_internal_ref(call); - } - } - gpr_mu_unlock(&call->mu); - if (need_more_data) { - request_more_data(call); - } - - if (sa != SEND_NOTHING) { - enact_send_action(call, sa); + if (start_op) { + execute_op(call, &op); } if (completing_requests > 0) { @@ -577,6 +575,64 @@ static void finish_start_step(void *pc, grpc_op_error error) { error); } +static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, + grpc_metadata *metadata) { + size_t i; + grpc_mdelem_list out; + if (count == 0) { + out.head = out.tail = NULL; + return out; + } + for (i = 0; i < count; i++) { + grpc_metadata *md = &metadata[i]; + grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; + grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); + l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length); + l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; + l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; + } + out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); + out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); + return out; +} + +static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { + grpc_ioreq_data data; + grpc_metadata_batch mdb; + size_t i; + GPR_ASSERT(op->send_ops == NULL); + + switch (call->write_state) { + case WRITE_STATE_INITIAL: + if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) { + break; + } + data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; + mdb.list = chain_metadata_from_app( + call, data.send_metadata.count, data.send_metadata.metadata); + mdb.garbage.head = mdb.garbage.tail = NULL; + mdb.deadline = call->send_deadline; + for (i = 0; i < call->send_initial_metadata_count; i++) { + grpc_metadata_batch_link_head(&mdb, + &call->send_initial_metadata[i]); + } + grpc_sopb_add_metadata(&call->send_ops, mdb); + op->send_ops = &call->send_ops; + op->bind_pollset = grpc_cq_pollset(call->cq); + call->last_send_contains |= 1 << WRITE_STATE_INITIAL; + /* fall through intended */ + case WRITE_STATE_STARTED: + if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) { + data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; + grpc_sopb_add_message(data.send_message); +abort(); + } +} + static send_action choose_send_action(grpc_call *call) { switch (call->write_state) { case WRITE_STATE_INITIAL: @@ -614,33 +670,7 @@ static send_action choose_send_action(grpc_call *call) { return SEND_NOTHING; } -static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count, - grpc_metadata *metadata) { - size_t i; - grpc_mdelem_list out; - if (count == 0) { - out.head = out.tail = NULL; - return out; - } - for (i = 0; i < count; i++) { - grpc_metadata *md = &metadata[i]; - grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1]; - grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1]; - grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; - assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data)); - l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length); - l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL; - l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL; - } - out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data); - out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data); - return out; -} - static void enact_send_action(grpc_call *call, send_action sa) { - grpc_ioreq_data data; grpc_call_op op; size_t i; gpr_uint32 flags = 0; @@ -654,37 +684,11 @@ static void enact_send_action(grpc_call *call, send_action sa) { flags |= GRPC_WRITE_BUFFER_HINT; /* fallthrough */ case SEND_INITIAL_METADATA: - data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA]; - op.type = GRPC_SEND_METADATA; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.metadata.list = chain_metadata_from_app( - call, data.send_metadata.count, data.send_metadata.metadata); - op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL; - op.data.metadata.deadline = call->send_deadline; - for (i = 0; i < call->send_initial_metadata_count; i++) { - grpc_metadata_batch_link_head(&op.data.metadata, - &call->send_initial_metadata[i]); - } - call->send_initial_metadata_count = 0; - op.done_cb = finish_start_step; - op.user_data = call; - op.bind_pollset = grpc_cq_pollset(call->cq); - grpc_call_execute_op(call, &op); break; case SEND_BUFFERED_MESSAGE: flags |= GRPC_WRITE_BUFFER_HINT; /* fallthrough */ case SEND_MESSAGE: - data = call->request_data[GRPC_IOREQ_SEND_MESSAGE]; - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = flags; - op.data.message = data.send_message; - op.done_cb = finish_write_step; - op.user_data = call; - op.bind_pollset = NULL; - grpc_call_execute_op(call, &op); break; case SEND_TRAILING_METADATA_AND_FINISH: /* send trailing metadata */ diff --git a/src/core/surface/call.h b/src/core/surface/call.h index f8d0915349e..358e5560a35 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -106,7 +106,6 @@ void grpc_call_recv_message(grpc_call_element *surface_element, void grpc_call_read_closed(grpc_call_element *surface_element); void grpc_call_stream_closed(grpc_call_element *surface_element); -void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index e32ee284e09..86bd5d20986 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -503,7 +503,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, gpr_mu_lock(&t->mu); t->calling_back = 1; - ref_transport(t); + ref_transport(t); /* matches unref at end of this function */ gpr_mu_unlock(&t->mu); sr = setup(arg, &t->base, t->metadata_context); @@ -515,7 +515,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup, if (t->destroying) gpr_cv_signal(&t->cv); unlock(t); - ref_transport(t); + ref_transport(t); /* matches unref inside recv_data */ recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK); unref_transport(t); diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index c3901bf6088..dabe68f3bdd 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -50,22 +50,9 @@ typedef enum grpc_stream_op_code { Must be ignored by receivers */ GRPC_NO_OP, GRPC_OP_METADATA, - /* Begin a message/metadata element/status - as defined by - grpc_message_type. */ - GRPC_OP_BEGIN_MESSAGE, - /* Add a slice of data to the current message/metadata element/status. - Must not overflow the forward declared length. */ - GRPC_OP_SLICE + GRPC_OP_MESSAGE } grpc_stream_op_code; -/* Arguments for GRPC_OP_BEGIN */ -typedef struct grpc_begin_message { - /* How many bytes of data will this message contain */ - gpr_uint32 length; - /* Write flags for the message: see grpc.h GRPC_WRITE_xxx */ - gpr_uint32 flags; -} grpc_begin_message; - typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -118,9 +105,8 @@ typedef struct grpc_stream_op { /* the arguments to this operation. union fields are named according to the associated op-code */ union { - grpc_begin_message begin_message; + grpc_byte_buffer *message; grpc_metadata_batch metadata; - gpr_slice slice; } data; } grpc_stream_op; @@ -148,16 +134,8 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops); /* Append a GRPC_NO_OP to a buffer */ void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb); -/* Append a GRPC_OP_BEGIN to a buffer */ -void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length, - gpr_uint32 flags); +void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb); void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata); -/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */ -void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice); -/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */ -void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb, - void (*cb)(void *arg, grpc_op_error error), - void *arg); /* Append a buffer to a buffer - does not ref/unref any internal objects */ void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops, size_t nops); diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h index 6dd0fdaf468..f31011e56a6 100644 --- a/src/core/transport/transport.h +++ b/src/core/transport/transport.h @@ -78,6 +78,8 @@ struct grpc_transport_callbacks { void (*accept_stream)(void *user_data, grpc_transport *transport, const void *server_data); + void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug); + /* The transport has been closed */ void (*closed)(void *user_data, grpc_transport *transport); }; @@ -139,8 +141,14 @@ typedef struct grpc_transport_op { void *recv_user_data; grpc_pollset *bind_pollset; + + grpc_status_code cancel_with_status; } grpc_transport_op; +void grpc_transport_op_finish_with_failure(grpc_transport_op *op); + +char *grpc_transport_op_string(grpc_transport_op *op); + /* Send a batch of operations on a transport Takes ownership of any objects contained in ops. @@ -161,19 +169,6 @@ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream, void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data), void *user_data); -/* Abort a stream - - Terminate reading and writing for a stream. A final recv_batch with no - operations and final_state == GRPC_STREAM_CLOSED will be received locally, - and no more data will be presented to the up-layer. - - TODO(ctiller): consider adding a HTTP/2 reason to this function. */ -void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream, - grpc_status_code status); - -void grpc_transport_add_to_pollset(grpc_transport *transport, - grpc_pollset *pollset); - /* Advise peer of pending connection termination. */ void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status, gpr_slice debug_data); diff --git a/src/core/transport/transport_op_string.c b/src/core/transport/transport_op_string.c new file mode 100644 index 00000000000..5f7e1be2682 --- /dev/null +++ b/src/core/transport/transport_op_string.c @@ -0,0 +1,141 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/channel/channel_stack.h" + +#include +#include +#include + +#include "src/core/support/string.h" +#include +#include + +static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { + gpr_strvec_add(b, gpr_strdup(" key=")); + gpr_strvec_add( + b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); + + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), + GPR_SLICE_LENGTH(md->value->slice), + GPR_HEXDUMP_PLAINTEXT)); +} + +static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) { + grpc_linked_mdelem *m; + for (m = md.list.head; m != NULL; m = m->next) { + put_metadata(b, m->md); + } + if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) { + char *tmp; + gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec, + md.deadline.tv_nsec); + gpr_strvec_add(b, tmp); + } +} + +char *grpc_call_op_string(grpc_call_op *op) { + char *tmp; + char *out; + + gpr_strvec b; + gpr_strvec_init(&b); + + switch (op->dir) { + case GRPC_CALL_DOWN: + gpr_strvec_add(&b, gpr_strdup(">")); + break; + case GRPC_CALL_UP: + gpr_strvec_add(&b, gpr_strdup("<")); + break; + } + switch (op->type) { + case GRPC_SEND_METADATA: + gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); + put_metadata_list(&b, op->data.metadata); + break; + case GRPC_SEND_MESSAGE: + gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); + break; + case GRPC_SEND_PREFORMATTED_MESSAGE: + gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE")); + break; + case GRPC_SEND_FINISH: + gpr_strvec_add(&b, gpr_strdup("SEND_FINISH")); + break; + case GRPC_REQUEST_DATA: + gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA")); + break; + case GRPC_RECV_METADATA: + gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); + put_metadata_list(&b, op->data.metadata); + break; + case GRPC_RECV_MESSAGE: + gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); + break; + case GRPC_RECV_HALF_CLOSE: + gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE")); + break; + case GRPC_RECV_FINISH: + gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); + break; + case GRPC_RECV_SYNTHETIC_STATUS: + gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'", + op->data.synthetic_status.status, + op->data.synthetic_status.message); + gpr_strvec_add(&b, tmp); + break; + case GRPC_CANCEL_OP: + gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); + break; + } + gpr_asprintf(&tmp, " flags=0x%08x", op->flags); + gpr_strvec_add(&b, tmp); + if (op->bind_pollset) { + gpr_strvec_add(&b, gpr_strdup("bind_pollset")); + } + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); + + return out; +} + +void grpc_call_log_op(char *file, int line, gpr_log_severity severity, + grpc_call_element *elem, grpc_call_op *op) { + char *str = grpc_call_op_string(op); + gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str); + gpr_free(str); +} diff --git a/vsprojects/vs2010/grpc.vcxproj b/vsprojects/vs2010/grpc.vcxproj index f16ad85e5c7..c92efd8ca61 100644 --- a/vsprojects/vs2010/grpc.vcxproj +++ b/vsprojects/vs2010/grpc.vcxproj @@ -107,7 +107,6 @@ - @@ -229,8 +228,6 @@ - - @@ -247,8 +244,6 @@ - - @@ -423,6 +418,8 @@ + + diff --git a/vsprojects/vs2010/grpc.vcxproj.filters b/vsprojects/vs2010/grpc.vcxproj.filters index 1dfca58cb55..edce8d96526 100644 --- a/vsprojects/vs2010/grpc.vcxproj.filters +++ b/vsprojects/vs2010/grpc.vcxproj.filters @@ -61,9 +61,6 @@ src\core\tsi - - src\core\channel - src\core\channel @@ -88,9 +85,6 @@ src\core\channel - - src\core\channel - src\core\channel @@ -352,6 +346,9 @@ src\core\transport + + src\core\transport + @@ -443,9 +440,6 @@ src\core\channel - - src\core\channel - src\core\channel diff --git a/vsprojects/vs2010/grpc_unsecure.vcxproj b/vsprojects/vs2010/grpc_unsecure.vcxproj index 75b6dce9437..f0b94c09031 100644 --- a/vsprojects/vs2010/grpc_unsecure.vcxproj +++ b/vsprojects/vs2010/grpc_unsecure.vcxproj @@ -91,7 +91,6 @@ - @@ -175,8 +174,6 @@ - - @@ -193,8 +190,6 @@ - - @@ -369,6 +364,8 @@ + + diff --git a/vsprojects/vs2010/grpc_unsecure.vcxproj.filters b/vsprojects/vs2010/grpc_unsecure.vcxproj.filters index 7c94d4d51e3..daca2c0c5a0 100644 --- a/vsprojects/vs2010/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vs2010/grpc_unsecure.vcxproj.filters @@ -4,9 +4,6 @@ src\core\surface - - src\core\channel - src\core\channel @@ -31,9 +28,6 @@ src\core\channel - - src\core\channel - src\core\channel @@ -295,6 +289,9 @@ src\core\transport + + src\core\transport + @@ -338,9 +335,6 @@ src\core\channel - - src\core\channel - src\core\channel diff --git a/vsprojects/vs2013/grpc.vcxproj b/vsprojects/vs2013/grpc.vcxproj index b606f91473c..fd6900e9b88 100644 --- a/vsprojects/vs2013/grpc.vcxproj +++ b/vsprojects/vs2013/grpc.vcxproj @@ -109,7 +109,6 @@ - @@ -231,8 +230,6 @@ - - @@ -249,8 +246,6 @@ - - @@ -425,6 +420,8 @@ + + diff --git a/vsprojects/vs2013/grpc.vcxproj.filters b/vsprojects/vs2013/grpc.vcxproj.filters index 1dfca58cb55..edce8d96526 100644 --- a/vsprojects/vs2013/grpc.vcxproj.filters +++ b/vsprojects/vs2013/grpc.vcxproj.filters @@ -61,9 +61,6 @@ src\core\tsi - - src\core\channel - src\core\channel @@ -88,9 +85,6 @@ src\core\channel - - src\core\channel - src\core\channel @@ -352,6 +346,9 @@ src\core\transport + + src\core\transport + @@ -443,9 +440,6 @@ src\core\channel - - src\core\channel - src\core\channel diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj b/vsprojects/vs2013/grpc_unsecure.vcxproj index b068274af9d..e170d1934e8 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj @@ -93,7 +93,6 @@ - @@ -177,8 +176,6 @@ - - @@ -195,8 +192,6 @@ - - @@ -371,6 +366,8 @@ + + diff --git a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters index 7c94d4d51e3..daca2c0c5a0 100644 --- a/vsprojects/vs2013/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vs2013/grpc_unsecure.vcxproj.filters @@ -4,9 +4,6 @@ src\core\surface - - src\core\channel - src\core\channel @@ -31,9 +28,6 @@ src\core\channel - - src\core\channel - src\core\channel @@ -295,6 +289,9 @@ src\core\transport + + src\core\transport + @@ -338,9 +335,6 @@ src\core\channel - - src\core\channel - src\core\channel