Merge github.com:grpc/grpc into shindig

pull/3423/head
Craig Tiller 9 years ago
commit 8dc0971133
  1. 2
      Makefile
  2. 14
      src/core/channel/client_channel.c
  3. 20
      src/core/channel/http_client_filter.c
  4. 9
      src/core/channel/http_server_filter.c
  5. 4
      src/core/iomgr/resolve_address_posix.c
  6. 10
      src/core/security/credentials.c
  7. 60
      src/core/support/cmdline.c
  8. 22
      src/core/surface/call.c
  9. 14
      src/core/surface/secure_channel_create.c
  10. 56
      src/core/surface/server.c
  11. 2
      templates/Makefile.template

@ -248,7 +248,7 @@ ifdef EXTRA_DEFINES
DEFINES += $(EXTRA_DEFINES) DEFINES += $(EXTRA_DEFINES)
endif endif
CFLAGS += -std=c89 -pedantic -Wsign-conversion -Wconversion CFLAGS += -std=c89 -pedantic -Wsign-conversion -Wconversion -Wshadow
ifeq ($(HAS_CXX11),true) ifeq ($(HAS_CXX11),true)
CXXFLAGS += -std=c++11 CXXFLAGS += -std=c++11
else else

@ -384,18 +384,18 @@ static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&chand->mu_config); gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy; lb_policy = chand->lb_policy;
if (lb_policy) { if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op; grpc_transport_stream_op *waiting_op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset; grpc_pollset *bind_pollset = waiting_op->bind_pollset;
grpc_metadata_batch *initial_metadata = grpc_metadata_batch *initial_metadata =
&op->send_ops->ops[0].data.metadata; &waiting_op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick"); GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK; calld->state = CALL_WAITING_FOR_PICK;
GPR_ASSERT(op->bind_pollset); GPR_ASSERT(waiting_op->bind_pollset);
GPR_ASSERT(op->send_ops); GPR_ASSERT(waiting_op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1); GPR_ASSERT(waiting_op->send_ops->nops >= 1);
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA); GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, picked_target, calld); grpc_closure_init(&calld->async_setup_task, picked_target, calld);

@ -127,21 +127,25 @@ static void hc_mutate_op(grpc_call_element *elem,
size_t nops = op->send_ops->nops; size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops; grpc_stream_op *ops = op->send_ops->ops;
for (i = 0; i < nops; i++) { for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i]; grpc_stream_op *stream_op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue; if (stream_op->type != GRPC_OP_METADATA) continue;
calld->sent_initial_metadata = 1; calld->sent_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); grpc_metadata_batch_filter(&stream_op->data.metadata, client_strip_filter,
elem);
/* Send : prefixed headers, which have to be before any application /* Send : prefixed headers, which have to be before any application
layer headers. */ layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->method,
GRPC_MDELEM_REF(channeld->method)); GRPC_MDELEM_REF(channeld->method));
grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->scheme,
GRPC_MDELEM_REF(channeld->scheme)); GRPC_MDELEM_REF(channeld->scheme));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, grpc_metadata_batch_add_tail(&stream_op->data.metadata,
&calld->te_trailers,
GRPC_MDELEM_REF(channeld->te_trailers)); GRPC_MDELEM_REF(channeld->te_trailers));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_metadata_batch_add_tail(&stream_op->data.metadata,
&calld->content_type,
GRPC_MDELEM_REF(channeld->content_type)); GRPC_MDELEM_REF(channeld->content_type));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent, grpc_metadata_batch_add_tail(&stream_op->data.metadata,
&calld->user_agent,
GRPC_MDELEM_REF(channeld->user_agent)); GRPC_MDELEM_REF(channeld->user_agent));
break; break;
} }

@ -206,12 +206,13 @@ static void hs_mutate_op(grpc_call_element *elem,
size_t nops = op->send_ops->nops; size_t nops = op->send_ops->nops;
grpc_stream_op *ops = op->send_ops->ops; grpc_stream_op *ops = op->send_ops->ops;
for (i = 0; i < nops; i++) { for (i = 0; i < nops; i++) {
grpc_stream_op *op = &ops[i]; grpc_stream_op *stream_op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue; if (stream_op->type != GRPC_OP_METADATA) continue;
calld->sent_status = 1; calld->sent_status = 1;
grpc_metadata_batch_add_head(&op->data.metadata, &calld->status, grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->status,
GRPC_MDELEM_REF(channeld->status_ok)); GRPC_MDELEM_REF(channeld->status_ok));
grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, grpc_metadata_batch_add_tail(&stream_op->data.metadata,
&calld->content_type,
GRPC_MDELEM_REF(channeld->content_type)); GRPC_MDELEM_REF(channeld->content_type));
break; break;
} }

@ -50,6 +50,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/thd.h> #include <grpc/support/thd.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
#include <grpc/support/useful.h>
typedef struct { typedef struct {
char *name; char *name;
@ -106,8 +107,7 @@ grpc_resolved_addresses *grpc_blocking_resolve_address(
if (s != 0) { if (s != 0) {
/* Retry if well-known service name is recognized */ /* Retry if well-known service name is recognized */
char *svc[][2] = {{"http", "80"}, {"https", "443"}}; char *svc[][2] = {{"http", "80"}, {"https", "443"}};
int i; for (i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
for (i = 0; i < (int)(sizeof(svc) / sizeof(svc[0])); i++) {
if (strcmp(port, svc[i][0]) == 0) { if (strcmp(port, svc[i][0]) == 0) {
s = getaddrinfo(host, svc[i][1], &hints, &result); s = getaddrinfo(host, svc[i][1], &hints, &result);
break; break;

@ -218,7 +218,7 @@ static grpc_security_status ssl_create_security_connector(
grpc_security_status status = GRPC_SECURITY_OK; grpc_security_status status = GRPC_SECURITY_OK;
size_t i = 0; size_t i = 0;
const char *overridden_target_name = NULL; const char *overridden_target_name = NULL;
grpc_arg arg; grpc_arg new_arg;
for (i = 0; args && i < args->num_args; i++) { for (i = 0; args && i < args->num_args; i++) {
grpc_arg *arg = &args->args[i]; grpc_arg *arg = &args->args[i];
@ -233,10 +233,10 @@ static grpc_security_status ssl_create_security_connector(
if (status != GRPC_SECURITY_OK) { if (status != GRPC_SECURITY_OK) {
return status; return status;
} }
arg.type = GRPC_ARG_STRING; new_arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_HTTP2_SCHEME; new_arg.key = GRPC_ARG_HTTP2_SCHEME;
arg.value.string = "https"; new_arg.value.string = "https";
*new_args = grpc_channel_args_copy_and_add(args, &arg, 1); *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1);
return status; return status;
} }

@ -192,9 +192,9 @@ static void print_usage_and_die(gpr_cmdline *cl) {
exit(1); exit(1);
} }
static void extra_state(gpr_cmdline *cl, char *arg) { static void extra_state(gpr_cmdline *cl, char *str) {
if (!cl->extra_arg) print_usage_and_die(cl); if (!cl->extra_arg) print_usage_and_die(cl);
cl->extra_arg(cl->extra_arg_user_data, arg); cl->extra_arg(cl->extra_arg_user_data, str);
} }
static arg *find_arg(gpr_cmdline *cl, char *name) { static arg *find_arg(gpr_cmdline *cl, char *name) {
@ -214,7 +214,7 @@ static arg *find_arg(gpr_cmdline *cl, char *name) {
return a; return a;
} }
static void value_state(gpr_cmdline *cl, char *arg) { static void value_state(gpr_cmdline *cl, char *str) {
long intval; long intval;
char *end; char *end;
@ -222,80 +222,80 @@ static void value_state(gpr_cmdline *cl, char *arg) {
switch (cl->cur_arg->type) { switch (cl->cur_arg->type) {
case ARGTYPE_INT: case ARGTYPE_INT:
intval = strtol(arg, &end, 0); intval = strtol(str, &end, 0);
if (*end || intval < INT_MIN || intval > INT_MAX) { if (*end || intval < INT_MIN || intval > INT_MAX) {
fprintf(stderr, "expected integer, got '%s' for %s\n", arg, fprintf(stderr, "expected integer, got '%s' for %s\n", str,
cl->cur_arg->name); cl->cur_arg->name);
print_usage_and_die(cl); print_usage_and_die(cl);
} }
*(int *)cl->cur_arg->value = (int)intval; *(int *)cl->cur_arg->value = (int)intval;
break; break;
case ARGTYPE_BOOL: case ARGTYPE_BOOL:
if (0 == strcmp(arg, "1") || 0 == strcmp(arg, "true")) { if (0 == strcmp(str, "1") || 0 == strcmp(str, "true")) {
*(int *)cl->cur_arg->value = 1; *(int *)cl->cur_arg->value = 1;
} else if (0 == strcmp(arg, "0") || 0 == strcmp(arg, "false")) { } else if (0 == strcmp(str, "0") || 0 == strcmp(str, "false")) {
*(int *)cl->cur_arg->value = 0; *(int *)cl->cur_arg->value = 0;
} else { } else {
fprintf(stderr, "expected boolean, got '%s' for %s\n", arg, fprintf(stderr, "expected boolean, got '%s' for %s\n", str,
cl->cur_arg->name); cl->cur_arg->name);
print_usage_and_die(cl); print_usage_and_die(cl);
} }
break; break;
case ARGTYPE_STRING: case ARGTYPE_STRING:
*(char **)cl->cur_arg->value = arg; *(char **)cl->cur_arg->value = str;
break; break;
} }
cl->state = normal_state; cl->state = normal_state;
} }
static void normal_state(gpr_cmdline *cl, char *arg) { static void normal_state(gpr_cmdline *cl, char *str) {
char *eq = NULL; char *eq = NULL;
char *tmp = NULL; char *tmp = NULL;
char *arg_name = NULL; char *arg_name = NULL;
if (0 == strcmp(arg, "-help") || 0 == strcmp(arg, "--help") || if (0 == strcmp(str, "-help") || 0 == strcmp(str, "--help") ||
0 == strcmp(arg, "-h")) { 0 == strcmp(str, "-h")) {
print_usage_and_die(cl); print_usage_and_die(cl);
} }
cl->cur_arg = NULL; cl->cur_arg = NULL;
if (arg[0] == '-') { if (str[0] == '-') {
if (arg[1] == '-') { if (str[1] == '-') {
if (arg[2] == 0) { if (str[2] == 0) {
/* handle '--' to move to just extra args */ /* handle '--' to move to just extra args */
cl->state = extra_state; cl->state = extra_state;
return; return;
} }
arg += 2; str += 2;
} else { } else {
arg += 1; str += 1;
} }
/* first byte of arg is now past the leading '-' or '--' */ /* first byte of str is now past the leading '-' or '--' */
if (arg[0] == 'n' && arg[1] == 'o' && arg[2] == '-') { if (str[0] == 'n' && str[1] == 'o' && str[2] == '-') {
/* arg is of the form '--no-foo' - it's a flag disable */ /* str is of the form '--no-foo' - it's a flag disable */
arg += 3; str += 3;
cl->cur_arg = find_arg(cl, arg); cl->cur_arg = find_arg(cl, str);
if (cl->cur_arg->type != ARGTYPE_BOOL) { if (cl->cur_arg->type != ARGTYPE_BOOL) {
fprintf(stderr, "%s is not a flag argument\n", arg); fprintf(stderr, "%s is not a flag argument\n", str);
print_usage_and_die(cl); print_usage_and_die(cl);
} }
*(int *)cl->cur_arg->value = 0; *(int *)cl->cur_arg->value = 0;
return; /* early out */ return; /* early out */
} }
eq = strchr(arg, '='); eq = strchr(str, '=');
if (eq != NULL) { if (eq != NULL) {
/* copy the string into a temp buffer and extract the name */ /* copy the string into a temp buffer and extract the name */
tmp = arg_name = gpr_malloc((size_t)(eq - arg + 1)); tmp = arg_name = gpr_malloc((size_t)(eq - str + 1));
memcpy(arg_name, arg, (size_t)(eq - arg)); memcpy(arg_name, str, (size_t)(eq - str));
arg_name[eq - arg] = 0; arg_name[eq - str] = 0;
} else { } else {
arg_name = arg; arg_name = str;
} }
cl->cur_arg = find_arg(cl, arg_name); cl->cur_arg = find_arg(cl, arg_name);
if (eq != NULL) { if (eq != NULL) {
/* arg was of the type --foo=value, parse the value */ /* str was of the type --foo=value, parse the value */
value_state(cl, eq + 1); value_state(cl, eq + 1);
} else if (cl->cur_arg->type != ARGTYPE_BOOL) { } else if (cl->cur_arg->type != ARGTYPE_BOOL) {
/* flag types don't have a '--foo value' variant, other types do */ /* flag types don't have a '--foo value' variant, other types do */
@ -305,7 +305,7 @@ static void normal_state(gpr_cmdline *cl, char *arg) {
*(int *)cl->cur_arg->value = 1; *(int *)cl->cur_arg->value = 1;
} }
} else { } else {
extra_state(cl, arg); extra_state(cl, str);
} }
gpr_free(tmp); gpr_free(tmp);

@ -1479,18 +1479,18 @@ static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA; is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
for (l = md->list.head; l != NULL; l = l->next) { for (l = md->list.head; l != NULL; l = l->next) {
grpc_mdelem *md = l->md; grpc_mdelem *mdel = l->md;
grpc_mdstr *key = md->key; grpc_mdstr *key = mdel->key;
if (key == grpc_channel_get_status_string(call->channel)) { if (key == grpc_channel_get_status_string(call->channel)) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md)); set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel));
} else if (key == grpc_channel_get_message_string(call->channel)) { } else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(md->value)); set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value));
} else if (key == } else if (key ==
grpc_channel_get_compression_algorithm_string(call->channel)) { grpc_channel_get_compression_algorithm_string(call->channel)) {
set_compression_algorithm(call, decode_compression(md)); set_compression_algorithm(call, decode_compression(mdel));
} else if (key == grpc_channel_get_encodings_accepted_by_peer_string( } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
call->channel)) { call->channel)) {
set_encodings_accepted_by_peer(call, md->value->slice); set_encodings_accepted_by_peer(call, mdel->value->slice);
} else { } else {
dest = &call->buffered_metadata[is_trailing]; dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) { if (dest->count == dest->capacity) {
@ -1499,9 +1499,9 @@ static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity); gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
} }
mdusr = &dest->metadata[dest->count++]; mdusr = &dest->metadata[dest->count++];
mdusr->key = grpc_mdstr_as_c_string(md->key); mdusr->key = grpc_mdstr_as_c_string(mdel->key);
mdusr->value = grpc_mdstr_as_c_string(md->value); mdusr->value = grpc_mdstr_as_c_string(mdel->value);
mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice);
if (call->owned_metadata_count == call->owned_metadata_capacity) { if (call->owned_metadata_count == call->owned_metadata_capacity) {
call->owned_metadata_capacity = call->owned_metadata_capacity =
GPR_MAX(call->owned_metadata_capacity + 8, GPR_MAX(call->owned_metadata_capacity + 8,
@ -1510,8 +1510,8 @@ static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
gpr_realloc(call->owned_metadata, gpr_realloc(call->owned_metadata,
sizeof(grpc_mdelem *) * call->owned_metadata_capacity); sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
} }
call->owned_metadata[call->owned_metadata_count++] = md; call->owned_metadata[call->owned_metadata_count++] = mdel;
l->md = 0; l->md = NULL;
} }
} }
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=

@ -237,7 +237,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_arg connector_arg; grpc_arg connector_arg;
grpc_channel_args *args_copy; grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector; grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector *connector; grpc_channel_security_connector *security_connector;
grpc_mdctx *mdctx; grpc_mdctx *mdctx;
grpc_resolver *resolver; grpc_resolver *resolver;
subchannel_factory *f; subchannel_factory *f;
@ -255,15 +255,15 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
} }
if (grpc_credentials_create_security_connector( if (grpc_credentials_create_security_connector(
creds, target, args, NULL, &connector, &new_args_from_connector) != creds, target, args, NULL, &security_connector,
GRPC_SECURITY_OK) { &new_args_from_connector) != GRPC_SECURITY_OK) {
return grpc_lame_client_channel_create( return grpc_lame_client_channel_create(
target, GRPC_STATUS_INVALID_ARGUMENT, target, GRPC_STATUS_INVALID_ARGUMENT,
"Failed to create security connector."); "Failed to create security connector.");
} }
mdctx = grpc_mdctx_create(); mdctx = grpc_mdctx_create();
connector_arg = grpc_security_connector_to_arg(&connector->base); connector_arg = grpc_security_connector_to_arg(&security_connector->base);
args_copy = grpc_channel_args_copy_and_add( args_copy = grpc_channel_args_copy_and_add(
new_args_from_connector != NULL ? new_args_from_connector : args, new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg, 1); &connector_arg, 1);
@ -282,8 +282,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
gpr_ref_init(&f->refs, 1); gpr_ref_init(&f->refs, 1);
grpc_mdctx_ref(mdctx); grpc_mdctx_ref(mdctx);
f->mdctx = mdctx; f->mdctx = mdctx;
GRPC_SECURITY_CONNECTOR_REF(&connector->base, "subchannel_factory"); GRPC_SECURITY_CONNECTOR_REF(&security_connector->base, "subchannel_factory");
f->security_connector = connector; f->security_connector = security_connector;
f->merge_args = grpc_channel_args_copy(args_copy); f->merge_args = grpc_channel_args_copy(args_copy);
f->master = channel; f->master = channel;
GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory"); GRPC_CHANNEL_INTERNAL_REF(channel, "subchannel_factory");
@ -296,7 +296,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
&exec_ctx, grpc_channel_get_channel_stack(channel), resolver); &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create"); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
grpc_subchannel_factory_unref(&exec_ctx, &f->base); grpc_subchannel_factory_unref(&exec_ctx, &f->base);
GRPC_SECURITY_CONNECTOR_UNREF(&connector->base, "channel_create"); GRPC_SECURITY_CONNECTOR_UNREF(&security_connector->base, "channel_create");
grpc_channel_args_destroy(args_copy); grpc_channel_args_destroy(args_copy);
if (new_args_from_connector != NULL) { if (new_args_from_connector != NULL) {

@ -303,26 +303,25 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher * request_matcher
*/ */
static void request_matcher_init(request_matcher *request_matcher, static void request_matcher_init(request_matcher *rm, size_t entries) {
size_t entries) { memset(rm, 0, sizeof(*rm));
memset(request_matcher, 0, sizeof(*request_matcher)); rm->requests = gpr_stack_lockfree_create(entries);
request_matcher->requests = gpr_stack_lockfree_create(entries);
} }
static void request_matcher_destroy(request_matcher *request_matcher) { static void request_matcher_destroy(request_matcher *rm) {
GPR_ASSERT(gpr_stack_lockfree_pop(request_matcher->requests) == -1); GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
gpr_stack_lockfree_destroy(request_matcher->requests); gpr_stack_lockfree_destroy(rm->requests);
} }
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem)); grpc_call_destroy(grpc_call_from_top_element(elem));
} }
static void request_matcher_zombify_all_pending_calls( static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, request_matcher *request_matcher) { request_matcher *rm) {
while (request_matcher->pending_head) { while (rm->pending_head) {
call_data *calld = request_matcher->pending_head; call_data *calld = rm->pending_head;
request_matcher->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
gpr_mu_lock(&calld->mu_state); gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED; calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
@ -413,8 +412,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
} }
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server, static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
grpc_call_element *elem, grpc_call_element *elem, request_matcher *rm) {
request_matcher *request_matcher) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
int request_id; int request_id;
@ -427,17 +425,17 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
return; return;
} }
request_id = gpr_stack_lockfree_pop(request_matcher->requests); request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) { if (request_id == -1) {
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
gpr_mu_lock(&calld->mu_state); gpr_mu_lock(&calld->mu_state);
calld->state = PENDING; calld->state = PENDING;
gpr_mu_unlock(&calld->mu_state); gpr_mu_unlock(&calld->mu_state);
if (request_matcher->pending_head == NULL) { if (rm->pending_head == NULL) {
request_matcher->pending_tail = request_matcher->pending_head = calld; rm->pending_tail = rm->pending_head = calld;
} else { } else {
request_matcher->pending_tail->pending_next = calld; rm->pending_tail->pending_next = calld;
request_matcher->pending_tail = calld; rm->pending_tail = calld;
} }
calld->pending_next = NULL; calld->pending_next = NULL;
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
@ -1119,7 +1117,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
grpc_server *server, grpc_server *server,
requested_call *rc) { requested_call *rc) {
call_data *calld = NULL; call_data *calld = NULL;
request_matcher *request_matcher = NULL; request_matcher *rm = NULL;
int request_id; int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) { if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, rc); fail_call(exec_ctx, server, rc);
@ -1133,22 +1131,22 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
} }
switch (rc->type) { switch (rc->type) {
case BATCH_CALL: case BATCH_CALL:
request_matcher = &server->unregistered_request_matcher; rm = &server->unregistered_request_matcher;
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
request_matcher = &rc->data.registered.registered_method->request_matcher; rm = &rc->data.registered.registered_method->request_matcher;
break; break;
} }
server->requested_calls[request_id] = *rc; server->requested_calls[request_id] = *rc;
gpr_free(rc); gpr_free(rc);
if (gpr_stack_lockfree_push(request_matcher->requests, request_id)) { if (gpr_stack_lockfree_push(rm->requests, request_id)) {
/* this was the first queued request: we need to lock and start /* this was the first queued request: we need to lock and start
matching calls */ matching calls */
gpr_mu_lock(&server->mu_call); gpr_mu_lock(&server->mu_call);
while ((calld = request_matcher->pending_head) != NULL) { while ((calld = rm->pending_head) != NULL) {
request_id = gpr_stack_lockfree_pop(request_matcher->requests); request_id = gpr_stack_lockfree_pop(rm->requests);
if (request_id == -1) break; if (request_id == -1) break;
request_matcher->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
gpr_mu_unlock(&server->mu_call); gpr_mu_unlock(&server->mu_call);
gpr_mu_lock(&calld->mu_state); gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) { if (calld->state == ZOMBIED) {
@ -1204,14 +1202,14 @@ done:
} }
grpc_call_error grpc_server_request_registered_call( grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *rm, grpc_call **call, gpr_timespec *deadline, grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload, grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_bound_to_call,
grpc_completion_queue *cq_for_notification, void *tag) { grpc_completion_queue *cq_for_notification, void *tag) {
grpc_call_error error; grpc_call_error error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
requested_call *rc = gpr_malloc(sizeof(*rc)); requested_call *rc = gpr_malloc(sizeof(*rc));
registered_method *registered_method = rm; registered_method *rm = rmp;
if (!grpc_cq_is_server_cq(cq_for_notification)) { if (!grpc_cq_is_server_cq(cq_for_notification)) {
gpr_free(rc); gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE; error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
@ -1224,7 +1222,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->cq_bound_to_call = cq_bound_to_call; rc->cq_bound_to_call = cq_bound_to_call;
rc->cq_for_notification = cq_for_notification; rc->cq_for_notification = cq_for_notification;
rc->call = call; rc->call = call;
rc->data.registered.registered_method = registered_method; rc->data.registered.registered_method = rm;
rc->data.registered.deadline = deadline; rc->data.registered.deadline = deadline;
rc->data.registered.initial_metadata = initial_metadata; rc->data.registered.initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload; rc->data.registered.optional_payload = optional_payload;

@ -264,7 +264,7 @@
DEFINES += $(EXTRA_DEFINES) DEFINES += $(EXTRA_DEFINES)
endif endif
CFLAGS += -std=c89 -pedantic -Wsign-conversion -Wconversion CFLAGS += -std=c89 -pedantic -Wsign-conversion -Wconversion -Wshadow
ifeq ($(HAS_CXX11),true) ifeq ($(HAS_CXX11),true)
CXXFLAGS += -std=c++11 CXXFLAGS += -std=c++11
else else

Loading…
Cancel
Save