Merge pull request #2286 from ctiller/split-me-baby-one-more-time

Further lock contention reduction for CHTTP2
pull/6357/head
Jan Tattermusch 9 years ago
commit 0d6fee6c8e
24 changed files (lines changed per file):

      4  src/core/ext/census/grpc_filter.c
      5  src/core/ext/client_config/client_channel.c
      6  src/core/ext/client_config/subchannel.c
    657  src/core/ext/transport/chttp2/transport/chttp2_transport.c
     63  src/core/ext/transport/chttp2/transport/internal.h
      8  src/core/ext/transport/chttp2/transport/stream_lists.c
      6  src/core/lib/channel/channel_stack.c
     11  src/core/lib/channel/channel_stack.h
      4  src/core/lib/channel/compress_filter.c
      7  src/core/lib/channel/connected_channel.c
      4  src/core/lib/channel/http_client_filter.c
      4  src/core/lib/channel/http_server_filter.c
      6  src/core/lib/iomgr/iomgr.c
      4  src/core/lib/security/client_auth_filter.c
      4  src/core/lib/security/server_auth_filter.c
      6  src/core/lib/surface/call.c
      4  src/core/lib/surface/completion_queue.c
      6  src/core/lib/surface/lame_client.c
      4  src/core/lib/surface/server.c
      5  src/core/lib/transport/transport.c
     12  src/core/lib/transport/transport.h
      2  src/core/lib/transport/transport_impl.h
      6  test/core/channel/channel_stack_test.c
      4  test/core/end2end/tests/filter_causes_close.c

src/core/ext/census/grpc_filter.c

@@ -134,7 +134,7 @@ static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
 }
 
 static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@@ -152,7 +152,7 @@ static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
 }
 
 static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                                     grpc_call_element *elem) {
+                                     grpc_call_element *elem, void *ignored) {
   call_data *d = elem->call_data;
   GPR_ASSERT(d != NULL);
   /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */

src/core/ext/client_config/client_channel.c

@@ -415,9 +415,10 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
+  gpr_free(and_free_memory);
 }
 
 /* Constructor for channel_data */

src/core/ext/client_config/subchannel.c

@@ -644,9 +644,9 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
                                     bool success) {
   grpc_subchannel_call *c = call;
   GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
-  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
-  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, c->connection, "subchannel_call");
-  gpr_free(c);
+  grpc_connected_subchannel *connection = c->connection;
+  grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), c);
+  GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
   GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
 }

src/core/ext/transport/chttp2/transport/chttp2_transport.c

File diff suppressed because it is too large (657 lines changed).

src/core/ext/transport/chttp2/transport/internal.h

@@ -291,27 +291,44 @@ struct grpc_chttp2_transport_parsing {
   int64_t outgoing_window;
 };
 
+typedef void (*grpc_chttp2_locked_action)(grpc_exec_ctx *ctx,
+                                          grpc_chttp2_transport *t,
+                                          grpc_chttp2_stream *s, void *arg);
+
+typedef struct grpc_chttp2_executor_action_header {
+  grpc_chttp2_stream *stream;
+  grpc_chttp2_locked_action action;
+  struct grpc_chttp2_executor_action_header *next;
+  void *arg;
+} grpc_chttp2_executor_action_header;
+
 struct grpc_chttp2_transport {
   grpc_transport base; /* must be first */
-  grpc_endpoint *ep;
   gpr_refcount refs;
+  grpc_endpoint *ep;
   char *peer_string;
 
   /** when this drops to zero it's safe to shutdown the endpoint */
   gpr_refcount shutdown_ep_refs;
 
-  gpr_mu mu;
+  struct {
+    gpr_mu mu;
+
+    /** is a thread currently in the global lock */
+    bool global_active;
+    /** is a thread currently writing */
+    bool writing_active;
+    /** is a thread currently parsing */
+    bool parsing_active;
+
+    grpc_chttp2_executor_action_header *pending_actions;
+  } executor;
 
   /** is the transport destroying itself? */
   uint8_t destroying;
   /** has the upper layer closed the transport? */
   uint8_t closed;
 
-  /** is a thread currently writing */
-  uint8_t writing_active;
-  /** is a thread currently parsing */
-  uint8_t parsing_active;
-
   /** is there a read request to the endpoint outstanding? */
   uint8_t endpoint_reading;
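The new executor block is the core of the contention fix: executor.mu now guards only a little bookkeeping, and a thread that finds the global lock already held hands its work off as a queued action instead of blocking. The scheduling logic itself lives in the suppressed chttp2_transport.c diff above, so the following is only a hedged sketch reconstructed from the fields declared here (run_with_global_lock_sketch is an invented name; the real function also has to copy arg when sizeof_arg != 0 and drain pending_actions before releasing the lock):

    /* Illustrative sketch only: roughly how grpc_chttp2_run_with_global_lock
       can use the executor fields above. Not the committed implementation. */
    static void run_with_global_lock_sketch(grpc_exec_ctx *exec_ctx,
                                            grpc_chttp2_transport *t,
                                            grpc_chttp2_stream *s,
                                            grpc_chttp2_locked_action action,
                                            void *arg) {
      gpr_mu_lock(&t->executor.mu);
      if (!t->executor.global_active) {
        /* fast path: nobody is in the global lock, run the action inline */
        t->executor.global_active = true;
        gpr_mu_unlock(&t->executor.mu);
        action(exec_ctx, t, s, arg);
        /* ... re-take mu, drain t->executor.pending_actions, then clear
           global_active ... */
      } else {
        /* slow path: queue the action for the current lock holder to run,
           and return without blocking */
        grpc_chttp2_executor_action_header *hdr = gpr_malloc(sizeof(*hdr));
        hdr->stream = s;
        hdr->action = action;
        hdr->arg = arg;
        hdr->next = t->executor.pending_actions;
        t->executor.pending_actions = hdr;
        gpr_mu_unlock(&t->executor.mu);
      }
    }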
@@ -338,8 +355,10 @@ struct grpc_chttp2_transport {
   /** closure to execute writing */
   grpc_closure writing_action;
-  /** closure to finish reading from the endpoint */
-  grpc_closure recv_data;
+  /** closure to start reading from the endpoint */
+  grpc_closure reading_action;
+  /** closure to actually do parsing */
+  grpc_closure parsing_action;
 
   /** incoming read bytes */
   gpr_slice_buffer read_buffer;
@@ -397,21 +416,26 @@ typedef struct {
   grpc_transport_stream_stats *collecting_stats;
   grpc_transport_stream_stats stats;
 
+  /** number of streams that are currently being read */
+  gpr_refcount active_streams;
+
   /** when the application requests writes be closed, the write_closed is
       'queued'; when the close is flow controlled into the send path, we are
       'sending' it; when the write has been performed it is 'sent' */
-  uint8_t write_closed;
+  bool write_closed;
   /** is this stream reading half-closed (boolean) */
-  uint8_t read_closed;
+  bool read_closed;
+  /** are all published incoming byte streams closed */
+  bool all_incoming_byte_streams_finished;
   /** is this stream in the stream map? (boolean) */
-  uint8_t in_stream_map;
+  bool in_stream_map;
   /** has this stream seen an error? if 1, then pending incoming frames
       can be thrown away */
-  uint8_t seen_error;
-  uint8_t published_initial_metadata;
-  uint8_t published_trailing_metadata;
-  uint8_t faked_trailing_metadata;
+  bool seen_error;
+  bool published_initial_metadata;
+  bool published_trailing_metadata;
+  bool faked_trailing_metadata;
 
   grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
   grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
@@ -570,6 +594,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
 void grpc_chttp2_list_add_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global);
+bool grpc_chttp2_list_remove_check_read_ops(
+    grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_stream_global *stream_global);
 int grpc_chttp2_list_pop_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global **stream_global);
@@ -645,6 +672,12 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
     grpc_chttp2_stream_global *stream_global,
     grpc_closure **pclosure, int success);
 
+void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
+                                      grpc_chttp2_transport *transport,
+                                      grpc_chttp2_stream *optional_stream,
+                                      grpc_chttp2_locked_action action,
+                                      void *arg, size_t sizeof_arg);
+
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
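A hypothetical call site, to make the shape of this API concrete. Everything named cancel_* below is invented for illustration; that a nonzero sizeof_arg lets the executor copy a stack-allocated argument when the action must be deferred is inferred from the signature, not confirmed by the visible diff:

    /* Hypothetical usage sketch of grpc_chttp2_run_with_global_lock. */
    typedef struct {
      grpc_status_code status;
    } cancel_stream_arg; /* invented for this example */

    static void cancel_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                              grpc_chttp2_stream *s, void *arg) {
      cancel_stream_arg *a = arg;
      /* runs with the transport's global lock held: safe to mutate t and s */
      (void)a;
    }

    static void cancel_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                              grpc_chttp2_stream *s, grpc_status_code status) {
      cancel_stream_arg arg = {status};
      /* passing sizeof(arg) presumably lets the executor copy the argument
         if the action has to be queued rather than run inline */
      grpc_chttp2_run_with_global_lock(exec_ctx, t, s, cancel_locked, &arg,
                                       sizeof(arg));
    }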

src/core/ext/transport/chttp2/transport/stream_lists.c

@@ -305,6 +305,14 @@ void grpc_chttp2_list_add_check_read_ops(
                   GRPC_CHTTP2_LIST_CHECK_READ_OPS);
 }
 
+bool grpc_chttp2_list_remove_check_read_ops(
+    grpc_chttp2_transport_global *transport_global,
+    grpc_chttp2_stream_global *stream_global) {
+  return stream_list_maybe_remove(TRANSPORT_FROM_GLOBAL(transport_global),
+                                  STREAM_FROM_GLOBAL(stream_global),
+                                  GRPC_CHTTP2_LIST_CHECK_READ_OPS);
+}
+
 int grpc_chttp2_list_pop_check_read_ops(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global **stream_global) {

src/core/lib/channel/channel_stack.c

@@ -213,14 +213,16 @@ void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
                                         grpc_call_element *elem,
                                         grpc_pollset *pollset) {}
 
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack) {
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory) {
   grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack);
   size_t count = stack->count;
   size_t i;
 
   /* destroy per-filter data */
   for (i = 0; i < count; i++) {
-    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i]);
+    elems[i].filter->destroy_call_elem(exec_ctx, &elems[i],
+                                       i == count - 1 ? and_free_memory : NULL);
   }
 }
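Only the last (bottom-most) element receives the and_free_memory pointer; every other filter sees NULL. A minimal sketch of the resulting ownership contract, assuming the usual layout where the call object and its stack live in one gpr_malloc'd block (my_call is invented; the real users are grpc_call in call.c and the subchannel call above):

    /* Illustrative sketch of the and_free_memory ownership contract. */
    typedef struct {
      gpr_refcount refs;
      /* a grpc_call_stack (plus per-filter data) follows in the same
         gpr_malloc'd block */
    } my_call; /* invented for this example */

    #define MY_CALL_TO_CALL_STACK(c) ((grpc_call_stack *)((c) + 1))

    static void destroy_my_call(grpc_exec_ctx *exec_ctx, my_call *c) {
      /* hand the enclosing block to the stack: the bottom filter receives
         it as and_free_memory and eventually gpr_free()s it, so there is
         deliberately no gpr_free(c) here */
      grpc_call_stack_destroy(exec_ctx, MY_CALL_TO_CALL_STACK(c), c);
    }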

src/core/lib/channel/channel_stack.h

@@ -104,8 +104,12 @@ typedef struct {
   void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                       grpc_pollset *pollset);
   /* Destroy per call data.
-     The filter does not need to do any chaining */
-  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
+     The filter does not need to do any chaining.
+     The bottom filter of a stack will be passed a non-NULL pointer to
+     \a and_free_memory that should be passed to gpr_free when destruction
+     is complete. */
+  void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                            void *and_free_memory);
 
   /* sizeof(per channel data) */
   size_t sizeof_channel_data;
@@ -223,7 +227,8 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
 #endif
 
 /* Destroy a call stack */
-void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack);
+void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
+                             void *and_free_memory);
 
 /* Ignore set pollset - used by filters to implement the set_pollset method
    if they don't care about pollsets at all. Does nothing. */

src/core/lib/channel/compress_filter.c

@@ -246,8 +246,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   /* grab pointers to our data from the call element */
   call_data *calld = elem->call_data;
   gpr_slice_buffer_destroy(&calld->slices);

src/core/lib/channel/connected_channel.c

@@ -102,12 +102,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
   call_data *calld = elem->call_data;
   channel_data *chand = elem->channel_data;
   grpc_transport_destroy_stream(exec_ctx, chand->transport,
-                                TRANSPORT_STREAM_FROM_CALL_DATA(calld));
+                                TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+                                and_free_memory);
 }
 
 /* Constructor for channel_data */
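connected_channel is the filter that normally sits at the bottom of a call stack, so it is the one that receives the non-NULL and_free_memory and forwards it into grpc_transport_destroy_stream (whose new signature appears further down). This lets a transport defer the final gpr_free until it has truly finished with the stream.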

src/core/lib/channel/http_client_filter.c

@@ -155,8 +155,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 static grpc_mdelem *scheme_from_args(const grpc_channel_args *args) {
   unsigned i;

src/core/lib/channel/http_server_filter.c

@@ -225,8 +225,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,

src/core/lib/iomgr/iomgr.c

@@ -166,8 +166,10 @@ bool grpc_iomgr_abort_on_leaks(void) {
   if (env == NULL) return false;
   static const char *truthy[] = {"yes", "Yes", "YES", "true",
                                  "True", "TRUE", "1"};
+  bool should_we = false;
   for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
-    if (0 == strcmp(env, truthy[i])) return true;
+    if (0 == strcmp(env, truthy[i])) should_we = true;
   }
-  return false;
+  gpr_free(env);
+  return should_we;
 }
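A small leak fix riding along: gpr_getenv returns a heap-allocated copy of the environment value, so the old early return true leaked it. The rewrite records the result, frees the string on every path, and only then returns.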

src/core/lib/security/client_auth_filter.c

@@ -277,8 +277,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 }
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   call_data *calld = elem->call_data;
   grpc_call_credentials_unref(calld->creds);
   if (calld->host != NULL) {

src/core/lib/security/server_auth_filter.c

@@ -224,8 +224,8 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                         grpc_pollset *pollset) {}
 
 /* Destructor for call_data */
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {}
 
 /* Constructor for channel_data */
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,

src/core/lib/surface/call.c

@@ -373,8 +373,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
   if (c->receiving_stream != NULL) {
     grpc_byte_stream_destroy(exec_ctx, c->receiving_stream);
   }
-  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
-  GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
   gpr_mu_destroy(&c->mu);
   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     if (c->status[i].details) {
@@ -392,7 +390,9 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
   if (c->cq) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
-  gpr_free(c);
+  grpc_channel *channel = c->channel;
+  grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), c);
+  GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
   GPR_TIMER_END("destroy_call", 0);
 }
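Same pattern as subchannel.c above: destroying the call stack moves to the very end of destroy_call and hands c in as and_free_memory, so c->channel is copied into a local first and the explicit gpr_free(c) goes away. The bottom filter, or the transport it forwards to, now owns the final free.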

src/core/lib/surface/completion_queue.c

@@ -227,6 +227,10 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
 #endif
 
   GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
+  GRPC_API_TRACE(
+      "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, success=%d, done=%p, "
+      "done_arg=%p, storage=%p)",
+      7, (exec_ctx, cc, tag, success, done, done_arg, storage));
 
   storage->tag = tag;
   storage->done = done;

src/core/lib/surface/lame_client.c

@@ -107,8 +107,10 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                            grpc_call_element_args *args) {}
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {
+  gpr_free(and_free_memory);
+}
 
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem,
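A lame channel has no transport underneath it, so its filter sits at the bottom of the stack and must honor the new channel_stack.h contract itself: it simply gpr_free()s the block it is handed. Since gpr_free(NULL) is a no-op, this is also harmless when the filter is not the bottom element.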

src/core/lib/surface/server.c

@@ -820,8 +820,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
   server_ref(chand->server);
 }
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   channel_data *chand = elem->channel_data;
   call_data *calld = elem->call_data;

src/core/lib/transport/transport.c

@@ -133,8 +133,9 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
 
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream) {
-  transport->vtable->destroy_stream(exec_ctx, transport, stream);
+                                   grpc_stream *stream, void *and_free_memory) {
+  transport->vtable->destroy_stream(exec_ctx, transport, stream,
+                                    and_free_memory);
 }
 
 char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,

src/core/lib/transport/transport.h

@@ -98,6 +98,11 @@ void grpc_transport_move_stats(grpc_transport_stream_stats *from,
 /* Transport stream op: a set of operations to perform on a transport
    against a single stream */
 typedef struct grpc_transport_stream_op {
+  /** Should be enqueued when all requested operations (excluding recv_message
+      and recv_initial_metadata which have their own closures) in a given batch
+      have been completed. */
+  grpc_closure *on_complete;
+
   /** Send initial metadata to the peer, from the provided metadata batch.
       idempotent_request MUST be set if this is non-null */
   grpc_metadata_batch *send_initial_metadata;
@@ -129,11 +134,6 @@ typedef struct grpc_transport_stream_op {
   /** Collect any stats into provided buffer, zero internal stat counters */
   grpc_transport_stream_stats *collect_stats;
 
-  /** Should be enqueued when all requested operations (excluding recv_message
-      and recv_initial_metadata which have their own closures) in a given batch
-      have been completed. */
-  grpc_closure *on_complete;
-
   /** If != GRPC_STATUS_OK, cancel this stream */
   grpc_status_code cancel_with_status;
@@ -213,7 +213,7 @@ void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
       caller, but any child memory must be cleaned up) */
 void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
                                    grpc_transport *transport,
-                                   grpc_stream *stream);
+                                   grpc_stream *stream, void *and_free_memory);
 
 void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx,
                                                   grpc_transport_stream_op *op);

src/core/lib/transport/transport_impl.h

@@ -63,7 +63,7 @@ typedef struct grpc_transport_vtable {
   /* implementation of grpc_transport_destroy_stream */
   void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
-                         grpc_stream *stream);
+                         grpc_stream *stream, void *and_free_memory);
 
   /* implementation of grpc_transport_destroy */
   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);

test/core/channel/channel_stack_test.c

@@ -62,8 +62,8 @@ static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
 static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem) {}
 
-static void call_destroy_func(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {
+static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *ignored) {
   ++*(int *)(elem->channel_data);
 }
@@ -87,7 +87,7 @@ static void free_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
 }
 
 static void free_call(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
-  grpc_call_stack_destroy(exec_ctx, arg);
+  grpc_call_stack_destroy(exec_ctx, arg, NULL);
   gpr_free(arg);
 }

test/core/end2end/tests/filter_causes_close.c

@@ -232,8 +232,8 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
 static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
                            grpc_call_element_args *args) {}
 
-static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
-                              grpc_call_element *elem) {}
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+                              void *and_free_memory) {}
 
 static void init_channel_elem(grpc_exec_ctx *exec_ctx,
                               grpc_channel_element *elem,
