From 3e7c6a701c8aea58cacb023ffa8b3c75e7e67390 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 31 Jul 2015 16:17:04 -0700 Subject: [PATCH] Core-supported context inheritance sketch --- include/grpc/grpc.h | 16 ++++++++++++++-- src/core/surface/call.c | 36 ++++++++++++++++++++++++++++++++---- src/core/surface/call.h | 4 +++- src/core/surface/channel.c | 21 +++++++++++++-------- src/core/surface/server.c | 4 ++-- src/cpp/client/channel.cc | 20 ++++++++++---------- 6 files changed, 74 insertions(+), 27 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index b33d2cd68cb..a34751f891e 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -349,6 +349,15 @@ typedef struct grpc_op { } data; } grpc_op; +#define GRPC_INHERIT_DEADLINE 1 +#define GRPC_INHERIT_CENSUS_CONTEXT 2 +/* TODO(ctiller): +#define GRPC_INHERIT_CANCELLATION 4 +*/ + +#define GRPC_INHERIT_DEFAULTS \ + (GRPC_INHERIT_DEADLINE | GRPC_INHERIT_CENSUS_CONTEXT) + /** Initialize the grpc library. It is not safe to call any other grpc functions before calling this. @@ -427,6 +436,8 @@ void grpc_channel_watch_connectivity_state( completions are sent to 'completion_queue'. 'method' and 'host' need only live through the invocation of this function. */ grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_call *parent_call, + gpr_uint32 inheritance_mask, grpc_completion_queue *completion_queue, const char *method, const char *host, gpr_timespec deadline); @@ -437,8 +448,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, /** Create a call given a handle returned from grpc_channel_register_call */ grpc_call *grpc_channel_create_registered_call( - grpc_channel *channel, grpc_completion_queue *completion_queue, - void *registered_call_handle, gpr_timespec deadline); + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask, + grpc_completion_queue *completion_queue, void *registered_call_handle, + gpr_timespec deadline); /** Start a batch of operations defined in the array ops; when complete, post a completion of type 'tag' to the completion queue bound to the call. diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 327a096ffbc..5154ea5d65d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -143,6 +143,7 @@ typedef enum { struct grpc_call { grpc_completion_queue *cq; grpc_channel *channel; + grpc_call *parent; grpc_mdctx *metadata_context; /* TODO(ctiller): share with cq if possible? */ gpr_mu mu; @@ -290,7 +291,9 @@ static void finished_loose_op(void *call, int success); static void lock(grpc_call *call); static void unlock(grpc_call *call); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 inheritance_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, @@ -309,7 +312,28 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, if (cq) { GRPC_CQ_INTERNAL_REF(cq, "bind"); } + call->parent = parent_call; call->is_client = server_transport_data == NULL; + if (parent_call != NULL) { + GRPC_CALL_INTERNAL_REF(parent_call, "child"); + GPR_ASSERT(call->is_client); + GPR_ASSERT(!parent_call->is_client); + + if (inheritance_mask & GRPC_INHERIT_DEADLINE) { + send_deadline = gpr_time_min( + gpr_convert_clock_type(send_deadline, + parent_call->send_deadline.clock_type), + parent_call->send_deadline); + } + if (inheritance_mask & GRPC_INHERIT_CENSUS_CONTEXT) { + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, + parent_call->context[GRPC_CONTEXT_TRACING].value, + NULL); + } + /* cancellation is done last */ + } else { + GPR_ASSERT(inheritance_mask == 0); + } for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { call->request_set[i] = REQSET_EMPTY; } @@ -404,6 +428,7 @@ void grpc_call_internal_ref(grpc_call *c) { static void destroy_call(void *call, int ignored_success) { size_t i; grpc_call *c = call; + grpc_call *parent = c->parent; grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c)); GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call"); gpr_mu_destroy(&c->mu); @@ -436,6 +461,9 @@ static void destroy_call(void *call, int ignored_success) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } gpr_free(c); + if (parent) { + GRPC_CALL_INTERNAL_UNREF(parent, "child", 1); + } } #ifdef GRPC_CALL_REF_COUNT_DEBUG @@ -1283,9 +1311,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) { } GRPC_CALL_INTERNAL_REF(call, "alarm"); call->have_alarm = 1; - grpc_alarm_init(&call->alarm, - gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC)); + call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); + grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call, + gpr_now(GPR_CLOCK_MONOTONIC)); } /* we offset status by a small amount when storing it into transport metadata diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 265638d5196..f2cc8fa352d 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -85,7 +85,9 @@ typedef struct { typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, void *user_data); -grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, +grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call, + gpr_uint32 inheritance_mask, + grpc_completion_queue *cq, const void *server_transport_data, grpc_mdelem **add_initial_metadata, size_t add_initial_metadata_count, diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 583d3501287..44b6e226c4a 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -146,7 +146,8 @@ char *grpc_channel_get_target(grpc_channel *channel) { } static grpc_call *grpc_channel_create_call_internal( - grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem, + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask, + grpc_completion_queue *cq, grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem, gpr_timespec deadline) { grpc_mdelem *send_metadata[2]; @@ -155,16 +156,19 @@ static grpc_call *grpc_channel_create_call_internal( send_metadata[0] = path_mdelem; send_metadata[1] = authority_mdelem; - return grpc_call_create(channel, cq, NULL, send_metadata, - GPR_ARRAY_SIZE(send_metadata), deadline); + return grpc_call_create(channel, parent_call, inheritance_mask, cq, NULL, + send_metadata, GPR_ARRAY_SIZE(send_metadata), + deadline); } grpc_call *grpc_channel_create_call(grpc_channel *channel, + grpc_call *parent_call, + gpr_uint32 inheritance_mask, grpc_completion_queue *cq, const char *method, const char *host, gpr_timespec deadline) { return grpc_channel_create_call_internal( - channel, cq, + channel, parent_call, inheritance_mask, cq, grpc_mdelem_from_metadata_strings( channel->metadata_context, GRPC_MDSTR_REF(channel->path_string), grpc_mdstr_from_string(channel->metadata_context, method, 0)), @@ -191,12 +195,13 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method, } grpc_call *grpc_channel_create_registered_call( - grpc_channel *channel, grpc_completion_queue *completion_queue, - void *registered_call_handle, gpr_timespec deadline) { + grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask, + grpc_completion_queue *completion_queue, void *registered_call_handle, + gpr_timespec deadline) { registered_call *rc = registered_call_handle; return grpc_channel_create_call_internal( - channel, completion_queue, GRPC_MDELEM_REF(rc->path), - GRPC_MDELEM_REF(rc->authority), deadline); + channel, parent_call, inheritance_mask, completion_queue, + GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline); } #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 7031e639166..212d1bec270 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -621,8 +621,8 @@ static void accept_stream(void *cd, grpc_transport *transport, const void *transport_server_data) { channel_data *chand = cd; /* create a call */ - grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0, - gpr_inf_future(GPR_CLOCK_REALTIME)); + grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL, + 0, gpr_inf_future(GPR_CLOCK_MONOTONIC)); } static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) { diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 5df81e641ef..c2f9db20aae 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -58,16 +58,16 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); } Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, CompletionQueue* cq) { - auto c_call = - method.channel_tag() && context->authority().empty() - ? grpc_channel_create_registered_call(c_channel_, cq->cq(), - method.channel_tag(), - context->raw_deadline()) - : grpc_channel_create_call(c_channel_, cq->cq(), method.name(), - context->authority().empty() - ? target_.c_str() - : context->authority().c_str(), - context->raw_deadline()); + auto c_call = method.channel_tag() && context->authority().empty() + ? grpc_channel_create_registered_call( + c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), + method.channel_tag(), context->raw_deadline()) + : grpc_channel_create_call( + c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(), + method.name(), context->authority().empty() + ? target_.c_str() + : context->authority().c_str(), + context->raw_deadline()); grpc_census_call_set_context(c_call, context->census_context()); GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); context->set_call(c_call, shared_from_this());