pull/2151/head
Siddharth Rakesh 10 years ago
commit c78d70e3bc
  1. 18
      include/grpc/grpc.h
  2. 2
      src/core/json/json.h
  3. 32
      src/core/surface/call.c
  4. 9
      src/core/surface/channel.c
  5. 1
      src/core/surface/channel.h
  6. 102
      src/core/surface/server.c
  7. 130
      src/core/tsi/ssl_transport_security.c
  8. 20
      src/cpp/server/server.cc
  9. 2
      src/ruby/.rubocop_todo.yml
  10. 3
      src/ruby/bin/interop/interop_client.rb
  11. 21
      src/ruby/lib/grpc/generic/active_call.rb
  12. 14
      src/ruby/lib/grpc/generic/bidi_call.rb
  13. 2
      src/ruby/lib/grpc/logconfig.rb
  14. 5
      test/core/json/json_rewrite_test.c

@ -449,7 +449,9 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *call,
/* Destroy a call. */ /* Destroy a call. */
void grpc_call_destroy(grpc_call *call); void grpc_call_destroy(grpc_call *call);
/* Request notification of a new call */ /* Request notification of a new call. 'cq_for_notification' must
have been registered to the server via grpc_server_register_completion_queue.
*/
grpc_call_error grpc_server_request_call( grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata, grpc_metadata_array *request_metadata,
@ -466,7 +468,9 @@ grpc_call_error grpc_server_request_call(
void *grpc_server_register_method(grpc_server *server, const char *method, void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host); const char *host);
/* Request notification of a new pre-registered call */ /* Request notification of a new pre-registered call. 'cq_for_notification' must
have been registered to the server via grpc_server_register_completion_queue.
*/
grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call, grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata, gpr_timespec *deadline, grpc_metadata_array *request_metadata,
@ -480,9 +484,10 @@ grpc_call_error grpc_server_request_registered_call(
through the invocation of this function. */ through the invocation of this function. */
grpc_server *grpc_server_create(const grpc_channel_args *args); grpc_server *grpc_server_create(const grpc_channel_args *args);
/* Register a completion queue with the server. Must be done for any completion /* Register a completion queue with the server. Must be done for any
queue that is passed to grpc_server_request_* call. Must be performed prior notification completion queue that is passed to grpc_server_request_*_call
to grpc_server_start. */ and to grpc_server_shutdown_and_notify. Must be performed prior to
grpc_server_start. */
void grpc_server_register_completion_queue(grpc_server *server, void grpc_server_register_completion_queue(grpc_server *server,
grpc_completion_queue *cq); grpc_completion_queue *cq);
@ -499,7 +504,8 @@ void grpc_server_start(grpc_server *server);
Existing calls will be allowed to complete. Existing calls will be allowed to complete.
Send a GRPC_OP_COMPLETE event when there are no more calls being serviced. Send a GRPC_OP_COMPLETE event when there are no more calls being serviced.
Shutdown is idempotent, and all tags will be notified at once if multiple Shutdown is idempotent, and all tags will be notified at once if multiple
grpc_server_shutdown_and_notify calls are made. */ grpc_server_shutdown_and_notify calls are made. 'cq' must have been
registered to this server via grpc_server_register_completion_queue. */
void grpc_server_shutdown_and_notify(grpc_server *server, void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag); grpc_completion_queue *cq, void *tag);

@ -60,7 +60,7 @@ typedef struct grpc_json {
* strings in the tree. The input stream's UTF-8 isn't validated, * strings in the tree. The input stream's UTF-8 isn't validated,
* as in, what you input is what you get as an output. * as in, what you input is what you get as an output.
* *
* All the keys and values in the grpc_json_t objects will be strings * All the keys and values in the grpc_json objects will be strings
* pointing at your input buffer. * pointing at your input buffer.
* *
* Delete the allocated tree afterward using grpc_json_destroy(). * Delete the allocated tree afterward using grpc_json_destroy().

@ -214,6 +214,9 @@ struct grpc_call {
/* Received call statuses from various sources */ /* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT]; received_status status[STATUS_SOURCE_COUNT];
/** Compression level for the call */
grpc_compression_level compression_level;
/* Contexts for various subsystems (security, tracing, ...). */ /* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT]; grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@ -410,6 +413,11 @@ static void set_status_code(grpc_call *call, status_source source,
} }
} }
static void set_decode_compression_level(grpc_call *call,
grpc_compression_level clevel) {
call->compression_level = clevel;
}
static void set_status_details(grpc_call *call, status_source source, static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) { grpc_mdstr *status) {
if (call->status[source].details != NULL) { if (call->status[source].details != NULL) {
@ -1169,6 +1177,28 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status; return status;
} }
/* just as for status above, we need to offset: metadata userdata can't hold a
* zero (null), which in this case is used to signal no compression */
#define COMPRESS_OFFSET 1
static void destroy_compression(void *ignored) {}
static gpr_uint32 decode_compression(grpc_mdelem *md) {
grpc_compression_level clevel;
void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
if (user_data) {
clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
} else {
if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
GPR_SLICE_LENGTH(md->value->slice),
&clevel)) {
clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
}
grpc_mdelem_set_user_data(md, destroy_compression,
(void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
}
return clevel;
}
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) { static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l; grpc_linked_mdelem *l;
grpc_metadata_array *dest; grpc_metadata_array *dest;
@ -1184,6 +1214,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value)); set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
} else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
set_decode_compression_level(call, decode_compression(md));
} else { } else {
dest = &call->buffered_metadata[is_trailing]; dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {

@ -64,6 +64,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context; grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */ /** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string; grpc_mdstr *grpc_status_string;
grpc_mdstr *grpc_compression_level_string;
grpc_mdstr *grpc_message_string; grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string; grpc_mdstr *path_string;
grpc_mdstr *authority_string; grpc_mdstr *authority_string;
@ -98,6 +99,8 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1 + is_client); gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx; channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
channel->grpc_compression_level_string =
grpc_mdstr_from_string(mdctx, "grpc-compression-level");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message"); channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) { for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE]; char buf[GPR_LTOA_MIN_BUFSIZE];
@ -205,6 +208,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdelem_unref(channel->grpc_status_elem[i]); grpc_mdelem_unref(channel->grpc_status_elem[i]);
} }
grpc_mdstr_unref(channel->grpc_status_string); grpc_mdstr_unref(channel->grpc_status_string);
grpc_mdstr_unref(channel->grpc_compression_level_string);
grpc_mdstr_unref(channel->grpc_message_string); grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string); grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string); grpc_mdstr_unref(channel->authority_string);
@ -269,6 +273,11 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string; return channel->grpc_status_string;
} }
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
return channel->grpc_compression_level_string;
}
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) { grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) { if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]); return grpc_mdelem_ref(channel->grpc_status_elem[i]);

@ -53,6 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code); int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);

@ -141,7 +141,15 @@ struct grpc_server {
grpc_pollset **pollsets; grpc_pollset **pollsets;
size_t cq_count; size_t cq_count;
gpr_mu mu; /* The two following mutexes control access to server-state
mu_global controls access to non-call-related state (e.g., channel state)
mu_call controls access to call-related state (e.g., the call lists)
If they are ever required to be nested, you must lock mu_global
before mu_call. This is currently used in shutdown processing
(grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
gpr_mu mu_global; /* mutex for server and channel state */
gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods; registered_method *registered_methods;
requested_call_array requested_calls; requested_call_array requested_calls;
@ -200,6 +208,8 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc);
static void shutdown_channel(channel_data *chand, int send_goaway, static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect); int send_disconnect);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
hold mu_call */
static void maybe_finish_shutdown(grpc_server *server); static void maybe_finish_shutdown(grpc_server *server);
static int call_list_join(call_data **root, call_data *call, call_list list) { static int call_list_join(call_data **root, call_data *call, call_list list) {
@ -273,7 +283,8 @@ static void server_delete(grpc_server *server) {
registered_method *rm; registered_method *rm;
size_t i; size_t i;
grpc_channel_args_destroy(server->channel_args); grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu); gpr_mu_destroy(&server->mu_global);
gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters); gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls); requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) { while ((rm = server->registered_methods) != NULL) {
@ -335,11 +346,11 @@ static void finish_start_new_rpc_and_unlock(grpc_server *server,
if (array->count == 0) { if (array->count == 0) {
calld->state = PENDING; calld->state = PENDING;
call_list_join(pending_root, calld, PENDING_START); call_list_join(pending_root, calld, PENDING_START);
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
} else { } else {
rc = array->calls[--array->count]; rc = array->calls[--array->count];
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, &rc); begin_call(server, calld, &rc);
} }
} }
@ -352,7 +363,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash; gpr_uint32 hash;
channel_registered_method *rm; channel_registered_method *rm;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) { if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */ /* TODO(ctiller): unify these two searches */
/* check for an exact match with host */ /* check for an exact match with host */
@ -404,11 +415,16 @@ static void maybe_finish_shutdown(grpc_server *server) {
if (!server->shutdown || server->shutdown_published) { if (!server->shutdown || server->shutdown_published) {
return; return;
} }
gpr_mu_lock(&server->mu_call);
if (server->lists[ALL_CALLS] != NULL) { if (server->lists[ALL_CALLS] != NULL) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"Waiting for all calls to finish before destroying server"); "Waiting for all calls to finish before destroying server");
gpr_mu_unlock(&server->mu_call);
return; return;
} }
gpr_mu_unlock(&server->mu_call);
if (server->root_channel_data.next != &server->root_channel_data) { if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server"); "Waiting for all channels to close before destroying server");
@ -452,6 +468,7 @@ static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr; grpc_call_element *elem = ptr;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data; channel_data *chand = elem->channel_data;
int remove_res;
if (success && !calld->got_initial_metadata) { if (success && !calld->got_initial_metadata) {
size_t i; size_t i;
@ -476,16 +493,16 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED: case GRPC_STREAM_SEND_CLOSED:
break; break;
case GRPC_STREAM_RECV_CLOSED: case GRPC_STREAM_RECV_CLOSED:
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) { if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} }
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu_call);
break; break;
case GRPC_STREAM_CLOSED: case GRPC_STREAM_CLOSED:
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) { if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED; calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
@ -496,10 +513,13 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure); grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} }
if (call_list_remove(calld, ALL_CALLS)) { remove_res = call_list_remove(calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
if (remove_res) {
decrement_call_count(chand); decrement_call_count(chand);
} }
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu_global);
break; break;
} }
@ -542,10 +562,10 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_TRANSPORT_CLOSED: case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the /* if the transport is closed for a server channel, we destroy the
channel */ channel */
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_global);
server_ref(server); server_ref(server);
destroy_channel(chand); destroy_channel(chand);
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
server_unref(server); server_unref(server);
break; break;
case GRPC_TRANSPORT_GOAWAY: case GRPC_TRANSPORT_GOAWAY:
@ -612,10 +632,13 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future; calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem); calld->call = grpc_call_from_top_element(elem);
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu_call);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS); call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
gpr_mu_unlock(&chand->server->mu_call);
gpr_mu_lock(&chand->server->mu_global);
chand->num_calls++; chand->num_calls++;
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu_global);
server_ref(chand->server); server_ref(chand->server);
@ -628,14 +651,16 @@ static void destroy_call_elem(grpc_call_element *elem) {
int removed[CALL_LIST_COUNT]; int removed[CALL_LIST_COUNT];
size_t i; size_t i;
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) { for (i = 0; i < CALL_LIST_COUNT; i++) {
removed[i] = call_list_remove(elem->call_data, i); removed[i] = call_list_remove(elem->call_data, i);
} }
gpr_mu_unlock(&chand->server->mu_call);
if (removed[ALL_CALLS]) { if (removed[ALL_CALLS]) {
gpr_mu_lock(&chand->server->mu_global);
decrement_call_count(chand); decrement_call_count(chand);
gpr_mu_unlock(&chand->server->mu_global);
} }
gpr_mu_unlock(&chand->server->mu);
if (calld->host) { if (calld->host) {
grpc_mdstr_unref(calld->host); grpc_mdstr_unref(calld->host);
@ -678,12 +703,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_free(chand->registered_methods); gpr_free(chand->registered_methods);
} }
if (chand->server) { if (chand->server) {
gpr_mu_lock(&chand->server->mu); gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev; chand->next->prev = chand->prev;
chand->prev->next = chand->next; chand->prev->next = chand->next;
chand->next = chand->prev = chand; chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server); maybe_finish_shutdown(chand->server);
gpr_mu_unlock(&chand->server->mu); gpr_mu_unlock(&chand->server->mu_global);
grpc_mdstr_unref(chand->path_key); grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key); grpc_mdstr_unref(chand->authority_key);
server_unref(chand->server); server_unref(chand->server);
@ -730,7 +755,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server)); memset(server, 0, sizeof(grpc_server));
gpr_mu_init(&server->mu); gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
/* decremented by grpc_server_destroy */ /* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1); gpr_ref_init(&server->internal_refcount, 1);
@ -880,11 +906,11 @@ grpc_transport_setup_result grpc_server_setup_transport(
result = grpc_connected_channel_bind_transport( result = grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport); grpc_channel_get_channel_stack(channel), transport);
gpr_mu_lock(&s->mu); gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data; chand->next = &s->root_channel_data;
chand->prev = chand->next->prev; chand->prev = chand->next->prev;
chand->next->prev = chand->prev->next = chand; chand->next->prev = chand->prev->next = chand;
gpr_mu_unlock(&s->mu); gpr_mu_unlock(&s->mu_global);
gpr_free(filters); gpr_free(filters);
@ -901,7 +927,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
shutdown_tag *sdt; shutdown_tag *sdt;
/* lock, and gather up some stuff to do */ /* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq, NULL); grpc_cq_begin_op(cq, NULL);
server->shutdown_tags = server->shutdown_tags =
gpr_realloc(server->shutdown_tags, gpr_realloc(server->shutdown_tags,
@ -910,7 +936,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt->tag = tag; sdt->tag = tag;
sdt->cq = cq; sdt->cq = cq;
if (server->shutdown) { if (server->shutdown) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
return; return;
} }
@ -920,6 +946,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
} }
/* collect all unregistered then registered calls */ /* collect all unregistered then registered calls */
gpr_mu_lock(&server->mu_call);
requested_calls = server->requested_calls; requested_calls = server->requested_calls;
memset(&server->requested_calls, 0, sizeof(server->requested_calls)); memset(&server->requested_calls, 0, sizeof(server->requested_calls));
for (rm = server->registered_methods; rm; rm = rm->next) { for (rm = server->registered_methods; rm; rm = rm->next) {
@ -938,10 +965,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_free(rm->requested.calls); gpr_free(rm->requested.calls);
memset(&rm->requested, 0, sizeof(rm->requested)); memset(&rm->requested, 0, sizeof(rm->requested));
} }
gpr_mu_unlock(&server->mu_call);
server->shutdown = 1; server->shutdown = 1;
maybe_finish_shutdown(server); maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */ /* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) { for (i = 0; i < requested_calls.count; i++) {
@ -957,10 +985,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
void grpc_server_listener_destroy_done(void *s) { void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s; grpc_server *server = s;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++; server->listeners_destroyed++;
maybe_finish_shutdown(server); maybe_finish_shutdown(server);
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
} }
void grpc_server_cancel_all_calls(grpc_server *server) { void grpc_server_cancel_all_calls(grpc_server *server) {
@ -971,12 +999,12 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
int is_first = 1; int is_first = 1;
size_t i; size_t i;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_call);
GPR_ASSERT(server->shutdown); GPR_ASSERT(server->shutdown);
if (!server->lists[ALL_CALLS]) { if (!server->lists[ALL_CALLS]) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
return; return;
} }
@ -996,7 +1024,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
is_first = 0; is_first = 0;
} }
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
for (i = 0; i < call_count; i++) { for (i = 0; i < call_count; i++) {
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE, grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
@ -1010,7 +1038,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
void grpc_server_destroy(grpc_server *server) { void grpc_server_destroy(grpc_server *server) {
listener *l; listener *l;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_global);
GPR_ASSERT(server->shutdown || !server->listeners); GPR_ASSERT(server->shutdown || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
@ -1020,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l); gpr_free(l);
} }
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
server_unref(server); server_unref(server);
} }
@ -1042,9 +1070,9 @@ static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) { requested_call *rc) {
call_data *calld = NULL; call_data *calld = NULL;
requested_call_array *requested_calls = NULL; requested_call_array *requested_calls = NULL;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_call);
if (server->shutdown) { if (server->shutdown) {
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
fail_call(server, rc); fail_call(server, rc);
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
@ -1063,12 +1091,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
if (calld) { if (calld) {
GPR_ASSERT(calld->state == PENDING); GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED; calld->state = ACTIVATED;
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, rc); begin_call(server, calld, rc);
return GRPC_CALL_OK; return GRPC_CALL_OK;
} else { } else {
*requested_call_array_add(requested_calls) = *rc; *requested_call_array_add(requested_calls) = *rc;
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK; return GRPC_CALL_OK;
} }
} }
@ -1212,8 +1240,8 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
int grpc_server_has_open_connections(grpc_server *server) { int grpc_server_has_open_connections(grpc_server *server) {
int r; int r;
gpr_mu_lock(&server->mu); gpr_mu_lock(&server->mu_global);
r = server->root_channel_data.next != &server->root_channel_data; r = server->root_channel_data.next != &server->root_channel_data;
gpr_mu_unlock(&server->mu); gpr_mu_unlock(&server->mu_global);
return r; return r;
} }

@ -54,8 +54,16 @@
#define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND 16384 #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND 16384
#define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_LOWER_BOUND 1024 #define TSI_SSL_MAX_PROTECTED_FRAME_SIZE_LOWER_BOUND 1024
/* Putting a macro like this and littering the source file with #if is really
bad practice.
TODO(jboeuf): refactor all the #if / #endif in a separate module. */
#ifndef TSI_OPENSSL_ALPN_SUPPORT
#define TSI_OPENSSL_ALPN_SUPPORT 1
#endif
/* TODO(jboeuf): I have not found a way to get this number dynamically from the /* TODO(jboeuf): I have not found a way to get this number dynamically from the
* SSL structure. This is what we would ultimately want though... */ SSL structure. This is what we would ultimately want though... */
#define TSI_SSL_MAX_PROTECTION_OVERHEAD 100 #define TSI_SSL_MAX_PROTECTION_OVERHEAD 100
/* --- Structure definitions. ---*/ /* --- Structure definitions. ---*/
@ -70,6 +78,8 @@ struct tsi_ssl_handshaker_factory {
typedef struct { typedef struct {
tsi_ssl_handshaker_factory base; tsi_ssl_handshaker_factory base;
SSL_CTX* ssl_context; SSL_CTX* ssl_context;
unsigned char* alpn_protocol_list;
size_t alpn_protocol_list_length;
} tsi_ssl_client_handshaker_factory; } tsi_ssl_client_handshaker_factory;
typedef struct { typedef struct {
@ -841,7 +851,7 @@ static tsi_result ssl_handshaker_process_bytes_from_peer(
static tsi_result ssl_handshaker_extract_peer(tsi_handshaker* self, static tsi_result ssl_handshaker_extract_peer(tsi_handshaker* self,
tsi_peer* peer) { tsi_peer* peer) {
tsi_result result = TSI_OK; tsi_result result = TSI_OK;
const unsigned char* alpn_selected; const unsigned char* alpn_selected = NULL;
unsigned int alpn_selected_len; unsigned int alpn_selected_len;
tsi_ssl_handshaker* impl = (tsi_ssl_handshaker*)self; tsi_ssl_handshaker* impl = (tsi_ssl_handshaker*)self;
X509* peer_cert = SSL_get_peer_certificate(impl->ssl); X509* peer_cert = SSL_get_peer_certificate(impl->ssl);
@ -850,7 +860,14 @@ static tsi_result ssl_handshaker_extract_peer(tsi_handshaker* self,
X509_free(peer_cert); X509_free(peer_cert);
if (result != TSI_OK) return result; if (result != TSI_OK) return result;
} }
#if TSI_OPENSSL_ALPN_SUPPORT
SSL_get0_alpn_selected(impl->ssl, &alpn_selected, &alpn_selected_len); SSL_get0_alpn_selected(impl->ssl, &alpn_selected, &alpn_selected_len);
#endif /* TSI_OPENSSL_ALPN_SUPPORT */
if (alpn_selected == NULL) {
/* Try npn. */
SSL_get0_next_proto_negotiated(impl->ssl, &alpn_selected,
&alpn_selected_len);
}
if (alpn_selected != NULL) { if (alpn_selected != NULL) {
size_t i; size_t i;
tsi_peer_property* new_properties = tsi_peer_property* new_properties =
@ -1012,6 +1029,32 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX* ctx, int is_client,
return TSI_OK; return TSI_OK;
} }
static int select_protocol_list(const unsigned char** out,
unsigned char* outlen,
const unsigned char* client_list,
unsigned int client_list_len,
const unsigned char* server_list,
unsigned int server_list_len) {
const unsigned char* client_current = client_list;
while ((unsigned int)(client_current - client_list) < client_list_len) {
unsigned char client_current_len = *(client_current++);
const unsigned char* server_current = server_list;
while ((server_current >= server_list) &&
(gpr_uintptr)(server_current - server_list) < server_list_len) {
unsigned char server_current_len = *(server_current++);
if ((client_current_len == server_current_len) &&
!memcmp(client_current, server_current, server_current_len)) {
*out = server_current;
*outlen = server_current_len;
return SSL_TLSEXT_ERR_OK;
}
server_current += server_current_len;
}
client_current += client_current_len;
}
return SSL_TLSEXT_ERR_NOACK;
}
/* --- tsi_ssl__client_handshaker_factory methods implementation. --- */ /* --- tsi_ssl__client_handshaker_factory methods implementation. --- */
static tsi_result ssl_client_handshaker_factory_create_handshaker( static tsi_result ssl_client_handshaker_factory_create_handshaker(
@ -1027,10 +1070,21 @@ static void ssl_client_handshaker_factory_destroy(
tsi_ssl_handshaker_factory* self) { tsi_ssl_handshaker_factory* self) {
tsi_ssl_client_handshaker_factory* impl = tsi_ssl_client_handshaker_factory* impl =
(tsi_ssl_client_handshaker_factory*)self; (tsi_ssl_client_handshaker_factory*)self;
SSL_CTX_free(impl->ssl_context); if (impl->ssl_context != NULL) SSL_CTX_free(impl->ssl_context);
if (impl->alpn_protocol_list != NULL) free(impl->alpn_protocol_list);
free(impl); free(impl);
} }
static int client_handshaker_factory_npn_callback(
SSL* ssl, unsigned char** out, unsigned char* outlen,
const unsigned char* in, unsigned int inlen, void* arg) {
tsi_ssl_client_handshaker_factory* factory =
(tsi_ssl_client_handshaker_factory*)arg;
return select_protocol_list((const unsigned char**)out, outlen,
factory->alpn_protocol_list,
factory->alpn_protocol_list_length, in, inlen);
}
/* --- tsi_ssl_server_handshaker_factory methods implementation. --- */ /* --- tsi_ssl_server_handshaker_factory methods implementation. --- */
static tsi_result ssl_server_handshaker_factory_create_handshaker( static tsi_result ssl_server_handshaker_factory_create_handshaker(
@ -1134,30 +1188,25 @@ static int ssl_server_handshaker_factory_servername_callback(SSL* ssl, int* ap,
return SSL_TLSEXT_ERR_ALERT_WARNING; return SSL_TLSEXT_ERR_ALERT_WARNING;
} }
#if TSI_OPENSSL_ALPN_SUPPORT
static int server_handshaker_factory_alpn_callback( static int server_handshaker_factory_alpn_callback(
SSL* ssl, const unsigned char** out, unsigned char* outlen, SSL* ssl, const unsigned char** out, unsigned char* outlen,
const unsigned char* in, unsigned int inlen, void* arg) { const unsigned char* in, unsigned int inlen, void* arg) {
tsi_ssl_server_handshaker_factory* factory = tsi_ssl_server_handshaker_factory* factory =
(tsi_ssl_server_handshaker_factory*)arg; (tsi_ssl_server_handshaker_factory*)arg;
const unsigned char* client_current = in; return select_protocol_list(out, outlen, in, inlen,
while ((unsigned int)(client_current - in) < inlen) { factory->alpn_protocol_list,
unsigned char client_current_len = *(client_current++); factory->alpn_protocol_list_length);
const unsigned char* server_current = factory->alpn_protocol_list; }
while ((server_current >= factory->alpn_protocol_list) && #endif /* TSI_OPENSSL_ALPN_SUPPORT */
(gpr_uintptr)(server_current - factory->alpn_protocol_list) <
factory->alpn_protocol_list_length) { static int server_handshaker_factory_npn_advertised_callback(
unsigned char server_current_len = *(server_current++); SSL* ssl, const unsigned char** out, unsigned int* outlen, void* arg) {
if ((client_current_len == server_current_len) && tsi_ssl_server_handshaker_factory* factory =
!memcmp(client_current, server_current, server_current_len)) { (tsi_ssl_server_handshaker_factory*)arg;
*out = server_current; *out = factory->alpn_protocol_list;
*outlen = server_current_len; *outlen = factory->alpn_protocol_list_length;
return SSL_TLSEXT_ERR_OK; return SSL_TLSEXT_ERR_OK;
}
server_current += server_current_len;
}
client_current += client_current_len;
}
return SSL_TLSEXT_ERR_NOACK;
} }
/* --- tsi_ssl_handshaker_factory constructors. --- */ /* --- tsi_ssl_handshaker_factory constructors. --- */
@ -1184,6 +1233,14 @@ tsi_result tsi_create_ssl_client_handshaker_factory(
gpr_log(GPR_ERROR, "Could not create ssl context."); gpr_log(GPR_ERROR, "Could not create ssl context.");
return TSI_INVALID_ARGUMENT; return TSI_INVALID_ARGUMENT;
} }
impl = calloc(1, sizeof(tsi_ssl_client_handshaker_factory));
if (impl == NULL) {
SSL_CTX_free(ssl_context);
return TSI_OUT_OF_RESOURCES;
}
impl->ssl_context = ssl_context;
do { do {
result = result =
populate_ssl_context(ssl_context, pem_private_key, pem_private_key_size, populate_ssl_context(ssl_context, pem_private_key, pem_private_key_size,
@ -1197,41 +1254,33 @@ tsi_result tsi_create_ssl_client_handshaker_factory(
} }
if (num_alpn_protocols != 0) { if (num_alpn_protocols != 0) {
unsigned char* alpn_protocol_list = NULL;
size_t alpn_protocol_list_length = 0;
int ssl_failed;
result = build_alpn_protocol_name_list( result = build_alpn_protocol_name_list(
alpn_protocols, alpn_protocols_lengths, num_alpn_protocols, alpn_protocols, alpn_protocols_lengths, num_alpn_protocols,
&alpn_protocol_list, &alpn_protocol_list_length); &impl->alpn_protocol_list, &impl->alpn_protocol_list_length);
if (result != TSI_OK) { if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Building alpn list failed with error %s.", gpr_log(GPR_ERROR, "Building alpn list failed with error %s.",
tsi_result_to_string(result)); tsi_result_to_string(result));
free(alpn_protocol_list);
break; break;
} }
ssl_failed = SSL_CTX_set_alpn_protos(ssl_context, alpn_protocol_list, #if TSI_OPENSSL_ALPN_SUPPORT
alpn_protocol_list_length); if (SSL_CTX_set_alpn_protos(ssl_context, impl->alpn_protocol_list,
free(alpn_protocol_list); impl->alpn_protocol_list_length)) {
if (ssl_failed) {
gpr_log(GPR_ERROR, "Could not set alpn protocol list to context."); gpr_log(GPR_ERROR, "Could not set alpn protocol list to context.");
result = TSI_INVALID_ARGUMENT; result = TSI_INVALID_ARGUMENT;
break; break;
} }
#endif /* TSI_OPENSSL_ALPN_SUPPORT */
SSL_CTX_set_next_proto_select_cb(
ssl_context, client_handshaker_factory_npn_callback, impl);
} }
} while (0); } while (0);
if (result != TSI_OK) { if (result != TSI_OK) {
SSL_CTX_free(ssl_context); ssl_client_handshaker_factory_destroy(&impl->base);
return result; return result;
} }
SSL_CTX_set_verify(ssl_context, SSL_VERIFY_PEER, NULL); SSL_CTX_set_verify(ssl_context, SSL_VERIFY_PEER, NULL);
/* TODO(jboeuf): Add revocation verification. */ /* TODO(jboeuf): Add revocation verification. */
impl = calloc(1, sizeof(tsi_ssl_client_handshaker_factory));
if (impl == NULL) {
SSL_CTX_free(ssl_context);
return TSI_OUT_OF_RESOURCES;
}
impl->ssl_context = ssl_context;
impl->base.create_handshaker = impl->base.create_handshaker =
ssl_client_handshaker_factory_create_handshaker; ssl_client_handshaker_factory_create_handshaker;
impl->base.destroy = ssl_client_handshaker_factory_destroy; impl->base.destroy = ssl_client_handshaker_factory_destroy;
@ -1322,8 +1371,13 @@ tsi_result tsi_create_ssl_server_handshaker_factory(
impl->ssl_contexts[i], impl->ssl_contexts[i],
ssl_server_handshaker_factory_servername_callback); ssl_server_handshaker_factory_servername_callback);
SSL_CTX_set_tlsext_servername_arg(impl->ssl_contexts[i], impl); SSL_CTX_set_tlsext_servername_arg(impl->ssl_contexts[i], impl);
#if TSI_OPENSSL_ALPN_SUPPORT
SSL_CTX_set_alpn_select_cb(impl->ssl_contexts[i], SSL_CTX_set_alpn_select_cb(impl->ssl_contexts[i],
server_handshaker_factory_alpn_callback, impl); server_handshaker_factory_alpn_callback, impl);
#endif /* TSI_OPENSSL_ALPN_SUPPORT */
SSL_CTX_set_next_protos_advertised_cb(
impl->ssl_contexts[i],
server_handshaker_factory_npn_advertised_callback, impl);
} while (0); } while (0);
if (result != TSI_OK) { if (result != TSI_OK) {

@ -71,7 +71,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
RpcMethod::SERVER_STREAMING), RpcMethod::SERVER_STREAMING),
has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC || has_response_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() == method->method_type() ==
RpcMethod::CLIENT_STREAMING) { RpcMethod::CLIENT_STREAMING),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_); grpc_metadata_array_init(&request_metadata_);
} }
@ -90,10 +91,18 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd; return mrd;
} }
void SetupRequest() {
cq_ = grpc_completion_queue_create();
}
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
cq_ = nullptr;
}
void Request(grpc_server* server, grpc_completion_queue* notify_cq) { void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(!in_flight_); GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true; in_flight_ = true;
cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK == GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call( grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_, server, tag_, &call_, &deadline_, &request_metadata_,
@ -288,6 +297,7 @@ bool Server::Start() {
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_->empty()) { if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
m->SetupRequest();
m->Request(server_, cq_.cq()); m->Request(server_, cq_.cq());
} }
@ -472,9 +482,13 @@ void Server::RunRpc() {
if (ok) { if (ok) {
SyncRequest::CallData cd(this, mrd); SyncRequest::CallData cd(this, mrd);
{ {
mrd->SetupRequest();
grpc::unique_lock<grpc::mutex> lock(mu_); grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) { if (!shutdown_) {
mrd->Request(server_, cq_.cq()); mrd->Request(server_, cq_.cq());
} else {
// destroy the structure that was created
mrd->TeardownRequest();
} }
} }
cd.Run(); cd.Run();

@ -12,7 +12,7 @@ Metrics/AbcSize:
# Offense count: 3 # Offense count: 3
# Configuration parameters: CountComments. # Configuration parameters: CountComments.
Metrics/ClassLength: Metrics/ClassLength:
Max: 192 Max: 200
# Offense count: 35 # Offense count: 35
# Configuration parameters: CountComments. # Configuration parameters: CountComments.

@ -284,7 +284,8 @@ class NamedTests
op = @stub.full_duplex_call(ppp.each_item, return_op: true) op = @stub.full_duplex_call(ppp.each_item, return_op: true)
ppp.canceller_op = op # causes ppp to cancel after the 1st message ppp.canceller_op = op # causes ppp to cancel after the 1st message
op.execute.each { |r| ppp.queue.push(r) } op.execute.each { |r| ppp.queue.push(r) }
assert(op.cancelled, 'call operation should be CANCELLED') op.wait
assert(op.cancelled, 'call operation was not CANCELLED')
p 'OK: cancel_after_first_response' p 'OK: cancel_after_first_response'
end end

@ -120,6 +120,7 @@ module GRPC
@started = started @started = started
@unmarshal = unmarshal @unmarshal = unmarshal
@metadata_tag = metadata_tag @metadata_tag = metadata_tag
@op_notifier = nil
end end
# output_metadata are provides access to hash that can be used to # output_metadata are provides access to hash that can be used to
@ -148,6 +149,7 @@ module GRPC
# operation provides a restricted view of this ActiveCall for use as # operation provides a restricted view of this ActiveCall for use as
# a Operation. # a Operation.
def operation def operation
@op_notifier = Notifier.new
Operation.new(self) Operation.new(self)
end end
@ -167,6 +169,7 @@ module GRPC
batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
return unless assert_finished return unless assert_finished
@call.status = batch_result.status @call.status = batch_result.status
op_is_done
batch_result.check_status batch_result.check_status
end end
@ -184,6 +187,7 @@ module GRPC
end end
end end
@call.status = batch_result.status @call.status = batch_result.status
op_is_done
batch_result.check_status batch_result.check_status
end end
@ -415,7 +419,7 @@ module GRPC
def bidi_streamer(requests, **kw, &blk) def bidi_streamer(requests, **kw, &blk)
start_call(**kw) unless @started start_call(**kw) unless @started
bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline) bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, @deadline)
bd.run_on_client(requests, &blk) bd.run_on_client(requests, @op_notifier, &blk)
end end
# run_server_bidi orchestrates a BiDi stream processing on a server. # run_server_bidi orchestrates a BiDi stream processing on a server.
@ -434,6 +438,19 @@ module GRPC
bd.run_on_server(gen_each_reply) bd.run_on_server(gen_each_reply)
end end
# Waits till an operation completes
def wait
return if @op_notifier.nil?
GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
@op_notifier.wait
end
# Signals that an operation is done
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
end
private private
# Starts the call if not already started # Starts the call if not already started
@ -468,6 +485,6 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as # Operation limits access to an ActiveCall's methods for use as
# a Operation on the client. # a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute, Operation = view_class(:cancel, :cancelled, :deadline, :execute,
:metadata, :status, :start_call) :metadata, :status, :start_call, :wait)
end end
end end

@ -66,6 +66,7 @@ module GRPC
@cq = q @cq = q
@deadline = deadline @deadline = deadline
@marshal = marshal @marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new @readq = Queue.new
@unmarshal = unmarshal @unmarshal = unmarshal
end end
@ -76,8 +77,10 @@ module GRPC
# block that can be invoked with each response. # block that can be invoked with each response.
# #
# @param requests the Enumerable of requests to send # @param requests the Enumerable of requests to send
# @op_notifier a Notifier used to signal completion
# @return an Enumerator of requests to yield # @return an Enumerator of requests to yield
def run_on_client(requests, &blk) def run_on_client(requests, op_notifier, &blk)
@op_notifier = op_notifier
@enq_th = Thread.new { write_loop(requests) } @enq_th = Thread.new { write_loop(requests) }
@loop_th = start_read_loop @loop_th = start_read_loop
each_queued_msg(&blk) each_queued_msg(&blk)
@ -105,6 +108,13 @@ module GRPC
END_OF_READS = :end_of_reads END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes END_OF_WRITES = :end_of_writes
# signals that bidi operation is complete
def notify_done
return unless @op_notifier
GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
@op_notifier.notify(self)
end
# each_queued_msg yields each message on this instances readq # each_queued_msg yields each message on this instances readq
# #
# - messages are added to the readq by #read_loop # - messages are added to the readq by #read_loop
@ -143,11 +153,13 @@ module GRPC
@call.status = batch_result.status @call.status = batch_result.status
batch_result.check_status batch_result.check_status
GRPC.logger.debug("bidi-write-loop: done status #{@call.status}") GRPC.logger.debug("bidi-write-loop: done status #{@call.status}")
notify_done
end end
GRPC.logger.debug('bidi-write-loop: finished') GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e) GRPC.logger.warn(e)
notify_done
raise e raise e
end end

@ -38,6 +38,6 @@ Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info Logging.logger.root.level = :info
# TODO: provide command-line configuration for logging # TODO: provide command-line configuration for logging
Logging.logger['GRPC'].level = :debug Logging.logger['GRPC'].level = :info
Logging.logger['GRPC::ActiveCall'].level = :info Logging.logger['GRPC::ActiveCall'].level = :info
Logging.logger['GRPC::BidiCall'].level = :info Logging.logger['GRPC::BidiCall'].level = :info

@ -64,6 +64,11 @@ typedef struct json_reader_userdata {
static void json_writer_output_char(void* userdata, char c) { static void json_writer_output_char(void* userdata, char c) {
json_writer_userdata* state = userdata; json_writer_userdata* state = userdata;
int cmp = fgetc(state->cmp); int cmp = fgetc(state->cmp);
/* treat CRLF as LF */
if (cmp == '\r' && c == '\n') {
cmp = fgetc(state->cmp);
}
GPR_ASSERT(cmp == c); GPR_ASSERT(cmp == c);
} }

Loading…
Cancel
Save