Merge branch 'merge-parse' of github.com:ctiller/grpc into merge-parse

pull/7793/head
Craig Tiller 8 years ago
commit f0e6554c0d
11 changed files (lines changed per file):

   1. src/core/ext/client_config/subchannel.c (4)
   2. src/core/ext/transport/chttp2/transport/chttp2_transport.c (112)
   3. src/core/ext/transport/chttp2/transport/internal.h (29)
   4. src/core/ext/transport/chttp2/transport/stream_lists.c (20)
   5. src/core/lib/iomgr/error.c (2)
   6. src/core/lib/surface/call.c (97)
   7. src/core/lib/surface/call.h (32)
   8. src/core/lib/surface/channel.c (18)
   9. src/core/lib/surface/server.c (15)
  10. src/core/lib/transport/transport.c (10)
  11. src/core/lib/transport/transport.h (2)

src/core/ext/client_config/subchannel.c

@@ -219,8 +219,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta,
           : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
 #ifdef GRPC_STREAM_REFCOUNT_DEBUG
   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
-          "SUBCHANNEL: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val,
-          old_val + delta, reason);
+          "SUBCHANNEL: %p %s 0x%08" PRIxPTR " -> 0x%08" PRIxPTR " [%s]", c,
+          purpose, old_val, old_val + delta, reason);
 #endif
   return old_val;
 }
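Aside (not part of the commit): the format-string change above exists because gpr_atm is a pointer-sized integer, so the old "%x" specifiers truncate on 64-bit builds; "PRIxPTR" from <inttypes.h> is the portable width. A minimal standalone sketch of the pattern, using only standard headers (values are illustrative):

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    int main(void) {
      /* stands in for a gpr_atm refcount word */
      intptr_t old_val = 0x1000;
      intptr_t delta = 2;
      /* PRIxPTR expects uintptr_t, hence the casts */
      printf("0x%08" PRIxPTR " -> 0x%08" PRIxPTR "\n", (uintptr_t)old_val,
             (uintptr_t)(old_val + delta));
      return 0;
    }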

src/core/ext/transport/chttp2/transport/chttp2_transport.c

@@ -163,11 +163,9 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
     GPR_ASSERT(t->lists[i].tail == NULL);
   }
-  GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
-  GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
-  grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
-  grpc_chttp2_stream_map_destroy(&t->new_stream_map);
+  GPR_ASSERT(grpc_chttp2_stream_map_size(&t->stream_map) == 0);
+  grpc_chttp2_stream_map_destroy(&t->stream_map);
   grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
   grpc_combiner_destroy(exec_ctx, t->executor.combiner);
@@ -277,8 +275,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
      large enough that the exponential growth should happen nicely when it's
      needed.
      TODO(ctiller): tune this */
-  grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
-  grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
+  grpc_chttp2_stream_map_init(&t->stream_map, 8);

   /* copy in initial settings to all setting sets */
   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
@@ -501,7 +498,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
   REF_TRANSPORT(t, "stream");

   if (server_data) {
-    GPR_ASSERT(t->executor.parsing_active);
     s->global.id = (uint32_t)(uintptr_t)server_data;
     s->global.outgoing_window =
         t->global.settings[GRPC_PEER_SETTINGS]
@@ -510,15 +506,16 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
         t->global.settings[GRPC_SENT_SETTINGS]
                           [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     *t->accepting_stream = s;
-    grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
-    grpc_chttp2_register_stream(t, s);
+    grpc_chttp2_stream_map_add(&t->stream_map, s->global.id, s);
     s->global.in_stream_map = true;
-  } else {
-    grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
-    GRPC_CHTTP2_STREAM_REF(&s->global, "init");
-    grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
-                          GRPC_ERROR_NONE);
   }
+  grpc_closure_init(&s->init_stream, finish_init_stream_locked, s);
+  GRPC_CHTTP2_STREAM_REF(&s->global, "init");
+  grpc_combiner_execute(exec_ctx, t->executor.combiner, &s->init_stream,
+                        GRPC_ERROR_NONE);

   GPR_TIMER_END("init_stream", 0);
   return 0;
@@ -540,9 +537,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
         exec_ctx, t,
         GRPC_ERROR_CREATE("Last stream closed after sending goaway"));
   }
-  if (!t->executor.parsing_active && s->global.id) {
-    GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
-                                           s->global.id) == NULL);
+  if (s->global.id != 0) {
+    GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->global.id) ==
+               NULL);
   }

   while (
@@ -597,8 +594,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 grpc_chttp2_stream_global *grpc_chttp2_parsing_lookup_stream(
     grpc_chttp2_transport_global *transport_global, uint32_t id) {
   grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global);
-  grpc_chttp2_stream *s =
-      grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
+  grpc_chttp2_stream *s = grpc_chttp2_stream_map_find(&t->stream_map, id);
   return s ? &s->global : NULL;
 }
@@ -879,7 +875,8 @@ static void maybe_start_some_streams(
   /* start streams where we have free grpc_chttp2_stream ids and free
    * concurrency */
   while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
-         transport_global->concurrent_stream_count <
+         grpc_chttp2_stream_map_size(
+             &TRANSPORT_FROM_GLOBAL(transport_global)->stream_map) <
              transport_global
                  ->settings[GRPC_PEER_SETTINGS]
                            [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
@@ -910,10 +907,9 @@ static void maybe_start_some_streams(
     stream_global->max_recv_bytes =
         GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes);
     grpc_chttp2_stream_map_add(
-        &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
-        stream_global->id, STREAM_FROM_GLOBAL(stream_global));
+        &TRANSPORT_FROM_GLOBAL(transport_global)->stream_map, stream_global->id,
+        STREAM_FROM_GLOBAL(stream_global));
     stream_global->in_stream_map = true;
-    transport_global->concurrent_stream_count++;
     grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true,
                                 "new_stream");
   }
@@ -1246,15 +1242,6 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx,
   grpc_chttp2_transport *t = op->transport_private.args[0];
   grpc_error *close_transport = op->disconnect_with_error;

-  /* If there's a set_accept_stream ensure that we're not parsing
-     to avoid changing things out from underneath */
-  if (t->executor.parsing_active && op->set_accept_stream) {
-    GPR_ASSERT(t->post_parsing_op == NULL);
-    t->post_parsing_op = gpr_malloc(sizeof(*op));
-    memcpy(t->post_parsing_op, op, sizeof(*op));
-    return;
-  }
-
   if (op->on_connectivity_state_change != NULL) {
     grpc_connectivity_state_notify_on_state_change(
         exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
@@ -1407,12 +1394,7 @@ static void decrement_active_streams_locked(
 static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
                           uint32_t id, grpc_error *error) {
-  size_t new_stream_count;
-  grpc_chttp2_stream *s =
-      grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
-  if (!s) {
-    s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
-  }
+  grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
   GPR_ASSERT(s);
   s->global.in_stream_map = false;
   if (t->global.incoming_stream == &s->global) {
@@ -1435,14 +1417,9 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &s->global, "chttp2_writing");
   }

-  new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
-                     grpc_chttp2_stream_map_size(&t->new_stream_map);
-  GPR_ASSERT(new_stream_count <= UINT32_MAX);
-  if (new_stream_count != t->global.concurrent_stream_count) {
-    t->global.concurrent_stream_count = (uint32_t)new_stream_count;
-    maybe_start_some_streams(exec_ctx, &t->global);
-  }
   GRPC_ERROR_UNREF(error);
+
+  maybe_start_some_streams(exec_ctx, &t->global);
 }

 static void status_codes_from_error(grpc_error *error, gpr_timespec deadline,
@@ -1627,18 +1604,12 @@ void grpc_chttp2_mark_stream_closed(
     }
   }
   if (stream_global->read_closed && stream_global->write_closed) {
-    if (stream_global->id != 0 &&
-        TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) {
-      grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
-                                                      stream_global);
-    } else {
-      if (stream_global->id != 0) {
-        remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
-                      stream_global->id,
-                      removal_error(GRPC_ERROR_REF(error), stream_global));
-      }
-      GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
+    if (stream_global->id != 0) {
+      remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
+                    stream_global->id,
+                    removal_error(GRPC_ERROR_REF(error), stream_global));
     }
+    GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
   }
   GRPC_ERROR_UNREF(error);
 }
@@ -1874,13 +1845,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
   GRPC_ERROR_REF(error);

-  GPR_ASSERT(!t->executor.parsing_active);
   if (!t->closed) {
-    t->executor.parsing_active = 1;
-    /* merge stream lists */
-    grpc_chttp2_stream_map_move_into(&t->new_stream_map,
-                                     &t->parsing_stream_map);
     GPR_TIMER_BEGIN("reading_action.parse", 0);
     size_t i = 0;
     grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
@@ -1903,8 +1868,8 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
     GPR_TIMER_BEGIN("post_parse_locked", 0);
     if (transport_global->initial_window_update != 0) {
       update_global_window_args args = {t, exec_ctx};
-      grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
-                                      update_global_window, &args);
+      grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window,
+                                      &args);
       transport_global->initial_window_update = 0;
     }
     /* handle higher level things */
@@ -1919,27 +1884,6 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
       grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
                                  "global incoming window");
     }
-    t->executor.parsing_active = 0;
-
-    /* handle delayed transport ops (if there is one) */
-    if (t->post_parsing_op) {
-      grpc_transport_op *op = t->post_parsing_op;
-      t->post_parsing_op = NULL;
-      perform_transport_op_locked(exec_ctx, op, GRPC_ERROR_NONE);
-      gpr_free(op);
-    }
-    /* if a stream is in the stream map, and gets cancelled, we need to
-     * ensure we are not parsing before continuing the cancellation to keep
-     * things in a sane state */
-    grpc_chttp2_stream_global *stream_global;
-    while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
-                                                           &stream_global)) {
-      GPR_ASSERT(stream_global->in_stream_map);
-      GPR_ASSERT(stream_global->write_closed);
-      GPR_ASSERT(stream_global->read_closed);
-      remove_stream(exec_ctx, t, stream_global->id,
-                    removal_error(GRPC_ERROR_NONE, stream_global));
-      GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
-    }
     GPR_TIMER_END("post_parse_locked", 0);
   }

src/core/ext/transport/chttp2/transport/internal.h

@@ -64,7 +64,6 @@ typedef enum {
   GRPC_CHTTP2_LIST_WRITABLE,
   GRPC_CHTTP2_LIST_WRITING,
   GRPC_CHTTP2_LIST_WRITTEN,
-  GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING,
   GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_WRITING,
   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
   /* streams waiting for the outgoing window in the writing path, they will be
@@ -218,10 +217,6 @@ struct grpc_chttp2_transport_global {
   /** next payload for an outgoing ping */
   uint64_t ping_counter;

-  /** concurrent stream count: updated when not parsing,
-      so this is a strict over-estimation on the client */
-  uint32_t concurrent_stream_count;
-
   /** parser for headers */
   grpc_chttp2_hpack_parser hpack_parser;
   /** simple one shot parsers */
@@ -308,10 +303,6 @@ struct grpc_chttp2_transport {
   struct {
     grpc_combiner *combiner;

-    /** is a thread currently in the global lock */
-    bool global_active;
-    /** is a thread currently parsing */
-    bool parsing_active;
     /** write execution state of the transport */
     grpc_chttp2_write_state write_state;
     /** has a check_read_ops been scheduled */
@@ -336,14 +327,8 @@
      chain. */
   grpc_chttp2_transport_writing writing;

-  /** maps stream id to grpc_chttp2_stream objects;
-      owned by the parsing thread when parsing */
-  grpc_chttp2_stream_map parsing_stream_map;
-
-  /** streams created by the client (possibly during parsing);
-      merged with parsing_stream_map during unlock when no
-      parsing is occurring */
-  grpc_chttp2_stream_map new_stream_map;
+  /** maps stream id to grpc_chttp2_stream objects */
+  grpc_chttp2_stream_map stream_map;

   /** closure to execute writing */
   grpc_closure writing_action;
@@ -374,9 +359,6 @@
     /** connectivity tracking */
     grpc_connectivity_state_tracker state_tracker;
   } channel_callback;
-
-  /** Transport op to be applied post-parsing */
-  grpc_transport_op *post_parsing_op;
 };

 struct grpc_chttp2_stream_global {
@@ -602,13 +584,6 @@ void grpc_chttp2_list_remove_stalled_by_transport(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global);

-void grpc_chttp2_list_add_closed_waiting_for_parsing(
-    grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global *stream_global);
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
-    grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global **stream_global);
-
 void grpc_chttp2_list_add_closed_waiting_for_writing(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global);

src/core/ext/transport/chttp2/transport/stream_lists.c

@@ -334,26 +334,6 @@ void grpc_chttp2_list_remove_stalled_by_transport(
                      GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
 }

-void grpc_chttp2_list_add_closed_waiting_for_parsing(
-    grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global *stream_global) {
-  stream_list_add(TRANSPORT_FROM_GLOBAL(transport_global),
-                  STREAM_FROM_GLOBAL(stream_global),
-                  GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
-}
-
-int grpc_chttp2_list_pop_closed_waiting_for_parsing(
-    grpc_chttp2_transport_global *transport_global,
-    grpc_chttp2_stream_global **stream_global) {
-  grpc_chttp2_stream *stream;
-  int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
-                          GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
-  if (r != 0) {
-    *stream_global = &stream->global;
-  }
-  return r;
-}
-
 void grpc_chttp2_list_add_closed_waiting_for_writing(
     grpc_chttp2_transport_global *transport_global,
     grpc_chttp2_stream_global *stream_global) {

src/core/lib/iomgr/error.c

@@ -265,7 +265,7 @@ static grpc_error *copy_error_and_unref(grpc_error *in) {
   } else {
     out = gpr_malloc(sizeof(*out));
 #ifdef GRPC_ERROR_REFCOUNT_DEBUG
-    gpr_log(GPR_DEBUG, "%p create copying", out);
+    gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
 #endif
     out->ints = gpr_avl_ref(in->ints);
     out->strs = gpr_avl_ref(in->strs);

src/core/lib/surface/call.c

@@ -230,33 +230,33 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
 static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
                                   grpc_error *error);

-grpc_call *grpc_call_create(
-    grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
-    grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
-    const void *server_transport_data, grpc_mdelem **add_initial_metadata,
-    size_t add_initial_metadata_count, gpr_timespec send_deadline) {
+grpc_error *grpc_call_create(const grpc_call_create_args *args,
+                             grpc_call **out_call) {
   size_t i, j;
-  grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
+  grpc_channel_stack *channel_stack =
+      grpc_channel_get_channel_stack(args->channel);
   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_call *call;
   GPR_TIMER_BEGIN("grpc_call_create", 0);
-  call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
+  *out_call = call =
+      gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
   memset(call, 0, sizeof(grpc_call));
   gpr_mu_init(&call->mu);
-  call->channel = channel;
-  call->cq = cq;
-  call->parent = parent_call;
+  call->channel = args->channel;
+  call->cq = args->cq;
+  call->parent = args->parent_call;
   /* Always support no compression */
   GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
-  call->is_client = server_transport_data == NULL;
+  call->is_client = args->server_transport_data == NULL;
   if (call->is_client) {
-    GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
-    for (i = 0; i < add_initial_metadata_count; i++) {
-      call->send_extra_metadata[i].md = add_initial_metadata[i];
+    GPR_ASSERT(args->add_initial_metadata_count <
+               MAX_SEND_EXTRA_METADATA_COUNT);
+    for (i = 0; i < args->add_initial_metadata_count; i++) {
+      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
     }
-    call->send_extra_metadata_count = (int)add_initial_metadata_count;
+    call->send_extra_metadata_count = (int)args->add_initial_metadata_count;
   } else {
-    GPR_ASSERT(add_initial_metadata_count == 0);
+    GPR_ASSERT(args->add_initial_metadata_count == 0);
     call->send_extra_metadata_count = 0;
   }
   for (i = 0; i < 2; i++) {
@@ -265,78 +265,79 @@ grpc_call *grpc_call_create(
     }
   }
   call->send_deadline =
-      gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
-  GRPC_CHANNEL_INTERNAL_REF(channel, "call");
+      gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
+  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
   /* initial refcount dropped by grpc_call_destroy */
   grpc_error *error = grpc_call_stack_init(
       &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
-      server_transport_data, CALL_STACK_FROM_CALL(call));
+      args->server_transport_data, CALL_STACK_FROM_CALL(call));
   if (error != GRPC_ERROR_NONE) {
     intptr_t status;
-    if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status))
+    if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
       status = GRPC_STATUS_UNKNOWN;
+    }
     const char *error_str =
         grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
     close_with_status(&exec_ctx, call, (grpc_status_code)status,
                       error_str == NULL ? "unknown error" : error_str);
-    GRPC_ERROR_UNREF(error);
   }
-  if (cq != NULL) {
+  if (args->cq != NULL) {
     GPR_ASSERT(
-        pollset_set_alternative == NULL &&
+        args->pollset_set_alternative == NULL &&
         "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
-    GRPC_CQ_INTERNAL_REF(cq, "bind");
+    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
     call->pollent =
-        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
   }
-  if (pollset_set_alternative != NULL) {
-    call->pollent =
-        grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+  if (args->pollset_set_alternative != NULL) {
+    call->pollent = grpc_polling_entity_create_from_pollset_set(
+        args->pollset_set_alternative);
   }
   if (!grpc_polling_entity_is_empty(&call->pollent)) {
     grpc_call_stack_set_pollset_or_pollset_set(
         &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
   }
-  if (parent_call != NULL) {
-    GRPC_CALL_INTERNAL_REF(parent_call, "child");
+  gpr_timespec send_deadline = args->send_deadline;
+  if (args->parent_call != NULL) {
+    GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
     GPR_ASSERT(call->is_client);
-    GPR_ASSERT(!parent_call->is_client);
+    GPR_ASSERT(!args->parent_call->is_client);

-    gpr_mu_lock(&parent_call->mu);
+    gpr_mu_lock(&args->parent_call->mu);

-    if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
+    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
       send_deadline = gpr_time_min(
           gpr_convert_clock_type(send_deadline,
-                                 parent_call->send_deadline.clock_type),
-          parent_call->send_deadline);
+                                 args->parent_call->send_deadline.clock_type),
+          args->parent_call->send_deadline);
     }
     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
      * GRPC_PROPAGATE_STATS_CONTEXT */
     /* TODO(ctiller): This should change to use the appropriate census start_op
      * call. */
-    if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
-      GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
-      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
-                            parent_call->context[GRPC_CONTEXT_TRACING].value,
-                            NULL);
+    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
+      GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+      grpc_call_context_set(
+          call, GRPC_CONTEXT_TRACING,
+          args->parent_call->context[GRPC_CONTEXT_TRACING].value, NULL);
     } else {
-      GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
+      GPR_ASSERT(args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
     }
-    if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
+    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
       call->cancellation_is_inherited = 1;
     }

-    if (parent_call->first_child == NULL) {
-      parent_call->first_child = call;
+    if (args->parent_call->first_child == NULL) {
+      args->parent_call->first_child = call;
       call->sibling_next = call->sibling_prev = call;
     } else {
-      call->sibling_next = parent_call->first_child;
-      call->sibling_prev = parent_call->first_child->sibling_prev;
+      call->sibling_next = args->parent_call->first_child;
+      call->sibling_prev = args->parent_call->first_child->sibling_prev;
       call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
           call;
     }

-    gpr_mu_unlock(&parent_call->mu);
+    gpr_mu_unlock(&args->parent_call->mu);
   }
   if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
       0) {
@@ -344,7 +345,7 @@ grpc_call *grpc_call_create(
   }
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_call_create", 0);
-  return call;
+  return error;
 }

 void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,

src/core/lib/surface/call.h

@@ -49,15 +49,29 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
                                            grpc_call *call, int success,
                                            void *user_data);

-grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
-                            uint32_t propagation_mask,
-                            grpc_completion_queue *cq,
-                            /* if not NULL, it'll be used in lieu of \a cq */
-                            grpc_pollset_set *pollset_set_alternative,
-                            const void *server_transport_data,
-                            grpc_mdelem **add_initial_metadata,
-                            size_t add_initial_metadata_count,
-                            gpr_timespec send_deadline);
+typedef struct grpc_call_create_args {
+  grpc_channel *channel;
+
+  grpc_call *parent_call;
+  uint32_t propagation_mask;
+
+  grpc_completion_queue *cq;
+  /* if not NULL, it'll be used in lieu of cq */
+  grpc_pollset_set *pollset_set_alternative;
+
+  const void *server_transport_data;
+
+  grpc_mdelem **add_initial_metadata;
+  size_t add_initial_metadata_count;
+
+  gpr_timespec send_deadline;
+} grpc_call_create_args;
+
+/* Create a new call based on \a args.
+   Regardless of success or failure, always returns a valid new call into *call
+   */
+grpc_error *grpc_call_create(const grpc_call_create_args *args,
+                             grpc_call **call);

 void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
                                     grpc_completion_queue *cq);
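Aside (not part of the commit): a hedged usage sketch of the new args-struct API, mirroring the channel.c and server.c hunks below. Zero-initializing the struct lets a caller set only the fields it needs, with unset pointers defaulting to NULL and unset counts to 0; the error branch leans on the documented guarantee that *call is always populated.

    grpc_call_create_args args;
    memset(&args, 0, sizeof(args));
    args.channel = channel; /* always required */
    /* as in the server.c hunk below, use an infinite deadline when none applies */
    args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);

    grpc_call *call = NULL;
    grpc_error *error = grpc_call_create(&args, &call);
    if (error != GRPC_ERROR_NONE) {
      /* a valid call object still exists; log or surface the failure, then
         release the error reference */
      GRPC_ERROR_UNREF(error);
    }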

src/core/lib/surface/channel.c

@@ -208,9 +208,21 @@ static grpc_call *grpc_channel_create_call_internal(
     send_metadata[num_metadata++] = GRPC_MDELEM_REF(channel->default_authority);
   }

-  return grpc_call_create(channel, parent_call, propagation_mask, cq,
-                          pollset_set_alternative, NULL, send_metadata,
-                          num_metadata, deadline);
+  grpc_call_create_args args;
+  memset(&args, 0, sizeof(args));
+  args.channel = channel;
+  args.parent_call = parent_call;
+  args.propagation_mask = propagation_mask;
+  args.cq = cq;
+  args.pollset_set_alternative = pollset_set_alternative;
+  args.server_transport_data = NULL;
+  args.add_initial_metadata = send_metadata;
+  args.add_initial_metadata_count = num_metadata;
+  args.send_deadline = deadline;
+
+  grpc_call *call;
+  GRPC_LOG_IF_ERROR("call_create", grpc_call_create(&args, &call));
+  return call;
 }

 grpc_call *grpc_channel_create_call(grpc_channel *channel,

src/core/lib/surface/server.c

@@ -824,11 +824,20 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
                           const void *transport_server_data) {
   channel_data *chand = cd;
   /* create a call */
-  grpc_call *call = grpc_call_create(chand->channel, NULL, 0, NULL, NULL,
-                                     transport_server_data, NULL, 0,
-                                     gpr_inf_future(GPR_CLOCK_MONOTONIC));
+  grpc_call_create_args args;
+  memset(&args, 0, sizeof(args));
+  args.channel = chand->channel;
+  args.server_transport_data = transport_server_data;
+  args.send_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+  grpc_call *call;
+  grpc_error *error = grpc_call_create(&args, &call);
   grpc_call_element *elem =
       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
+  if (error != GRPC_ERROR_NONE) {
+    got_initial_metadata(exec_ctx, elem, error);
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
   call_data *calld = elem->call_data;
   grpc_op op;
   memset(&op, 0, sizeof(op));

src/core/lib/transport/transport.c

@@ -46,8 +46,9 @@
 #ifdef GRPC_STREAM_REFCOUNT_DEBUG
 void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
   gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
-  gpr_log(GPR_DEBUG, "%s %p:%p REF %d->%d %s", refcount->object_type,
-          refcount, refcount->destroy.cb_arg, val, val + 1, reason);
+  gpr_log(GPR_DEBUG, "%s %p:%p REF %" PRIdPTR "->%" PRIdPTR " %s",
+          refcount->object_type, refcount, refcount->destroy.cb_arg, val,
+          val + 1, reason);
 #else
 void grpc_stream_ref(grpc_stream_refcount *refcount) {
 #endif
@@ -58,8 +59,9 @@ void grpc_stream_ref(grpc_stream_refcount *refcount) {
 void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
                        const char *reason) {
   gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
-  gpr_log(GPR_DEBUG, "%s %p:%p UNREF %d->%d %s", refcount->object_type,
-          refcount, refcount->destroy.cb_arg, val, val - 1, reason);
+  gpr_log(GPR_DEBUG, "%s %p:%p UNREF %" PRIdPTR "->%" PRIdPTR " %s",
+          refcount->object_type, refcount, refcount->destroy.cb_arg, val,
+          val - 1, reason);
 #else
 void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
                        grpc_stream_refcount *refcount) {

src/core/lib/transport/transport.h

@@ -55,7 +55,7 @@ typedef struct grpc_transport grpc_transport;
    for a stream. */
 typedef struct grpc_stream grpc_stream;

-//#define GRPC_STREAM_REFCOUNT_DEBUG
+#define GRPC_STREAM_REFCOUNT_DEBUG

 typedef struct grpc_stream_refcount {
   gpr_refcount refs;
