diff --git a/include/grpc++/client_context.h b/include/grpc++/client_context.h index 5e108752604..09aa10508d5 100644 --- a/include/grpc++/client_context.h +++ b/include/grpc++/client_context.h @@ -38,6 +38,7 @@ #include #include +#include #include #include #include @@ -107,6 +108,10 @@ class ClientContext { creds_ = creds; } + // Get and set census context + void set_census_context(census_context* ccp) { census_context_ = ccp; } + census_context* get_census_context() const { return census_context_; } + void TryCancel(); private: @@ -154,6 +159,7 @@ class ClientContext { gpr_timespec deadline_; grpc::string authority_; std::shared_ptr creds_; + census_context* census_context_; std::multimap send_initial_metadata_; std::multimap recv_initial_metadata_; std::multimap trailing_metadata_; diff --git a/include/grpc/census.h b/include/grpc/census.h index b2049b3289b..3fc07affc84 100644 --- a/include/grpc/census.h +++ b/include/grpc/census.h @@ -61,6 +61,10 @@ enum census_functions { int census_initialize(int functions); void census_shutdown(); +/* If any census feature has been initialized, this funtion will return a + * non-zero value. */ +int census_available(); + /* Internally, Census relies on a context, which should be propagated across * RPC's. From the RPC subsystems viewpoint, this is an opaque data structure. * A context must be used as the first argument to all other census diff --git a/src/core/census/grpc_context.c b/src/core/census/grpc_context.c index cf2353199f3..ffdab825705 100644 --- a/src/core/census/grpc_context.c +++ b/src/core/census/grpc_context.c @@ -34,12 +34,30 @@ #include #include "src/core/census/grpc_context.h" -void *grpc_census_context_create() { - census_context *context; - census_context_deserialize(NULL, &context); - return (void *)context; -} +void *grpc_census_context_create() { return NULL; } void grpc_census_context_destroy(void *context) { census_context_destroy((census_context *)context); } + +void grpc_census_call_set_context(grpc_call *call, census_context *context) { + if (!census_available()) { + return; + } + if (context == NULL) { + if (grpc_call_is_client(call)) { + census_context *context_ptr; + census_context_deserialize(NULL, &context_ptr); + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context_ptr, + grpc_census_context_destroy); + } else { + /* TODO(aveitch): server side context code to be implemented. */ + } + } else { + grpc_call_context_set(call, GRPC_CONTEXT_TRACING, context, NULL); + } +} + +census_context *grpc_census_call_get_context(grpc_call *call) { + return (census_context *)grpc_call_context_get(call, GRPC_CONTEXT_TRACING); +} diff --git a/src/core/census/grpc_context.h b/src/core/census/grpc_context.h index f610f6ce21d..01f35c32136 100644 --- a/src/core/census/grpc_context.h +++ b/src/core/census/grpc_context.h @@ -36,7 +36,25 @@ #ifndef CENSUS_GRPC_CONTEXT_H #define CENSUS_GRPC_CONTEXT_H +#include +#include "src/core/surface/call.h" + +#ifdef __cplusplus +extern "C" { +#endif + void *grpc_census_context_create(); void grpc_census_context_destroy(void *context); +/* Set census context for the call; Must be called before first call to + grpc_call_start_batch(). */ +void grpc_census_call_set_context(grpc_call *call, census_context *context); + +/* Retrieve the calls current census context. */ +census_context *grpc_census_call_get_context(grpc_call *call); + +#ifdef __cplusplus +} +#endif + #endif /* CENSUS_GRPC_CONTEXT_H */ diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c index 057ac78ee72..80165206417 100644 --- a/src/core/census/initialize.c +++ b/src/core/census/initialize.c @@ -48,3 +48,5 @@ int census_initialize(int functions) { } void census_shutdown() { census_fns_enabled = CENSUS_NONE; } + +int census_available() { return (census_fns_enabled != CENSUS_NONE); } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 02e0e59cadf..ef430dd9df6 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -296,8 +296,6 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, if (call->is_client) { call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE; call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE; - call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create(); - call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy; } GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT); for (i = 0; i < add_initial_metadata_count; i++) { @@ -462,8 +460,7 @@ static int need_more_data(grpc_call *call) { (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || (call->write_state == WRITE_STATE_INITIAL && !call->is_client) || - (call->cancel_with_status != GRPC_STATUS_OK) || - call->destroy_called; + (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called; } static void unlock(grpc_call *call) { @@ -1151,7 +1148,8 @@ static void execute_op(grpc_call *call, grpc_transport_op *op) { } else { finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args)); args->call = call; - grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, args); + grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated, + args); op->on_consumed = &args->closure; } } @@ -1223,13 +1221,13 @@ static gpr_uint32 decode_compression(grpc_mdelem *md) { } else { gpr_uint32 parsed_clevel_bytes; if (gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value), - GPR_SLICE_LENGTH(md->value->slice), - &parsed_clevel_bytes)) { + GPR_SLICE_LENGTH(md->value->slice), + &parsed_clevel_bytes)) { /* the following cast is safe, as a gpr_uint32 should be able to hold all * possible values of the grpc_compression_level enum */ - clevel = (grpc_compression_level) parsed_clevel_bytes; + clevel = (grpc_compression_level)parsed_clevel_bytes; } else { - clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ + clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */ } grpc_mdelem_set_user_data(md, destroy_compression, (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET)); @@ -1252,7 +1250,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); } else if (key == grpc_channel_get_message_string(call->channel)) { set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); - } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) { + } else if (key == + grpc_channel_get_compresssion_level_string(call->channel)) { set_decode_compression_level(call, decode_compression(md)); } else { dest = &call->buffered_metadata[is_trailing]; diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 72593f877e1..5bc6f6fd913 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -39,6 +39,7 @@ #include #include +#include "src/core/census/grpc_context.h" #include "src/core/profiling/timers.h" #include #include @@ -68,6 +69,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, ? target_.c_str() : context->authority().c_str(), context->raw_deadline()); + grpc_census_call_set_context(c_call, context->get_census_context()); GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); context->set_call(c_call, shared_from_this()); return Call(c_call, this, cq);