pull/1369/head
Craig Tiller 10 years ago
parent 65582323ad
commit 83f88d90b9
  1. 8
      BUILD
  2. 15
      Makefile
  3. 6
      build.json
  4. 70
      src/core/channel/census_filter.c
  5. 44
      src/core/channel/channel_stack.c
  6. 77
      src/core/channel/channel_stack.h
  7. 18
      src/core/channel/child_channel.c
  8. 93
      src/core/channel/client_channel.c
  9. 261
      src/core/channel/connected_channel.c
  10. 72
      src/core/channel/http_client_filter.c
  11. 137
      src/core/channel/http_filter.c
  12. 43
      src/core/channel/http_filter.h
  13. 116
      src/core/channel/http_server_filter.c
  14. 13
      src/core/channel/noop_filter.c
  15. 178
      src/core/surface/call.c
  16. 1
      src/core/surface/call.h
  17. 4
      src/core/transport/chttp2_transport.c
  18. 28
      src/core/transport/stream_op.h
  19. 21
      src/core/transport/transport.h
  20. 141
      src/core/transport/transport_op_string.c
  21. 7
      vsprojects/vs2010/grpc.vcxproj
  22. 12
      vsprojects/vs2010/grpc.vcxproj.filters
  23. 7
      vsprojects/vs2010/grpc_unsecure.vcxproj
  24. 12
      vsprojects/vs2010/grpc_unsecure.vcxproj.filters
  25. 7
      vsprojects/vs2013/grpc.vcxproj
  26. 12
      vsprojects/vs2013/grpc.vcxproj.filters
  27. 7
      vsprojects/vs2013/grpc_unsecure.vcxproj
  28. 12
      vsprojects/vs2013/grpc_unsecure.vcxproj.filters

@ -145,7 +145,6 @@ cc_library(
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@ -245,7 +244,6 @@ cc_library(
"src/core/tsi/fake_transport_security.c",
"src/core/tsi/ssl_transport_security.c",
"src/core/tsi/transport_security.c",
"src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@ -254,7 +252,6 @@ cc_library(
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@ -342,6 +339,7 @@ cc_library(
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c",
"src/core/transport/transport_op_string.c",
],
hdrs = [
"include/grpc/grpc_security.h",
@ -373,7 +371,6 @@ cc_library(
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@ -454,7 +451,6 @@ cc_library(
"src/core/transport/transport.h",
"src/core/transport/transport_impl.h",
"src/core/surface/init_unsecure.c",
"src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@ -463,7 +459,6 @@ cc_library(
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@ -551,6 +546,7 @@ cc_library(
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c",
"src/core/transport/transport_op_string.c",
],
hdrs = [
"include/grpc/byte_buffer.h",

@ -2603,7 +2603,6 @@ LIBGRPC_SRC = \
src/core/tsi/fake_transport_security.c \
src/core/tsi/ssl_transport_security.c \
src/core/tsi/transport_security.c \
src/core/channel/call_op_string.c \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
@ -2612,7 +2611,6 @@ LIBGRPC_SRC = \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
src/core/channel/http_filter.c \
src/core/channel/http_server_filter.c \
src/core/channel/noop_filter.c \
src/core/compression/algorithm.c \
@ -2700,6 +2698,7 @@ LIBGRPC_SRC = \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
src/core/transport/transport_op_string.c \
PUBLIC_HEADERS_C += \
include/grpc/grpc_security.h \
@ -2750,7 +2749,6 @@ src/core/surface/secure_channel_create.c: $(OPENSSL_DEP)
src/core/tsi/fake_transport_security.c: $(OPENSSL_DEP)
src/core/tsi/ssl_transport_security.c: $(OPENSSL_DEP)
src/core/tsi/transport_security.c: $(OPENSSL_DEP)
src/core/channel/call_op_string.c: $(OPENSSL_DEP)
src/core/channel/census_filter.c: $(OPENSSL_DEP)
src/core/channel/channel_args.c: $(OPENSSL_DEP)
src/core/channel/channel_stack.c: $(OPENSSL_DEP)
@ -2759,7 +2757,6 @@ src/core/channel/client_channel.c: $(OPENSSL_DEP)
src/core/channel/client_setup.c: $(OPENSSL_DEP)
src/core/channel/connected_channel.c: $(OPENSSL_DEP)
src/core/channel/http_client_filter.c: $(OPENSSL_DEP)
src/core/channel/http_filter.c: $(OPENSSL_DEP)
src/core/channel/http_server_filter.c: $(OPENSSL_DEP)
src/core/channel/noop_filter.c: $(OPENSSL_DEP)
src/core/compression/algorithm.c: $(OPENSSL_DEP)
@ -2847,6 +2844,7 @@ src/core/transport/chttp2_transport.c: $(OPENSSL_DEP)
src/core/transport/metadata.c: $(OPENSSL_DEP)
src/core/transport/stream_op.c: $(OPENSSL_DEP)
src/core/transport/transport.c: $(OPENSSL_DEP)
src/core/transport/transport_op_string.c: $(OPENSSL_DEP)
endif
$(LIBDIR)/$(CONFIG)/libgrpc.a: $(ZLIB_DEP) $(OPENSSL_DEP) $(LIBGRPC_OBJS)
@ -2913,7 +2911,6 @@ $(OBJDIR)/$(CONFIG)/src/core/surface/secure_channel_create.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/fake_transport_security.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/ssl_transport_security.o:
$(OBJDIR)/$(CONFIG)/src/core/tsi/transport_security.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o:
@ -2922,7 +2919,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o:
@ -3010,6 +3006,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o:
LIBGRPC_TEST_UTIL_SRC = \
@ -3090,7 +3087,6 @@ $(OBJDIR)/$(CONFIG)/test/core/util/slice_splitter.o:
LIBGRPC_UNSECURE_SRC = \
src/core/surface/init_unsecure.c \
src/core/channel/call_op_string.c \
src/core/channel/census_filter.c \
src/core/channel/channel_args.c \
src/core/channel/channel_stack.c \
@ -3099,7 +3095,6 @@ LIBGRPC_UNSECURE_SRC = \
src/core/channel/client_setup.c \
src/core/channel/connected_channel.c \
src/core/channel/http_client_filter.c \
src/core/channel/http_filter.c \
src/core/channel/http_server_filter.c \
src/core/channel/noop_filter.c \
src/core/compression/algorithm.c \
@ -3187,6 +3182,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/metadata.c \
src/core/transport/stream_op.c \
src/core/transport/transport.c \
src/core/transport/transport_op_string.c \
PUBLIC_HEADERS_C += \
include/grpc/byte_buffer.h \
@ -3231,7 +3227,6 @@ ifneq ($(NO_DEPS),true)
endif
$(OBJDIR)/$(CONFIG)/src/core/surface/init_unsecure.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/call_op_string.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/census_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_args.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/channel_stack.o:
@ -3240,7 +3235,6 @@ $(OBJDIR)/$(CONFIG)/src/core/channel/client_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/client_setup.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/connected_channel.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_client_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/http_server_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/channel/noop_filter.o:
$(OBJDIR)/$(CONFIG)/src/core/compression/algorithm.o:
@ -3328,6 +3322,7 @@ $(OBJDIR)/$(CONFIG)/src/core/transport/chttp2_transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/metadata.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/stream_op.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport.o:
$(OBJDIR)/$(CONFIG)/src/core/transport/transport_op_string.o:
LIBGRPC++_SRC = \

@ -96,7 +96,6 @@
"src/core/channel/client_setup.h",
"src/core/channel/connected_channel.h",
"src/core/channel/http_client_filter.h",
"src/core/channel/http_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
@ -178,7 +177,6 @@
"src/core/transport/transport_impl.h"
],
"src": [
"src/core/channel/call_op_string.c",
"src/core/channel/census_filter.c",
"src/core/channel/channel_args.c",
"src/core/channel/channel_stack.c",
@ -187,7 +185,6 @@
"src/core/channel/client_setup.c",
"src/core/channel/connected_channel.c",
"src/core/channel/http_client_filter.c",
"src/core/channel/http_filter.c",
"src/core/channel/http_server_filter.c",
"src/core/channel/noop_filter.c",
"src/core/compression/algorithm.c",
@ -274,7 +271,8 @@
"src/core/transport/chttp2_transport.c",
"src/core/transport/metadata.c",
"src/core/transport/stream_op.c",
"src/core/transport/transport.c"
"src/core/transport/transport.c",
"src/core/transport/transport_op_string.c"
]
}
],

@ -49,6 +49,11 @@ typedef struct call_data {
census_op_id op_id;
census_rpc_stats stats;
gpr_timespec start_ts;
/* recv callback */
grpc_stream_op_buffer *recv_ops;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
} call_data;
typedef struct channel_data {
@ -60,55 +65,58 @@ static void init_rpc_stats(census_rpc_stats* stats) {
stats->cnt = 1;
}
static void extract_and_annotate_method_tag(grpc_call_op* op, call_data* calld,
static void extract_and_annotate_method_tag(grpc_stream_op_buffer* sopb, call_data* calld,
channel_data* chand) {
grpc_linked_mdelem* m;
for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
if (m->md->key == chand->path_str) {
gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
census_add_method_tag(
calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
size_t i;
for (i = 0; i < sopb->nops; i++) {
grpc_stream_op * op = &sopb->ops[i];
if (op->type != GRPC_OP_METADATA) continue;
for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
if (m->md->key == chand->path_str) {
gpr_log(GPR_DEBUG, "%s", (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
census_add_method_tag(
calld->op_id, (const char*)GPR_SLICE_START_PTR(m->md->value->slice));
}
}
}
}
static void client_call_op(grpc_call_element* elem,
grpc_call_element* from_elem, grpc_call_op* op) {
static void client_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
switch (op->type) {
case GRPC_SEND_METADATA:
extract_and_annotate_method_tag(op, calld, chand);
break;
case GRPC_RECV_FINISH:
/* Should we stop timing the rpc here? */
break;
default:
break;
if (op->send_ops) {
extract_and_annotate_method_tag(op->send_ops, calld, chand);
}
/* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
}
static void server_call_op(grpc_call_element* elem,
grpc_call_element* from_elem, grpc_call_op* op) {
static void server_on_done_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
if (success) {
extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
}
calld->on_done_recv(calld->recv_user_data, success);
}
static void server_start_transport_op(grpc_call_element* elem, grpc_transport_op* op) {
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
GPR_ASSERT(calld != NULL);
GPR_ASSERT(chand != NULL);
GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
switch (op->type) {
case GRPC_RECV_METADATA:
extract_and_annotate_method_tag(op, calld, chand);
break;
case GRPC_SEND_FINISH:
/* Should we stop timing the rpc here? */
break;
default:
break;
if (op->recv_ops) {
/* substitute our callback for the op callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
calld->recv_user_data = op->recv_user_data;
op->on_done_recv = server_on_done_recv;
op->recv_user_data = elem;
}
/* Always pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
@ -180,11 +188,11 @@ static void destroy_channel_elem(grpc_channel_element* elem) {
}
const grpc_channel_filter grpc_client_census_filter = {
client_call_op, channel_op, sizeof(call_data), client_init_call_elem,
client_start_transport_op, channel_op, sizeof(call_data), client_init_call_elem,
client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
server_call_op, channel_op, sizeof(call_data), server_init_call_elem,
server_start_transport_op, channel_op, sizeof(call_data), server_init_call_elem,
server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "census-server"};

@ -35,6 +35,7 @@
#include <grpc/support/log.h>
#include <stdlib.h>
#include <string.h>
int grpc_trace_channel = 0;
@ -181,12 +182,9 @@ void grpc_call_stack_destroy(grpc_call_stack *stack) {
}
}
void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op) {
grpc_call_element *next_elem = elem + op->dir;
if (op->type == GRPC_SEND_METADATA || op->type == GRPC_RECV_METADATA) {
grpc_metadata_batch_assert_ok(&op->data.metadata);
}
next_elem->filter->call_op(next_elem, elem, op);
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op) {
grpc_call_element *next_elem = elem + 1;
next_elem->filter->start_transport_op(next_elem, op);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op) {
@ -205,39 +203,15 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
sizeof(grpc_call_stack)));
}
static void do_nothing(void *user_data, grpc_op_error error) {}
void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
grpc_call_op cancel_op;
cancel_op.type = GRPC_CANCEL_OP;
cancel_op.dir = GRPC_CALL_DOWN;
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
cancel_op.flags = 0;
cancel_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &cancel_op);
}
void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
grpc_call_op finish_op;
finish_op.type = GRPC_SEND_FINISH;
finish_op.dir = GRPC_CALL_DOWN;
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
finish_op.flags = 0;
finish_op.bind_pollset = NULL;
grpc_call_next_op(cur_elem, &finish_op);
grpc_transport_op op;
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
grpc_call_next_op(cur_elem, &op);
}
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message) {
grpc_call_op op;
op.type = GRPC_RECV_SYNTHETIC_STATUS;
op.dir = GRPC_CALL_UP;
op.done_cb = do_nothing;
op.user_data = NULL;
op.data.synthetic_status.status = status;
op.data.synthetic_status.message = message;
grpc_call_next_op(cur_elem, &op);
abort();
}

@ -51,78 +51,11 @@
typedef struct grpc_channel_element grpc_channel_element;
typedef struct grpc_call_element grpc_call_element;
/* Call operations - things that can be sent and received.
Threading:
SEND, RECV, and CANCEL ops can be active on a call at the same time, but
only one SEND, one RECV, and one CANCEL can be active at a time.
If state is shared between send/receive/cancel operations, it is up to
filters to provide their own protection around that. */
typedef enum {
/* send metadata to the channels peer */
GRPC_SEND_METADATA,
/* send a message to the channels peer */
GRPC_SEND_MESSAGE,
/* send a pre-formatted message to the channels peer */
GRPC_SEND_PREFORMATTED_MESSAGE,
/* send half-close to the channels peer */
GRPC_SEND_FINISH,
/* request that more data be allowed through flow control */
GRPC_REQUEST_DATA,
/* metadata was received from the channels peer */
GRPC_RECV_METADATA,
/* a message was received from the channels peer */
GRPC_RECV_MESSAGE,
/* half-close was received from the channels peer */
GRPC_RECV_HALF_CLOSE,
/* full close was received from the channels peer */
GRPC_RECV_FINISH,
/* a status has been sythesized locally */
GRPC_RECV_SYNTHETIC_STATUS,
/* the call has been abnormally terminated */
GRPC_CANCEL_OP
} grpc_call_op_type;
/* The direction of the call.
The values of the enums (1, -1) matter here - they are used to increment
or decrement a pointer to find the next element to call */
typedef enum { GRPC_CALL_DOWN = 1, GRPC_CALL_UP = -1 } grpc_call_dir;
/* A single filterable operation to be performed on a call */
typedef struct {
/* The type of operation we're performing */
grpc_call_op_type type;
/* The directionality of this call - does the operation begin at the bottom
of the stack and flow up, or does the operation start at the top of the
stack and flow down through the filters. */
grpc_call_dir dir;
/* Flags associated with this call: see GRPC_WRITE_* in grpc.h */
gpr_uint32 flags;
/* Argument data, matching up with grpc_call_op_type names */
union {
grpc_byte_buffer *message;
grpc_metadata_batch metadata;
struct {
grpc_status_code status;
const char *message;
} synthetic_status;
} data;
grpc_pollset *bind_pollset;
/* Must be called when processing of this call-op is complete.
Signature chosen to match transport flow control callbacks */
void (*done_cb)(void *user_data, grpc_op_error error);
/* User data to be passed into done_cb */
void *user_data;
} grpc_call_op;
/* returns a string representation of op, that can be destroyed with gpr_free */
char *grpc_call_op_string(grpc_call_op *op);
typedef enum {
/* send a goaway message to remote channels indicating that we are going
to disconnect in the future */
@ -170,8 +103,7 @@ typedef struct {
typedef struct {
/* Called to eg. send/receive data on a call.
See grpc_call_next_op on how to call the next element in the stack */
void (*call_op)(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op);
void (*start_transport_op)(grpc_call_element *elem, grpc_transport_op *op);
/* Called to handle channel level operations - e.g. new calls, or transport
closure.
See grpc_channel_next_op on how to call the next element in the stack */
@ -272,8 +204,8 @@ void grpc_call_stack_init(grpc_channel_stack *channel_stack,
/* Destroy a call stack */
void grpc_call_stack_destroy(grpc_call_stack *stack);
/* Call the next operation (depending on call directionality) in a call stack */
void grpc_call_next_op(grpc_call_element *elem, grpc_call_op *op);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_call_element *elem, grpc_transport_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_channel_op *op);
@ -285,10 +217,9 @@ grpc_channel_stack *grpc_channel_stack_from_top_element(
grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem);
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_call_op *op);
grpc_call_element *elem, grpc_transport_op *op);
void grpc_call_element_send_cancel(grpc_call_element *cur_elem);
void grpc_call_element_send_finish(grpc_call_element *cur_elem);
void grpc_call_element_recv_status(grpc_call_element *cur_elem,
grpc_status_code status,
const char *message);

@ -61,22 +61,11 @@ typedef struct {
} lb_channel_data;
typedef struct {
grpc_call_element *back;
grpc_child_channel *channel;
} lb_call_data;
static void lb_call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
lb_call_data *calld = elem->call_data;
switch (op->dir) {
case GRPC_CALL_UP:
calld->back->filter->call_op(calld->back, elem, op);
break;
case GRPC_CALL_DOWN:
grpc_call_next_op(elem, op);
break;
}
static void lb_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
grpc_call_next_op(elem, op);
}
/* Currently we assume all channel operations should just be pushed up. */
@ -165,7 +154,7 @@ static void lb_destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_child_channel_top_filter = {
lb_call_op, lb_channel_op, sizeof(lb_call_data),
lb_start_transport_op, lb_channel_op, sizeof(lb_call_data),
lb_init_call_elem, lb_destroy_call_elem, sizeof(lb_channel_data),
lb_init_channel_elem, lb_destroy_channel_elem, "child-channel", };
@ -282,7 +271,6 @@ grpc_child_call *grpc_child_channel_create_call(grpc_child_channel *channel,
lbelem = LINK_BACK_ELEM_FROM_CALL(stk);
lbchand = lbelem->channel_data;
lbcalld = lbelem->call_data;
lbcalld->back = parent;
lbcalld->channel = channel;
gpr_mu_lock(&lbchand->mu);

@ -82,7 +82,7 @@ struct call_data {
/* owning element */
grpc_call_element *elem;
gpr_uint8 got_first_send;
gpr_uint8 got_first_op;
call_state state;
gpr_timespec deadline;
@ -91,7 +91,7 @@ struct call_data {
/* our child call stack */
grpc_child_call *child_call;
} active;
grpc_call_op waiting_op;
grpc_transport_op waiting_op;
} s;
};
@ -110,9 +110,7 @@ static int prepare_activate(grpc_call_element *elem,
return 1;
}
static void do_nothing(void *ignored, grpc_op_error error) {}
static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
static void complete_activate(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
@ -121,17 +119,16 @@ static void complete_activate(grpc_call_element *elem, grpc_call_op *op) {
/* continue the start call down the stack, this nees to happen after metadata
are flushed*/
child_elem->filter->call_op(child_elem, elem, op);
child_elem->filter->start_transport_op(child_elem, op);
}
static void start_rpc(grpc_call_element *elem, grpc_call_op *op) {
static void start_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&chand->mu);
grpc_metadata_batch_destroy(&op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_ERROR);
grpc_transport_op_finish_with_failure(op);
return;
}
GPR_ASSERT(calld->state == CALL_CREATED);
@ -187,19 +184,10 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
}
static void send_up_cancelled_ops(grpc_call_element *elem) {
grpc_call_op finish_op;
/* send up a synthesized status */
grpc_call_element_recv_status(elem, GRPC_STATUS_CANCELLED, "Cancelled");
/* send up a finish */
finish_op.type = GRPC_RECV_FINISH;
finish_op.dir = GRPC_CALL_UP;
finish_op.flags = 0;
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
grpc_call_next_op(elem, &finish_op);
abort();
}
static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
static void cancel_rpc(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_call_element *child_elem;
@ -209,15 +197,13 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
case CALL_ACTIVE:
child_elem = grpc_child_call_get_top_element(calld->s.active.child_call);
gpr_mu_unlock(&chand->mu);
child_elem->filter->call_op(child_elem, elem, op);
child_elem->filter->start_transport_op(child_elem, op);
return; /* early out */
case CALL_WAITING:
grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
remove_waiting_child(chand, calld);
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu);
send_up_cancelled_ops(elem);
calld->s.waiting_op.done_cb(calld->s.waiting_op.user_data, GRPC_OP_ERROR);
return; /* early out */
case CALL_CREATED:
calld->state = CALL_CANCELLED;
@ -232,40 +218,27 @@ static void cancel_rpc(grpc_call_element *elem, grpc_call_op *op) {
abort();
}
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
static void cc_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
call_data *calld = elem->call_data;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_METADATA:
if (!calld->got_first_send) {
/* filter out the start event to find which child to send on */
calld->got_first_send = 1;
start_rpc(elem, op);
} else {
grpc_call_next_op(elem, op);
}
break;
case GRPC_CANCEL_OP:
cancel_rpc(elem, op);
break;
case GRPC_SEND_MESSAGE:
case GRPC_SEND_FINISH:
case GRPC_REQUEST_DATA:
if (calld->state == CALL_ACTIVE) {
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
child_elem->filter->call_op(child_elem, elem, op);
} else {
op->done_cb(op->user_data, GRPC_OP_ERROR);
}
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_UP);
grpc_call_next_op(elem, op);
break;
if (op->cancel_with_status != GRPC_STATUS_OK) {
GPR_ASSERT(op->send_ops == NULL);
GPR_ASSERT(op->recv_ops == NULL);
cancel_rpc(elem, op);
return;
}
if (!calld->got_first_op) {
calld->got_first_op = 1;
start_rpc(elem, op);
} else {
grpc_call_element *child_elem =
grpc_child_call_get_top_element(calld->s.active.child_call);
child_elem->filter->start_transport_op(child_elem, op);
}
}
@ -359,7 +332,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->elem = elem;
calld->state = CALL_CREATED;
calld->deadline = gpr_inf_future;
calld->got_first_send = 0;
calld->got_first_op = 0;
}
/* Destructor for call_data */
@ -372,9 +345,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (calld->state == CALL_ACTIVE) {
grpc_child_call_destroy(calld->s.active.child_call);
}
if (calld->state == CALL_WAITING) {
grpc_metadata_batch_destroy(&calld->s.waiting_op.data.metadata);
}
GPR_ASSERT(calld->state != CALL_WAITING);
}
/* Constructor for channel_data */
@ -417,7 +388,7 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_channel_filter = {
call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
cc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"client-channel",
};
@ -436,7 +407,7 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
call_data **waiting_children;
size_t waiting_child_count;
size_t i;
grpc_call_op *call_ops;
grpc_transport_op *call_ops;
/* build the child filter stack */
child_filters = gpr_malloc(sizeof(grpc_channel_filter *) * num_child_filters);
@ -472,13 +443,13 @@ grpc_transport_setup_result grpc_client_channel_transport_setup_complete(
chand->waiting_child_count = 0;
chand->waiting_child_capacity = 0;
call_ops = gpr_malloc(sizeof(grpc_call_op) * waiting_child_count);
call_ops = gpr_malloc(sizeof(*call_ops) * waiting_child_count);
for (i = 0; i < waiting_child_count; i++) {
call_ops[i] = waiting_children[i]->s.waiting_op;
if (!prepare_activate(waiting_children[i]->elem, chand->active_child)) {
waiting_children[i] = NULL;
call_ops[i].done_cb(call_ops[i].user_data, GRPC_OP_ERROR);
grpc_transport_op_finish_with_failure(&call_ops[i]);
}
}

@ -50,19 +50,10 @@
typedef struct connected_channel_channel_data {
grpc_transport *transport;
gpr_uint32 max_message_length;
} channel_data;
typedef struct connected_channel_call_data {
grpc_call_element *elem;
grpc_stream_op_buffer outgoing_sopb;
gpr_uint32 max_message_length;
gpr_uint32 incoming_message_length;
gpr_uint8 reading_message;
gpr_uint8 got_read_close;
gpr_slice_buffer incoming_message;
gpr_uint32 outgoing_buffer_length_estimate;
void *unused;
} call_data;
/* We perform a small hack to locate transport data alongside the connected
@ -72,6 +63,7 @@ typedef struct connected_channel_call_data {
#define CALL_DATA_FROM_TRANSPORT_STREAM(transport_stream) \
(((call_data *)(transport_stream)) - 1)
#if 0
/* Copy the contents of a byte buffer into stream ops */
static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
grpc_stream_op_buffer *sopb) {
@ -87,76 +79,17 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
break;
}
}
/* Flush queued stream operations onto the transport */
static void end_bufferable_op(grpc_call_op *op, channel_data *chand,
call_data *calld, int is_last) {
size_t nops;
if (op->flags & GRPC_WRITE_BUFFER_HINT) {
if (calld->outgoing_buffer_length_estimate < MAX_BUFFER_LENGTH) {
op->done_cb(op->user_data, GRPC_OP_OK);
return;
}
}
calld->outgoing_buffer_length_estimate = 0;
grpc_sopb_add_flow_ctl_cb(&calld->outgoing_sopb, op->done_cb, op->user_data);
nops = calld->outgoing_sopb.nops;
calld->outgoing_sopb.nops = 0;
grpc_transport_send_batch(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
calld->outgoing_sopb.ops, nops, is_last);
}
#endif
/* Intercept a call operation and either push it directly up or translate it
into transport stream operations */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
static void con_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
if (op->bind_pollset) {
grpc_transport_add_to_pollset(chand->transport, op->bind_pollset);
}
switch (op->type) {
case GRPC_SEND_METADATA:
grpc_sopb_add_metadata(&calld->outgoing_sopb, op->data.metadata);
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_MESSAGE:
grpc_sopb_add_begin_message(&calld->outgoing_sopb,
grpc_byte_buffer_length(op->data.message),
op->flags);
/* fall-through */
case GRPC_SEND_PREFORMATTED_MESSAGE:
copy_byte_buffer_to_stream_ops(op->data.message, &calld->outgoing_sopb);
calld->outgoing_buffer_length_estimate +=
(5 + grpc_byte_buffer_length(op->data.message));
end_bufferable_op(op, chand, calld, 0);
break;
case GRPC_SEND_FINISH:
end_bufferable_op(op, chand, calld, 1);
break;
case GRPC_REQUEST_DATA:
/* re-arm window updates if they were disarmed by finish_message */
grpc_transport_set_allow_window_updates(
chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 1);
break;
case GRPC_CANCEL_OP:
grpc_transport_abort_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
GRPC_STATUS_CANCELLED);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_UP);
grpc_call_next_op(elem, op);
break;
}
grpc_transport_perform_op(chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
}
/* Currently we assume all channel operations should just be pushed up. */
@ -188,14 +121,6 @@ static void init_call_elem(grpc_call_element *elem,
int r;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
calld->elem = elem;
grpc_sopb_init(&calld->outgoing_sopb);
calld->reading_message = 0;
calld->got_read_close = 0;
calld->outgoing_buffer_length_estimate = 0;
calld->max_message_length = chand->max_message_length;
gpr_slice_buffer_init(&calld->incoming_message);
r = grpc_transport_init_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
server_transport_data);
@ -207,8 +132,6 @@ static void destroy_call_elem(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
grpc_sopb_destroy(&calld->outgoing_sopb);
gpr_slice_buffer_destroy(&calld->incoming_message);
grpc_transport_destroy_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld));
}
@ -218,12 +141,12 @@ static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
size_t i;
GPR_ASSERT(!is_first);
GPR_ASSERT(is_last);
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
cd->transport = NULL;
#if 0
cd->max_message_length = DEFAULT_MAX_MESSAGE_LENGTH;
if (args) {
for (i = 0; i < args->num_args; i++) {
@ -240,6 +163,7 @@ static void init_channel_elem(grpc_channel_element *elem,
}
}
}
#endif
}
/* Destructor for channel_data */
@ -250,15 +174,10 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_connected_channel_filter = {
call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
con_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "connected",
};
static gpr_slice alloc_recv_buffer(void *user_data, grpc_transport *transport,
grpc_stream *stream, size_t size_hint) {
return gpr_slice_malloc(size_hint);
}
/* Transport callback to accept a new stream... calls up to handle it */
static void accept_stream(void *user_data, grpc_transport *transport,
const void *transport_server_data) {
@ -276,168 +195,6 @@ static void accept_stream(void *user_data, grpc_transport *transport,
channel_op(elem, NULL, &op);
}
static void recv_error(channel_data *chand, call_data *calld, int line,
const char *message) {
gpr_log_message(__FILE__, line, GPR_LOG_SEVERITY_ERROR, message);
if (chand->transport) {
grpc_transport_abort_stream(chand->transport,
TRANSPORT_STREAM_FROM_CALL_DATA(calld),
GRPC_STATUS_INVALID_ARGUMENT);
}
}
static void do_nothing(void *calldata, grpc_op_error error) {}
static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_element *elem = calld->elem;
grpc_call_op call_op;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
/* if we got all the bytes for this message, call up the stack */
call_op.type = GRPC_RECV_MESSAGE;
call_op.done_cb = do_nothing;
/* TODO(ctiller): this could be a lot faster if coded directly */
call_op.data.message = grpc_byte_buffer_create(calld->incoming_message.slices,
calld->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&calld->incoming_message);
/* disable window updates until we get a request more from above */
grpc_transport_set_allow_window_updates(
chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), 0);
GPR_ASSERT(calld->incoming_message.count == 0);
calld->reading_message = 0;
grpc_call_next_op(elem, &call_op);
}
static void got_metadata(grpc_call_element *elem,
grpc_metadata_batch metadata) {
grpc_call_op op;
op.type = GRPC_RECV_METADATA;
op.dir = GRPC_CALL_UP;
op.flags = 0;
op.data.metadata = metadata;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_next_op(elem, &op);
}
/* Handle incoming stream ops from the transport, translating them into
call_ops to pass up the call stack */
static void recv_batch(void *user_data, grpc_transport *transport,
grpc_stream *stream, grpc_stream_op *ops,
size_t ops_count, grpc_stream_state final_state) {
call_data *calld = CALL_DATA_FROM_TRANSPORT_STREAM(stream);
grpc_call_element *elem = calld->elem;
channel_data *chand = elem->channel_data;
grpc_stream_op *stream_op;
grpc_call_op call_op;
size_t i;
gpr_uint32 length;
GPR_ASSERT(elem->filter == &grpc_connected_channel_filter);
for (i = 0; i < ops_count; i++) {
stream_op = ops + i;
switch (stream_op->type) {
case GRPC_OP_FLOW_CTL_CB:
stream_op->data.flow_ctl_cb.cb(stream_op->data.flow_ctl_cb.arg, 1);
break;
case GRPC_NO_OP:
break;
case GRPC_OP_METADATA:
got_metadata(elem, stream_op->data.metadata);
break;
case GRPC_OP_BEGIN_MESSAGE:
/* can't begin a message when we're still reading a message */
if (calld->reading_message) {
char *message = NULL;
gpr_asprintf(&message,
"Message terminated early; read %d bytes, expected %d",
(int)calld->incoming_message.length,
(int)calld->incoming_message_length);
recv_error(chand, calld, __LINE__, message);
gpr_free(message);
return;
}
/* stash away parameters, and prepare for incoming slices */
length = stream_op->data.begin_message.length;
if (length > calld->max_message_length) {
char *message = NULL;
gpr_asprintf(
&message,
"Maximum message length of %d exceeded by a message of length %d",
calld->max_message_length, length);
recv_error(chand, calld, __LINE__, message);
gpr_free(message);
} else if (length > 0) {
calld->reading_message = 1;
calld->incoming_message_length = length;
} else {
finish_message(chand, calld);
}
break;
case GRPC_OP_SLICE:
if (GPR_SLICE_LENGTH(stream_op->data.slice) == 0) {
gpr_slice_unref(stream_op->data.slice);
break;
}
/* we have to be reading a message to know what to do here */
if (!calld->reading_message) {
recv_error(chand, calld, __LINE__,
"Received payload data while not reading a message");
return;
}
/* append the slice to the incoming buffer */
gpr_slice_buffer_add(&calld->incoming_message, stream_op->data.slice);
if (calld->incoming_message.length > calld->incoming_message_length) {
/* if we got too many bytes, complain */
char *message = NULL;
gpr_asprintf(&message,
"Receiving message overflow; read %d bytes, expected %d",
(int)calld->incoming_message.length,
(int)calld->incoming_message_length);
recv_error(chand, calld, __LINE__, message);
gpr_free(message);
return;
} else if (calld->incoming_message.length ==
calld->incoming_message_length) {
finish_message(chand, calld);
}
}
}
/* if the stream closed, then call up the stack to let it know */
if (!calld->got_read_close && (final_state == GRPC_STREAM_RECV_CLOSED ||
final_state == GRPC_STREAM_CLOSED)) {
calld->got_read_close = 1;
if (calld->reading_message) {
char *message = NULL;
gpr_asprintf(&message,
"Last message truncated; read %d bytes, expected %d",
(int)calld->incoming_message.length,
(int)calld->incoming_message_length);
recv_error(chand, calld, __LINE__, message);
gpr_free(message);
}
call_op.type = GRPC_RECV_HALF_CLOSE;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
call_op.done_cb = do_nothing;
call_op.user_data = NULL;
grpc_call_next_op(elem, &call_op);
}
if (final_state == GRPC_STREAM_CLOSED) {
call_op.type = GRPC_RECV_FINISH;
call_op.dir = GRPC_CALL_UP;
call_op.flags = 0;
call_op.done_cb = do_nothing;
call_op.user_data = NULL;
grpc_call_next_op(elem, &call_op);
}
}
static void transport_goaway(void *user_data, grpc_transport *transport,
grpc_status_code status, gpr_slice debug) {
/* transport got goaway ==> call up and handle it */
@ -470,7 +227,7 @@ static void transport_closed(void *user_data, grpc_transport *transport) {
}
const grpc_transport_callbacks connected_channel_transport_callbacks = {
alloc_recv_buffer, accept_stream, recv_batch,
accept_stream,
transport_goaway, transport_closed,
};

@ -39,6 +39,12 @@ typedef struct call_data {
grpc_linked_mdelem scheme;
grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type;
int sent_initial_metadata;
int got_initial_metadata;
grpc_stream_op_buffer *recv_ops;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
} call_data;
typedef struct channel_data {
@ -64,22 +70,39 @@ static grpc_mdelem *client_filter(void *user_data, grpc_mdelem *md) {
return md;
}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
static void hc_on_recv(void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
size_t i;
size_t nops = calld->recv_ops->nops;
grpc_stream_op *ops = calld->recv_ops->ops;
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->got_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
}
}
calld->on_done_recv(calld->recv_user_data, success);
}
static void hc_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_METADATA:
if (op->send_ops && !calld->sent_initial_metadata) {
size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops;
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->sent_initial_metadata = 1;
/* Send : prefixed headers, which have to be before any application
* layer headers. */
layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
grpc_mdelem_ref(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme,
@ -88,17 +111,20 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_mdelem_ref(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type,
grpc_mdelem_ref(channeld->content_type));
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
grpc_metadata_batch_filter(&op->data.metadata, client_filter, elem);
grpc_call_next_op(elem, op);
break;
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
break;
}
}
if (op->recv_ops && !calld->got_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
calld->recv_user_data = op->recv_user_data;
op->on_done_recv = hc_on_recv;
op->recv_user_data = elem;
}
grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@ -120,7 +146,11 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {}
const void *server_transport_data) {
call_data *calld = elem->call_data;
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
@ -181,6 +211,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_client_filter = {
call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
hc_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-client"};

@ -1,137 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/http_filter.h"
#include <grpc/support/log.h>
typedef struct call_data {
int unused; /* C89 requires at least one struct element */
} call_data;
typedef struct channel_data {
int unused; /* C89 requires at least one struct element */
} channel_data;
/* used to silence 'variable not used' warnings */
static void ignore_unused(void *ignored) {}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
ignore_unused(calld);
ignore_unused(channeld);
switch (op->type) {
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
break;
}
}
/* Called on special channel events, such as disconnection or new incoming
calls on the server */
static void channel_op(grpc_channel_element *elem,
grpc_channel_element *from_elem, grpc_channel_op *op) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
ignore_unused(channeld);
switch (op->type) {
default:
/* pass control up or down the stack depending on op->dir */
grpc_channel_next_op(elem, op);
break;
}
}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
/* initialize members */
calld->unused = channeld->unused;
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_call_element *elem) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
ignore_unused(calld);
ignore_unused(channeld);
}
/* Constructor for channel_data */
static void init_channel_elem(grpc_channel_element *elem,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
/* The first and the last filters tend to be implemented differently to
handle the case that there's no 'next' filter to call on the up or down
path */
GPR_ASSERT(!is_first);
GPR_ASSERT(!is_last);
/* initialize members */
channeld->unused = 0;
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_channel_element *elem) {
/* grab pointers to our data from the channel element */
channel_data *channeld = elem->channel_data;
ignore_unused(channeld);
}
const grpc_channel_filter grpc_http_filter = {
call_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "http"};

@ -1,43 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
#define GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H
#include "src/core/channel/channel_stack.h"
/* Processes metadata that is common to both client and server for HTTP2
transports. */
extern const grpc_channel_filter grpc_http_filter;
#endif /* GRPC_INTERNAL_CORE_CHANNEL_HTTP_FILTER_H */

@ -52,6 +52,10 @@ typedef struct call_data {
gpr_uint8 seen_scheme;
gpr_uint8 seen_te_trailers;
grpc_linked_mdelem status;
grpc_stream_op_buffer *recv_ops;
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
} call_data;
typedef struct channel_data {
@ -143,63 +147,73 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
/* Called either:
- in response to an API call (or similar) from above, to send something
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
static void hs_on_recv(void *user_data, int success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
size_t i;
size_t nops = calld->recv_ops->nops;
grpc_stream_op *ops = calld->recv_ops->ops;
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->got_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
calld->seen_path) {
/* do nothing */
} else {
if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
if (!calld->seen_scheme) {
gpr_log(GPR_ERROR, "Missing :scheme header");
}
if (!calld->seen_te_trailers) {
gpr_log(GPR_ERROR, "Missing te trailers header");
}
/* Error this call out */
success = 0;
grpc_call_element_send_cancel(elem);
}
}
}
calld->on_done_recv(calld->recv_user_data, success);
}
static void hs_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
size_t i;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
grpc_metadata_batch_filter(&op->data.metadata, server_filter, elem);
if (!calld->got_initial_metadata) {
calld->got_initial_metadata = 1;
/* Have we seen the required http2 transport headers?
(:method, :scheme, content-type, with :path and :authority covered
at the channel level right now) */
if (calld->seen_post && calld->seen_scheme && calld->seen_te_trailers &&
calld->seen_path) {
grpc_call_next_op(elem, op);
} else {
if (!calld->seen_post) {
gpr_log(GPR_ERROR, "Missing :method header");
}
if (!calld->seen_scheme) {
gpr_log(GPR_ERROR, "Missing :scheme header");
}
if (!calld->seen_te_trailers) {
gpr_log(GPR_ERROR, "Missing te trailers header");
}
/* Error this call out */
grpc_metadata_batch_destroy(&op->data.metadata);
op->done_cb(op->user_data, GRPC_OP_OK);
grpc_call_element_send_cancel(elem);
}
} else {
grpc_call_next_op(elem, op);
}
break;
case GRPC_SEND_METADATA:
/* If we haven't sent status 200 yet, we need to so so because it needs to
come before any non : prefixed metadata. */
if (!calld->sent_status) {
calld->sent_status = 1;
grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
grpc_mdelem_ref(channeld->status_ok));
}
grpc_call_next_op(elem, op);
break;
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
if (op->send_ops && !calld->sent_status) {
size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops;
for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->sent_status = 1;
grpc_metadata_batch_add_head(&op->data.metadata, &calld->status,
grpc_mdelem_ref(channeld->status_ok));
break;
}
}
if (op->recv_ops && !calld->got_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_ops = op->recv_ops;
calld->on_done_recv = op->on_done_recv;
calld->recv_user_data = op->recv_user_data;
op->on_done_recv = hs_on_recv;
op->recv_user_data = elem;
}
grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@ -321,6 +335,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_http_server_filter = {
call_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
hs_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem,
"http-server"};

@ -50,8 +50,7 @@ static void ignore_unused(void *ignored) {}
- a network event (or similar) from below, to receive something
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
static void noop_start_transport_op(grpc_call_element *elem, grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -59,12 +58,8 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
ignore_unused(calld);
ignore_unused(channeld);
switch (op->type) {
default:
/* pass control up or down the stack depending on op->dir */
grpc_call_next_op(elem, op);
break;
}
/* pass control down the stack */
grpc_call_next_op(elem, op);
}
/* Called on special channel events, such as disconnection or new incoming
@ -131,6 +126,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_no_op_filter = {
call_op, channel_op, sizeof(call_data),
noop_start_transport_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "no-op"};

@ -150,6 +150,9 @@ struct grpc_call {
gpr_uint8 num_completed_requests;
/* flag that we need to request more data */
gpr_uint8 need_more_data;
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint8 last_send_contains;
/* Active ioreqs.
request_set and request_data contain one element per active ioreq
@ -214,6 +217,10 @@ struct grpc_call {
size_t send_initial_metadata_count;
gpr_timespec send_deadline;
grpc_stream_op_buffer send_ops;
grpc_stream_op_buffer recv_ops;
grpc_stream_state recv_state;
/* Data that the legacy api needs to track. To be deleted at some point
soon */
legacy_state *legacy_state;
@ -234,9 +241,11 @@ struct grpc_call {
} while (0)
static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static send_action choose_send_action(grpc_call *call);
static void enact_send_action(grpc_call *call, send_action sa);
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
static int fill_send_ops(grpc_call *call, grpc_transport_op *op);
static void execute_op(grpc_call *call, grpc_transport_op *op);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
const void *server_transport_data,
@ -359,20 +368,6 @@ static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
return GRPC_CALL_OK;
}
static void request_more_data(grpc_call *call) {
grpc_call_op op;
/* call down */
op.type = GRPC_REQUEST_DATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.done_cb = do_nothing;
op.user_data = NULL;
op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
}
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@ -384,16 +379,31 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static void unlock(grpc_call *call) {
send_action sa = SEND_NOTHING;
grpc_transport_op op;
completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
int completing_requests = 0;
int need_more_data =
call->need_more_data &&
(call->write_state >= WRITE_STATE_STARTED || !call->is_client);
int start_op = 0;
int i;
if (need_more_data) {
memset(&op, 0, sizeof(op));
if (call->need_more_data &&
(call->write_state >= WRITE_STATE_STARTED || !call->is_client)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = call_on_done_recv;
op.recv_user_data = call;
call->need_more_data = 0;
grpc_call_internal_ref(call);
start_op = 1;
}
if (!call->sending) {
if (fill_send_ops(call, &op)) {
call->sending = 1;
grpc_call_internal_ref(call);
start_op = 1;
}
}
if (!call->completing && call->num_completed_requests != 0) {
@ -405,22 +415,10 @@ static void unlock(grpc_call *call) {
grpc_call_internal_ref(call);
}
if (!call->sending) {
sa = choose_send_action(call);
if (sa != SEND_NOTHING) {
call->sending = 1;
grpc_call_internal_ref(call);
}
}
gpr_mu_unlock(&call->mu);
if (need_more_data) {
request_more_data(call);
}
if (sa != SEND_NOTHING) {
enact_send_action(call, sa);
if (start_op) {
execute_op(call, &op);
}
if (completing_requests > 0) {
@ -577,6 +575,64 @@ static void finish_start_step(void *pc, grpc_op_error error) {
error);
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
grpc_metadata *metadata) {
size_t i;
grpc_mdelem_list out;
if (count == 0) {
out.head = out.tail = NULL;
return out;
}
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
md->value_length);
l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
}
out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
return out;
}
static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
grpc_ioreq_data data;
grpc_metadata_batch mdb;
size_t i;
GPR_ASSERT(op->send_ops == NULL);
switch (call->write_state) {
case WRITE_STATE_INITIAL:
if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
break;
}
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
mdb.list = chain_metadata_from_app(
call, data.send_metadata.count, data.send_metadata.metadata);
mdb.garbage.head = mdb.garbage.tail = NULL;
mdb.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
grpc_metadata_batch_link_head(&mdb,
&call->send_initial_metadata[i]);
}
grpc_sopb_add_metadata(&call->send_ops, mdb);
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << WRITE_STATE_INITIAL;
/* fall through intended */
case WRITE_STATE_STARTED:
if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
grpc_sopb_add_message(data.send_message);
abort();
}
}
static send_action choose_send_action(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_INITIAL:
@ -614,33 +670,7 @@ static send_action choose_send_action(grpc_call *call) {
return SEND_NOTHING;
}
static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
grpc_metadata *metadata) {
size_t i;
grpc_mdelem_list out;
if (count == 0) {
out.head = out.tail = NULL;
return out;
}
for (i = 0; i < count; i++) {
grpc_metadata *md = &metadata[i];
grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
assert(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
(const gpr_uint8 *)md->value,
md->value_length);
l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
}
out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
return out;
}
static void enact_send_action(grpc_call *call, send_action sa) {
grpc_ioreq_data data;
grpc_call_op op;
size_t i;
gpr_uint32 flags = 0;
@ -654,37 +684,11 @@ static void enact_send_action(grpc_call *call, send_action sa) {
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_INITIAL_METADATA:
data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.data.metadata.list = chain_metadata_from_app(
call, data.send_metadata.count, data.send_metadata.metadata);
op.data.metadata.garbage.head = op.data.metadata.garbage.tail = NULL;
op.data.metadata.deadline = call->send_deadline;
for (i = 0; i < call->send_initial_metadata_count; i++) {
grpc_metadata_batch_link_head(&op.data.metadata,
&call->send_initial_metadata[i]);
}
call->send_initial_metadata_count = 0;
op.done_cb = finish_start_step;
op.user_data = call;
op.bind_pollset = grpc_cq_pollset(call->cq);
grpc_call_execute_op(call, &op);
break;
case SEND_BUFFERED_MESSAGE:
flags |= GRPC_WRITE_BUFFER_HINT;
/* fallthrough */
case SEND_MESSAGE:
data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
op.type = GRPC_SEND_MESSAGE;
op.dir = GRPC_CALL_DOWN;
op.flags = flags;
op.data.message = data.send_message;
op.done_cb = finish_write_step;
op.user_data = call;
op.bind_pollset = NULL;
grpc_call_execute_op(call, &op);
break;
case SEND_TRAILING_METADATA_AND_FINISH:
/* send trailing metadata */

@ -106,7 +106,6 @@ void grpc_call_recv_message(grpc_call_element *surface_element,
void grpc_call_read_closed(grpc_call_element *surface_element);
void grpc_call_stream_closed(grpc_call_element *surface_element);
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);

@ -503,7 +503,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
gpr_mu_lock(&t->mu);
t->calling_back = 1;
ref_transport(t);
ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
sr = setup(arg, &t->base, t->metadata_context);
@ -515,7 +515,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
ref_transport(t);
ref_transport(t); /* matches unref inside recv_data */
recv_data(t, slices, nslices, GRPC_ENDPOINT_CB_OK);
unref_transport(t);

@ -50,22 +50,9 @@ typedef enum grpc_stream_op_code {
Must be ignored by receivers */
GRPC_NO_OP,
GRPC_OP_METADATA,
/* Begin a message/metadata element/status - as defined by
grpc_message_type. */
GRPC_OP_BEGIN_MESSAGE,
/* Add a slice of data to the current message/metadata element/status.
Must not overflow the forward declared length. */
GRPC_OP_SLICE
GRPC_OP_MESSAGE
} grpc_stream_op_code;
/* Arguments for GRPC_OP_BEGIN */
typedef struct grpc_begin_message {
/* How many bytes of data will this message contain */
gpr_uint32 length;
/* Write flags for the message: see grpc.h GRPC_WRITE_xxx */
gpr_uint32 flags;
} grpc_begin_message;
typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
@ -118,9 +105,8 @@ typedef struct grpc_stream_op {
/* the arguments to this operation. union fields are named according to the
associated op-code */
union {
grpc_begin_message begin_message;
grpc_byte_buffer *message;
grpc_metadata_batch metadata;
gpr_slice slice;
} data;
} grpc_stream_op;
@ -148,16 +134,8 @@ void grpc_stream_ops_unref_owned_objects(grpc_stream_op *ops, size_t nops);
/* Append a GRPC_NO_OP to a buffer */
void grpc_sopb_add_no_op(grpc_stream_op_buffer *sopb);
/* Append a GRPC_OP_BEGIN to a buffer */
void grpc_sopb_add_begin_message(grpc_stream_op_buffer *sopb, gpr_uint32 length,
gpr_uint32 flags);
void grpc_sopb_add_message(grpc_stream_op_buffer *sopb, grpc_byte_buffer *bb);
void grpc_sopb_add_metadata(grpc_stream_op_buffer *sopb, grpc_metadata_batch metadata);
/* Append a GRPC_SLICE to a buffer - does not ref/unref the slice */
void grpc_sopb_add_slice(grpc_stream_op_buffer *sopb, gpr_slice slice);
/* Append a GRPC_OP_FLOW_CTL_CB to a buffer */
void grpc_sopb_add_flow_ctl_cb(grpc_stream_op_buffer *sopb,
void (*cb)(void *arg, grpc_op_error error),
void *arg);
/* Append a buffer to a buffer - does not ref/unref any internal objects */
void grpc_sopb_append(grpc_stream_op_buffer *sopb, grpc_stream_op *ops,
size_t nops);

@ -78,6 +78,8 @@ struct grpc_transport_callbacks {
void (*accept_stream)(void *user_data, grpc_transport *transport,
const void *server_data);
void (*goaway)(void *user_data, grpc_transport *transport, grpc_status_code status, gpr_slice debug);
/* The transport has been closed */
void (*closed)(void *user_data, grpc_transport *transport);
};
@ -139,8 +141,14 @@ typedef struct grpc_transport_op {
void *recv_user_data;
grpc_pollset *bind_pollset;
grpc_status_code cancel_with_status;
} grpc_transport_op;
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
char *grpc_transport_op_string(grpc_transport_op *op);
/* Send a batch of operations on a transport
Takes ownership of any objects contained in ops.
@ -161,19 +169,6 @@ void grpc_transport_perform_op(grpc_transport *transport, grpc_stream *stream,
void grpc_transport_ping(grpc_transport *transport, void (*cb)(void *user_data),
void *user_data);
/* Abort a stream
Terminate reading and writing for a stream. A final recv_batch with no
operations and final_state == GRPC_STREAM_CLOSED will be received locally,
and no more data will be presented to the up-layer.
TODO(ctiller): consider adding a HTTP/2 reason to this function. */
void grpc_transport_abort_stream(grpc_transport *transport, grpc_stream *stream,
grpc_status_code status);
void grpc_transport_add_to_pollset(grpc_transport *transport,
grpc_pollset *pollset);
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,
gpr_slice debug_data);

@ -0,0 +1,141 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/channel/channel_stack.h"
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
gpr_strvec_add(b, gpr_strdup(" key="));
gpr_strvec_add(
b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
gpr_strvec_add(b, gpr_strdup(" value="));
gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
GPR_SLICE_LENGTH(md->value->slice),
GPR_HEXDUMP_PLAINTEXT));
}
static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
grpc_linked_mdelem *m;
for (m = md.list.head; m != NULL; m = m->next) {
put_metadata(b, m->md);
}
if (gpr_time_cmp(md.deadline, gpr_inf_future) != 0) {
char *tmp;
gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
md.deadline.tv_nsec);
gpr_strvec_add(b, tmp);
}
}
char *grpc_call_op_string(grpc_call_op *op) {
char *tmp;
char *out;
gpr_strvec b;
gpr_strvec_init(&b);
switch (op->dir) {
case GRPC_CALL_DOWN:
gpr_strvec_add(&b, gpr_strdup(">"));
break;
case GRPC_CALL_UP:
gpr_strvec_add(&b, gpr_strdup("<"));
break;
}
switch (op->type) {
case GRPC_SEND_METADATA:
gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_SEND_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
break;
case GRPC_SEND_PREFORMATTED_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("SEND_PREFORMATTED_MESSAGE"));
break;
case GRPC_SEND_FINISH:
gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
break;
case GRPC_REQUEST_DATA:
gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
break;
case GRPC_RECV_METADATA:
gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
put_metadata_list(&b, op->data.metadata);
break;
case GRPC_RECV_MESSAGE:
gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
break;
case GRPC_RECV_HALF_CLOSE:
gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
break;
case GRPC_RECV_FINISH:
gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
case GRPC_RECV_SYNTHETIC_STATUS:
gpr_asprintf(&tmp, "RECV_SYNTHETIC_STATUS status=%d message='%s'",
op->data.synthetic_status.status,
op->data.synthetic_status.message);
gpr_strvec_add(&b, tmp);
break;
case GRPC_CANCEL_OP:
gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
gpr_strvec_add(&b, tmp);
if (op->bind_pollset) {
gpr_strvec_add(&b, gpr_strdup("bind_pollset"));
}
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);
return out;
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem, grpc_call_op *op) {
char *str = grpc_call_op_string(op);
gpr_log(file, line, severity, "OP[%s:%p]: %s", elem->filter->name, elem, str);
gpr_free(str);
}

@ -107,7 +107,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@ -229,8 +228,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@ -247,8 +244,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@ -423,6 +418,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="gpr.vcxproj">

@ -61,9 +61,6 @@
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -88,9 +85,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -352,6 +346,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
<Filter>src\core\transport</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\grpc_security.h">
@ -443,9 +440,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -91,7 +91,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@ -175,8 +174,6 @@
<ItemGroup>
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@ -193,8 +190,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@ -369,6 +364,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="gpr.vcxproj">

@ -4,9 +4,6 @@
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -31,9 +28,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -295,6 +289,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
<Filter>src\core\transport</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\byte_buffer.h">
@ -338,9 +335,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -109,7 +109,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@ -231,8 +230,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@ -249,8 +246,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@ -425,6 +420,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="gpr.vcxproj">

@ -61,9 +61,6 @@
<ClCompile Include="..\..\src\core\tsi\transport_security.c">
<Filter>src\core\tsi</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -88,9 +85,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -352,6 +346,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
<Filter>src\core\transport</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\grpc_security.h">
@ -443,9 +440,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

@ -93,7 +93,6 @@
<ClInclude Include="..\..\src\core\channel\client_setup.h" />
<ClInclude Include="..\..\src\core\channel\connected_channel.h" />
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
@ -177,8 +176,6 @@
<ItemGroup>
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\channel_args.c">
@ -195,8 +192,6 @@
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
</ClCompile>
<ClCompile Include="..\..\src\core\channel\noop_filter.c">
@ -371,6 +366,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport.c">
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="gpr.vcxproj">

@ -4,9 +4,6 @@
<ClCompile Include="..\..\src\core\surface\init_unsecure.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\call_op_string.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\census_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -31,9 +28,6 @@
<ClCompile Include="..\..\src\core\channel\http_client_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\channel\http_server_filter.c">
<Filter>src\core\channel</Filter>
</ClCompile>
@ -295,6 +289,9 @@
<ClCompile Include="..\..\src\core\transport\transport.c">
<Filter>src\core\transport</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\transport\transport_op_string.c">
<Filter>src\core\transport</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\include\grpc\byte_buffer.h">
@ -338,9 +335,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\channel\http_server_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>

Loading…
Cancel
Save