Core-supported context inheritance sketch

pull/2800/head
Craig Tiller 9 years ago
parent 11af974715
commit 3e7c6a701c
  1. 16
      include/grpc/grpc.h
  2. 36
      src/core/surface/call.c
  3. 4
      src/core/surface/call.h
  4. 21
      src/core/surface/channel.c
  5. 4
      src/core/surface/server.c
  6. 20
      src/cpp/client/channel.cc

@ -349,6 +349,15 @@ typedef struct grpc_op {
} data;
} grpc_op;
#define GRPC_INHERIT_DEADLINE 1
#define GRPC_INHERIT_CENSUS_CONTEXT 2
/* TODO(ctiller):
#define GRPC_INHERIT_CANCELLATION 4
*/
#define GRPC_INHERIT_DEFAULTS \
(GRPC_INHERIT_DEADLINE | GRPC_INHERIT_CENSUS_CONTEXT)
/** Initialize the grpc library.
It is not safe to call any other grpc functions before calling this.
@ -427,6 +436,8 @@ void grpc_channel_watch_connectivity_state(
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function. */
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
gpr_uint32 inheritance_mask,
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline);
@ -437,8 +448,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
/** Create a call given a handle returned from grpc_channel_register_call */
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_completion_queue *completion_queue,
void *registered_call_handle, gpr_timespec deadline);
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline);
/** Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.

@ -143,6 +143,7 @@ typedef enum {
struct grpc_call {
grpc_completion_queue *cq;
grpc_channel *channel;
grpc_call *parent;
grpc_mdctx *metadata_context;
/* TODO(ctiller): share with cq if possible? */
gpr_mu mu;
@ -290,7 +291,9 @@ static void finished_loose_op(void *call, int success);
static void lock(grpc_call *call);
static void unlock(grpc_call *call);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 inheritance_mask,
grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
@ -309,7 +312,28 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (cq) {
GRPC_CQ_INTERNAL_REF(cq, "bind");
}
call->parent = parent_call;
call->is_client = server_transport_data == NULL;
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
GPR_ASSERT(call->is_client);
GPR_ASSERT(!parent_call->is_client);
if (inheritance_mask & GRPC_INHERIT_DEADLINE) {
send_deadline = gpr_time_min(
gpr_convert_clock_type(send_deadline,
parent_call->send_deadline.clock_type),
parent_call->send_deadline);
}
if (inheritance_mask & GRPC_INHERIT_CENSUS_CONTEXT) {
grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
parent_call->context[GRPC_CONTEXT_TRACING].value,
NULL);
}
/* cancellation is done last */
} else {
GPR_ASSERT(inheritance_mask == 0);
}
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
call->request_set[i] = REQSET_EMPTY;
}
@ -404,6 +428,7 @@ void grpc_call_internal_ref(grpc_call *c) {
static void destroy_call(void *call, int ignored_success) {
size_t i;
grpc_call *c = call;
grpc_call *parent = c->parent;
grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
GRPC_CHANNEL_INTERNAL_UNREF(c->channel, "call");
gpr_mu_destroy(&c->mu);
@ -436,6 +461,9 @@ static void destroy_call(void *call, int ignored_success) {
GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
}
gpr_free(c);
if (parent) {
GRPC_CALL_INTERNAL_UNREF(parent, "child", 1);
}
}
#ifdef GRPC_CALL_REF_COUNT_DEBUG
@ -1283,9 +1311,9 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
grpc_alarm_init(&call->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
call_alarm, call, gpr_now(GPR_CLOCK_MONOTONIC));
call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
grpc_alarm_init(&call->alarm, call->send_deadline, call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata

@ -85,7 +85,9 @@ typedef struct {
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
gpr_uint32 inheritance_mask,
grpc_completion_queue *cq,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,

@ -146,7 +146,8 @@ char *grpc_channel_get_target(grpc_channel *channel) {
}
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask,
grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
@ -155,16 +156,19 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[0] = path_mdelem;
send_metadata[1] = authority_mdelem;
return grpc_call_create(channel, cq, NULL, send_metadata,
GPR_ARRAY_SIZE(send_metadata), deadline);
return grpc_call_create(channel, parent_call, inheritance_mask, cq, NULL,
send_metadata, GPR_ARRAY_SIZE(send_metadata),
deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
gpr_uint32 inheritance_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call_internal(
channel, cq,
channel, parent_call, inheritance_mask, cq,
grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0)),
@ -191,12 +195,13 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
}
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_completion_queue *completion_queue,
void *registered_call_handle, gpr_timespec deadline) {
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 inheritance_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
gpr_timespec deadline) {
registered_call *rc = registered_call_handle;
return grpc_channel_create_call_internal(
channel, completion_queue, GRPC_MDELEM_REF(rc->path),
GRPC_MDELEM_REF(rc->authority), deadline);
channel, parent_call, inheritance_mask, completion_queue,
GRPC_MDELEM_REF(rc->path), GRPC_MDELEM_REF(rc->authority), deadline);
}
#ifdef GRPC_CHANNEL_REF_COUNT_DEBUG

@ -621,8 +621,8 @@ static void accept_stream(void *cd, grpc_transport *transport,
const void *transport_server_data) {
channel_data *chand = cd;
/* create a call */
grpc_call_create(chand->channel, NULL, transport_server_data, NULL, 0,
gpr_inf_future(GPR_CLOCK_REALTIME));
grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data, NULL,
0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
static void channel_connectivity_changed(void *cd, int iomgr_status_ignored) {

@ -58,16 +58,16 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
auto c_call =
method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(c_channel_, cq->cq(),
method.channel_tag(),
context->raw_deadline())
: grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
context->authority().empty()
? target_.c_str()
: context->authority().c_str(),
context->raw_deadline());
auto c_call = method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(
c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(),
method.channel_tag(), context->raw_deadline())
: grpc_channel_create_call(
c_channel_, NULL, GRPC_INHERIT_DEFAULTS, cq->cq(),
method.name(), context->authority().empty()
? target_.c_str()
: context->authority().c_str(),
context->raw_deadline());
grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this());

Loading…
Cancel
Save