Use an arena for call & subchannel_call allocation

reviewable/pr10135/r1
Craig Tiller 8 years ago
parent e7a1702fe9
commit d426caca81
  1. 4
      src/core/ext/census/grpc_filter.c
  2. 26
      src/core/ext/client_channel/client_channel.c
  3. 33
      src/core/ext/client_channel/subchannel.c
  4. 18
      src/core/ext/client_channel/subchannel.h
  5. 2
      src/core/ext/load_reporting/load_reporting_filter.c
  6. 7
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  7. 2
      src/core/ext/transport/chttp2/transport/internal.h
  8. 29
      src/core/lib/channel/channel_stack.c
  9. 13
      src/core/lib/channel/channel_stack.h
  10. 2
      src/core/lib/security/transport/client_auth_filter.c
  11. 2
      src/core/lib/security/transport/server_auth_filter.c
  12. 15
      src/core/lib/surface/call.c
  13. 5
      src/core/lib/transport/transport.c
  14. 16
      test/core/channel/channel_stack_test.c
  15. 2
      test/core/end2end/tests/filter_call_init_fails.c
  16. 2
      test/core/end2end/tests/filter_causes_close.c
  17. 4
      test/core/end2end/tests/filter_latency.c

@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) { grpc_closure *ignored) {
call_data *d = elem->call_data; call_data *d = elem->call_data;
GPR_ASSERT(d != NULL); GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) { grpc_closure *ignored) {
call_data *d = elem->call_data; call_data *d = elem->call_data;
GPR_ASSERT(d != NULL); GPR_ASSERT(d != NULL);
/* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */

@ -660,6 +660,7 @@ typedef struct client_channel_call_data {
/** either 0 for no call, 1 for cancelled, or a pointer to a /** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */ grpc_subchannel_call */
gpr_atm subchannel_call; gpr_atm subchannel_call;
gpr_arena *arena;
subchannel_creation_phase creation_phase; subchannel_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel; grpc_connected_subchannel *connected_subchannel;
@ -754,9 +755,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
} else { } else {
/* Create call on subchannel. */ /* Create call on subchannel. */
grpc_subchannel_call *subchannel_call = NULL; grpc_subchannel_call *subchannel_call = NULL;
grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena};
grpc_error *new_error = grpc_connected_subchannel_create_call( grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
calld->call_start_time, calld->deadline, &subchannel_call);
if (new_error != GRPC_ERROR_NONE) { if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error); new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL; subchannel_call = CANCELLED_CALL;
@ -982,9 +988,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
calld->connected_subchannel != NULL) { calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL; grpc_subchannel_call *subchannel_call = NULL;
grpc_connected_subchannel_call_args call_args = {
.pollent = calld->pollent,
.path = calld->path,
.start_time = calld->call_start_time,
.deadline = calld->deadline,
.arena = calld->arena};
grpc_error *error = grpc_connected_subchannel_create_call( grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
calld->call_start_time, calld->deadline, &subchannel_call);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL; subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
@ -1161,6 +1172,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
calld->owning_call = args->call_stack; calld->owning_call = args->call_stack;
calld->pollent = NULL; calld->pollent = NULL;
calld->arena = args->arena;
GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
grpc_closure_sched( grpc_closure_sched(
exec_ctx, exec_ctx,
@ -1175,7 +1187,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) { grpc_closure *then_schedule_closure) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem); grpc_deadline_state_destroy(exec_ctx, elem);
grpc_slice_unref_internal(exec_ctx, calld->path); grpc_slice_unref_internal(exec_ctx, calld->path);
@ -1185,6 +1197,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(calld->cancel_error); GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld); grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) { if (call != NULL && call != CANCELLED_CALL) {
grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure);
then_schedule_closure = NULL;
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");
} }
GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
@ -1194,7 +1208,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
"picked"); "picked");
} }
gpr_free(calld->waiting_ops); gpr_free(calld->waiting_ops);
gpr_free(and_free_memory); grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE);
} }
static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -148,6 +148,7 @@ struct grpc_subchannel {
struct grpc_subchannel_call { struct grpc_subchannel_call {
grpc_connected_subchannel *connection; grpc_connected_subchannel *connection;
grpc_closure *schedule_closure_after_destroy;
}; };
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
@ -719,13 +720,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) { grpc_error *error) {
grpc_subchannel_call *c = call; grpc_subchannel_call *c = call;
GPR_ASSERT(c->schedule_closure_after_destroy != NULL);
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_connected_subchannel *connection = c->connection; grpc_connected_subchannel *connection = c->connection;
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL,
c->schedule_closure_after_destroy);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call");
GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
} }
void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call,
grpc_closure *closure) {
GPR_ASSERT(call->schedule_closure_after_destroy == NULL);
GPR_ASSERT(closure != NULL);
call->schedule_closure_after_destroy = closure;
}
void grpc_subchannel_call_ref( void grpc_subchannel_call_ref(
grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
@ -761,15 +771,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call( grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, const grpc_connected_subchannel_call_args *args,
gpr_timespec deadline, grpc_subchannel_call **call) { grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); *call = gpr_arena_alloc(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below. (*call)->connection = con; // Ref is added below.
grpc_error *error = grpc_call_element_args call_args = {.call_stack = callstk,
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, .server_transport_data = NULL,
NULL, NULL, path, start_time, deadline, callstk); .context = NULL,
.path = args->path,
.start_time = args->start_time,
.deadline = args->deadline,
.arena = args->arena};
grpc_error *error = grpc_call_stack_init(
exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error); const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string); gpr_log(GPR_ERROR, "error: %s", error_string);
@ -778,7 +795,7 @@ grpc_error *grpc_connected_subchannel_create_call(
return error; return error;
} }
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -37,6 +37,7 @@
#include "src/core/ext/client_channel/connector.h" #include "src/core/ext/client_channel/connector.h"
#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/metadata.h"
@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS); GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */ /** construct a subchannel call */
typedef struct {
grpc_polling_entity *pollent;
grpc_slice path;
gpr_timespec start_time;
gpr_timespec deadline;
gpr_arena *arena;
} grpc_connected_subchannel_call_args;
grpc_error *grpc_connected_subchannel_create_call( grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, const grpc_connected_subchannel_call_args *args,
gpr_timespec deadline, grpc_subchannel_call **subchannel_call); grpc_subchannel_call **subchannel_call);
/** process a transport level op */ /** process a transport level op */
void grpc_connected_subchannel_process_transport_op( void grpc_connected_subchannel_process_transport_op(
@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *subchannel_call); grpc_subchannel_call *subchannel_call);
/** Must be called once per call. Sets the 'then_schedule_closure' argument for
call stack destruction. */
void grpc_subchannel_call_set_cleanup_closure(
grpc_subchannel_call *subchannel_call, grpc_closure *closure);
grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_call_stack *grpc_subchannel_call_get_call_stack(
grpc_subchannel_call *subchannel_call); grpc_subchannel_call *subchannel_call);

@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */ /* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) { grpc_closure *ignored) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
/* TODO(dgq): do something with the data /* TODO(dgq): do something with the data

@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_TIMER_END("destroy_stream", 0); GPR_TIMER_END("destroy_stream", 0);
gpr_free(s->destroy_stream_arg); grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE);
} }
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, void *and_free_memory) { grpc_stream *gs,
grpc_closure *then_schedule_closure) {
GPR_TIMER_BEGIN("destroy_stream", 0); GPR_TIMER_BEGIN("destroy_stream", 0);
grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
s->destroy_stream_arg = and_free_memory; s->destroy_stream_arg = then_schedule_closure;
grpc_closure_sched( grpc_closure_sched(
exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s,
grpc_combiner_scheduler(t->combiner, false)), grpc_combiner_scheduler(t->combiner, false)),

@ -425,7 +425,7 @@ struct grpc_chttp2_stream {
grpc_stream_refcount *refcount; grpc_stream_refcount *refcount;
grpc_closure destroy_stream; grpc_closure destroy_stream;
void *destroy_stream_arg; grpc_closure *destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
uint8_t included[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT];

@ -166,41 +166,32 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
} }
} }
grpc_error *grpc_call_stack_init( grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, int initial_refs, grpc_iomgr_cb_func destroy,
grpc_call_context_element *context, const void *transport_server_data, void *destroy_arg,
grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, const grpc_call_element_args *elem_args) {
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
size_t count = channel_stack->count; size_t count = channel_stack->count;
grpc_call_element *call_elems; grpc_call_element *call_elems;
char *user_data; char *user_data;
size_t i; size_t i;
call_stack->count = count; elem_args->call_stack->count = count;
GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy,
destroy_arg, "CALL_STACK"); destroy_arg, "CALL_STACK");
call_elems = CALL_ELEMS_FROM_STACK(call_stack); call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack);
user_data = ((char *)call_elems) + user_data = ((char *)call_elems) +
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */ /* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE; grpc_error *first_error = GRPC_ERROR_NONE;
const grpc_call_element_args args = {
.start_time = start_time,
.call_stack = call_stack,
.server_transport_data = transport_server_data,
.context = context,
.path = path,
.deadline = deadline,
};
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
call_elems[i].filter = channel_elems[i].filter; call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data; call_elems[i].call_data = user_data;
grpc_error *error = grpc_error *error = call_elems[i].filter->init_call_elem(
call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); exec_ctx, &call_elems[i], elem_args);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
if (first_error == GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) {
first_error = error; first_error = error;

@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h" #include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -84,6 +85,7 @@ typedef struct {
grpc_slice path; grpc_slice path;
gpr_timespec start_time; gpr_timespec start_time;
gpr_timespec deadline; gpr_timespec deadline;
gpr_arena *arena;
} grpc_call_element_args; } grpc_call_element_args;
typedef struct { typedef struct {
@ -236,12 +238,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is /* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the expected to be NULL on a client, or an opaque transport owned pointer on the
server. */ server. */
grpc_error *grpc_call_stack_init( grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, int initial_refs, grpc_iomgr_cb_func destroy,
grpc_call_context_element *context, const void *transport_server_data, void *destroy_arg,
grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, const grpc_call_element_args *elem_args);
grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first /* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */ * op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

@ -318,7 +318,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */ /* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) { grpc_closure *ignored) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
grpc_call_credentials_unref(exec_ctx, calld->creds); grpc_call_credentials_unref(exec_ctx, calld->creds);
if (calld->have_host) { if (calld->have_host) {

@ -227,7 +227,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* Destructor for call_data */ /* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) {} grpc_closure *ignored) {}
/* Constructor for channel_data */ /* Constructor for channel_data */
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,

@ -367,11 +367,16 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */ /* initial refcount dropped by grpc_call_destroy */
grpc_call_element_args call_args = {
.call_stack = CALL_STACK_FROM_CALL(call),
.server_transport_data = args->server_transport_data,
.context = call->context,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
.arena = call->arena};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, call->context, destroy_call, call, &call_args));
args->server_transport_data, path,
call->start_time, send_deadline,
CALL_STACK_FROM_CALL(call)));
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
@ -431,8 +436,8 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
static void release_call(grpc_exec_ctx *exec_ctx, void *call, static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) { grpc_error *error) {
grpc_call *c = call; grpc_call *c = call;
gpr_arena_destroy(c->arena);
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call"); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
gpr_arena_destroy(c->arena);
} }
static void set_status_value_directly(grpc_status_code status, void *dest); static void set_status_value_directly(grpc_status_code status, void *dest);

@ -197,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
grpc_transport *transport, grpc_transport *transport,
grpc_stream *stream, void *and_free_memory) { grpc_stream *stream,
grpc_closure *then_schedule_closure) {
transport->vtable->destroy_stream(exec_ctx, transport, stream, transport->vtable->destroy_stream(exec_ctx, transport, stream,
and_free_memory); then_schedule_closure);
} }
char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,

@ -68,7 +68,7 @@ static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void call_destroy_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *ignored) { grpc_closure *ignored) {
++*(int *)(elem->channel_data); ++*(int *)(elem->channel_data);
} }
@ -139,10 +139,16 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0); GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size); call_stack = gpr_malloc(channel_stack->call_stack_size);
grpc_error *error = grpc_call_element_args args = {
grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, .call_stack = call_stack,
NULL, NULL, path, gpr_now(GPR_CLOCK_MONOTONIC), .server_transport_data = NULL,
gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack); .context = NULL,
.path = path,
.start_time = gpr_now(GPR_CLOCK_MONOTONIC),
.deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC),
.arena = NULL};
grpc_error *error = grpc_call_stack_init(&exec_ctx, channel_stack, 1,
free_call, call_stack, &args);
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1); GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0); call_elem = grpc_call_stack_element(call_stack, 0);

@ -213,7 +213,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) {} grpc_closure *ignored) {}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel_element *elem,

@ -236,7 +236,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) {} grpc_closure *ignored) {}
static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem, grpc_channel_element *elem,

@ -267,7 +267,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) { grpc_closure *ignored) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
g_client_latency = final_info->stats.latency; g_client_latency = final_info->stats.latency;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
@ -276,7 +276,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
void *and_free_memory) { grpc_closure *ignored) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
g_server_latency = final_info->stats.latency; g_server_latency = final_info->stats.latency;
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);

Loading…
Cancel
Save