removing a completed TODO, removing forward declarations and rearranging code instead, removing _t suffixes with other recommendations

pull/12532/head
Yash Tibrewal 8 years ago
parent bc130daf5f
commit a495220f31
  1. 2
      src/core/ext/filters/client_channel/client_channel_factory.c
  2. 235
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  3. 30
      src/core/ext/transport/chttp2/server/chttp2_server.c
  4. 69
      src/core/lib/surface/server.c

@ -48,8 +48,6 @@ static void* factory_arg_copy(void* factory) {
} }
static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
// TODO(roth): Remove local exec_ctx when
// https://github.com/grpc/grpc/pull/8705 is merged.
grpc_client_channel_factory_unref(exec_ctx, grpc_client_channel_factory_unref(exec_ctx,
(grpc_client_channel_factory*)factory); (grpc_client_channel_factory*)factory);
} }

@ -287,48 +287,6 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
*/ */
typedef struct rr_connectivity_data rr_connectivity_data; typedef struct rr_connectivity_data rr_connectivity_data;
/* Forward declare functions referred in glb_lb_policy_vtable */
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol);
static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol);
static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
const grpc_lb_policy_pick_args *pick_args,
grpc_connected_subchannel **target,
grpc_call_context_element *context, void **user_data,
grpc_closure *on_complete);
static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target,
grpc_error *error);
static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq,
grpc_error *error);
static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *closure);
static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol);
static grpc_connectivity_state glb_check_connectivity_locked(
grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_error **connectivity_error);
static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *pol,
grpc_connectivity_state *current,
grpc_closure *notify);
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args);
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
glb_shutdown_locked,
glb_pick_locked,
glb_cancel_pick_locked,
glb_cancel_picks_locked,
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
glb_update_locked};
typedef struct glb_lb_policy { typedef struct glb_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
@ -1011,92 +969,6 @@ static grpc_channel_args *build_lb_channel_args(
static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
void *arg, void *arg,
grpc_error *error); grpc_error *error);
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
return NULL;
}
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
glb_policy->server_name);
}
grpc_uri_destroy(uri);
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
glb_policy->lb_call_timeout_ms =
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
INT_MAX});
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg =
grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
char *uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
return &glb_policy->base;
}
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
@ -1936,17 +1808,6 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
&glb_policy->lb_channel_connectivity, &glb_policy->lb_channel_connectivity,
&glb_policy->lb_channel_on_connectivity_changed, NULL); &glb_policy->lb_channel_on_connectivity_changed, NULL);
} }
// Propagate update to fallback_backend_addresses if a non-empty serverlist
// hasn't been received from the balancer.
if (glb_policy->serverlist == NULL) {
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
if (glb_policy->rr_policy != NULL) {
rr_handover_locked(exec_ctx, glb_policy);
}
}
} }
// Invoked as part of the update process. It continues watching the LB channel // Invoked as part of the update process. It continues watching the LB channel
@ -2013,6 +1874,102 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
} }
} }
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
glb_shutdown_locked,
glb_pick_locked,
glb_cancel_pick_locked,
glb_cancel_picks_locked,
glb_ping_one_locked,
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
glb_update_locked};
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one.
* TODO(roth): For now, we ignore non-balancer addresses, but in the
* future, we may change the behavior such that we fall back to using
* the non-balancer addresses if we cannot reach any balancers. In the
* fallback case, we should use the LB policy indicated by
* GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is
* unset, we should default to pick_first). */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
return NULL;
}
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
glb_policy->server_name);
}
grpc_uri_destroy(uri);
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
glb_policy->lb_call_timeout_ms =
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg =
grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
char *uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
return &glb_policy->base;
}
static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
static void glb_factory_unref(grpc_lb_policy_factory *factory) {} static void glb_factory_unref(grpc_lb_policy_factory *factory) {}

@ -49,10 +49,10 @@ typedef struct {
grpc_closure tcp_server_shutdown_complete; grpc_closure tcp_server_shutdown_complete;
grpc_closure *server_destroy_listener_done; grpc_closure *server_destroy_listener_done;
grpc_handshake_manager *pending_handshake_mgrs; grpc_handshake_manager *pending_handshake_mgrs;
} server_state_t; } server_state;
typedef struct { typedef struct {
server_state_t *server_state; server_state *svr_state;
grpc_pollset *accepting_pollset; grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor; grpc_tcp_server_acceptor *acceptor;
grpc_handshake_manager *handshake_mgr; grpc_handshake_manager *handshake_mgr;
@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_handshaker_args *args = (grpc_handshaker_args *)arg; grpc_handshaker_args *args = (grpc_handshaker_args *)arg;
server_connection_state *connection_state = server_connection_state *connection_state =
(server_connection_state *)args->user_data; (server_connection_state *)args->user_data;
gpr_mu_lock(&connection_state->server_state->mu); gpr_mu_lock(&connection_state->svr_state->mu);
if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char *error_str = grpc_error_string(error); const char *error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport *transport = grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
grpc_server_setup_transport( grpc_server_setup_transport(
exec_ctx, connection_state->server_state->server, transport, exec_ctx, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args); connection_state->accepting_pollset, args->args);
grpc_chttp2_transport_start_reading(exec_ctx, transport, grpc_chttp2_transport_start_reading(exec_ctx, transport,
args->read_buffer); args->read_buffer);
@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
grpc_handshake_manager_pending_list_remove( grpc_handshake_manager_pending_list_remove(
&connection_state->server_state->pending_handshake_mgrs, &connection_state->svr_state->pending_handshake_mgrs,
connection_state->handshake_mgr); connection_state->handshake_mgr);
gpr_mu_unlock(&connection_state->server_state->mu); gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor); gpr_free(connection_state->acceptor);
gpr_free(connection_state); gpr_free(connection_state);
} }
@ -109,7 +109,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) { grpc_tcp_server_acceptor *acceptor) {
server_state_t *state = (server_state_t *)arg; server_state *state = (server_state *)arg;
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
if (state->shutdown) { if (state->shutdown) {
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
@ -125,7 +125,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_tcp_server_ref(state->tcp_server); grpc_tcp_server_ref(state->tcp_server);
server_connection_state *connection_state = server_connection_state *connection_state =
(server_connection_state *)gpr_malloc(sizeof(*connection_state)); (server_connection_state *)gpr_malloc(sizeof(*connection_state));
connection_state->server_state = state; connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset; connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor; connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr; connection_state->handshake_mgr = handshake_mgr;
@ -144,7 +144,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server, static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
void *arg, grpc_pollset **pollsets, void *arg, grpc_pollset **pollsets,
size_t pollset_count) { size_t pollset_count) {
server_state_t *state = (server_state_t *)arg; server_state *state = (server_state *)arg;
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
state->shutdown = false; state->shutdown = false;
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
@ -154,7 +154,7 @@ static void server_start_listener(grpc_exec_ctx *exec_ctx, grpc_server *server,
static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
server_state_t *state = (server_state_t *)arg; server_state *state = (server_state *)arg;
/* ensure all threads have unlocked */ /* ensure all threads have unlocked */
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
grpc_closure *destroy_done = state->server_destroy_listener_done; grpc_closure *destroy_done = state->server_destroy_listener_done;
@ -179,7 +179,7 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg,
static void server_destroy_listener(grpc_exec_ctx *exec_ctx, static void server_destroy_listener(grpc_exec_ctx *exec_ctx,
grpc_server *server, void *arg, grpc_server *server, void *arg,
grpc_closure *destroy_done) { grpc_closure *destroy_done) {
server_state_t *state = (server_state_t *)arg; server_state *state = (server_state *)arg;
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
state->shutdown = true; state->shutdown = true;
state->server_destroy_listener_done = destroy_done; state->server_destroy_listener_done = destroy_done;
@ -199,7 +199,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
size_t count = 0; size_t count = 0;
int port_temp; int port_temp;
grpc_error *err = GRPC_ERROR_NONE; grpc_error *err = GRPC_ERROR_NONE;
server_state_t *state = NULL; server_state *state = NULL;
grpc_error **errors = NULL; grpc_error **errors = NULL;
*port_num = -1; *port_num = -1;
@ -209,7 +209,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx,
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
goto error; goto error;
} }
state = (server_state_t *)gpr_zalloc(sizeof(*state)); state = (server_state *)gpr_zalloc(sizeof(*state));
GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete, GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete,
tcp_server_shutdown_complete, state, tcp_server_shutdown_complete, state,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);

@ -55,7 +55,7 @@ typedef struct listener {
typedef struct call_data call_data; typedef struct call_data call_data;
typedef struct channel_data channel_data; typedef struct channel_data channel_data;
typedef struct registered_method_t registered_method_t; typedef struct registered_method registered_method;
typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
@ -76,7 +76,7 @@ typedef struct requested_call {
grpc_call_details *details; grpc_call_details *details;
} batch; } batch;
struct { struct {
registered_method_t *registered_method; registered_method *method;
gpr_timespec *deadline; gpr_timespec *deadline;
grpc_byte_buffer **optional_payload; grpc_byte_buffer **optional_payload;
} registered; } registered;
@ -84,7 +84,7 @@ typedef struct requested_call {
} requested_call; } requested_call;
typedef struct channel_registered_method { typedef struct channel_registered_method {
registered_method_t *server_registered_method; registered_method *server_registered_method;
uint32_t flags; uint32_t flags;
bool has_host; bool has_host;
grpc_slice method; grpc_slice method;
@ -123,7 +123,7 @@ typedef enum {
ZOMBIED ZOMBIED
} call_state; } call_state;
typedef struct request_matcher_t request_matcher_t; typedef struct request_matcher request_matcher;
struct call_data { struct call_data {
grpc_call *call; grpc_call *call;
@ -145,7 +145,7 @@ struct call_data {
uint32_t recv_initial_metadata_flags; uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata; grpc_metadata_array initial_metadata;
request_matcher_t *request_matcher; request_matcher *matcher;
grpc_byte_buffer *payload; grpc_byte_buffer *payload;
grpc_closure got_initial_metadata; grpc_closure got_initial_metadata;
@ -158,21 +158,21 @@ struct call_data {
call_data *pending_next; call_data *pending_next;
}; };
struct request_matcher_t { struct request_matcher {
grpc_server *server; grpc_server *server;
call_data *pending_head; call_data *pending_head;
call_data *pending_tail; call_data *pending_tail;
gpr_stack_lockfree **requests_per_cq; gpr_stack_lockfree **requests_per_cq;
}; };
struct registered_method_t { struct registered_method {
char *method; char *method;
char *host; char *host;
grpc_server_register_method_payload_handling payload_handling; grpc_server_register_method_payload_handling payload_handling;
uint32_t flags; uint32_t flags;
/* one request matcher per method */ /* one request matcher per method */
request_matcher_t request_matcher; request_matcher matcher;
registered_method_t *next; registered_method *next;
}; };
typedef struct { typedef struct {
@ -204,9 +204,9 @@ struct grpc_server {
bool starting; bool starting;
gpr_cv starting_cv; gpr_cv starting_cv;
registered_method_t *registered_methods; registered_method *registered_methods;
/** one request matcher for unregistered methods */ /** one request matcher for unregistered methods */
request_matcher_t unregistered_request_matcher; request_matcher unregistered_request_matcher;
/** free list of available requested_calls_per_cq indices */ /** free list of available requested_calls_per_cq indices */
gpr_stack_lockfree **request_freelist_per_cq; gpr_stack_lockfree **request_freelist_per_cq;
/** requested call backing data */ /** requested call backing data */
@ -313,7 +313,7 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
* request_matcher * request_matcher
*/ */
static void request_matcher_init(request_matcher_t *rm, size_t entries, static void request_matcher_init(request_matcher *rm, size_t entries,
grpc_server *server) { grpc_server *server) {
memset(rm, 0, sizeof(*rm)); memset(rm, 0, sizeof(*rm));
rm->server = server; rm->server = server;
@ -324,7 +324,7 @@ static void request_matcher_init(request_matcher_t *rm, size_t entries,
} }
} }
static void request_matcher_destroy(request_matcher_t *rm) { static void request_matcher_destroy(request_matcher *rm) {
for (size_t i = 0; i < rm->server->cq_count; i++) { for (size_t i = 0; i < rm->server->cq_count; i++) {
GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1);
gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); gpr_stack_lockfree_destroy(rm->requests_per_cq[i]);
@ -338,7 +338,7 @@ static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem,
} }
static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
request_matcher_t *rm) { request_matcher *rm) {
while (rm->pending_head) { while (rm->pending_head) {
call_data *calld = rm->pending_head; call_data *calld = rm->pending_head;
rm->pending_head = calld->pending_next; rm->pending_head = calld->pending_next;
@ -355,7 +355,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
grpc_server *server, grpc_server *server,
request_matcher_t *rm, request_matcher *rm,
grpc_error *error) { grpc_error *error) {
int request_id; int request_id;
for (size_t i = 0; i < server->cq_count; i++) { for (size_t i = 0; i < server->cq_count; i++) {
@ -378,7 +378,7 @@ static void server_ref(grpc_server *server) {
} }
static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
registered_method_t *rm; registered_method *rm;
size_t i; size_t i;
grpc_channel_args_destroy(exec_ctx, server->channel_args); grpc_channel_args_destroy(exec_ctx, server->channel_args);
gpr_mu_destroy(&server->mu_global); gpr_mu_destroy(&server->mu_global);
@ -387,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
while ((rm = server->registered_methods) != NULL) { while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next; server->registered_methods = rm->next;
if (server->started) { if (server->started) {
request_matcher_destroy(&rm->request_matcher); request_matcher_destroy(&rm->matcher);
} }
gpr_free(rm->method); gpr_free(rm->method);
gpr_free(rm->host); gpr_free(rm->host);
@ -519,7 +519,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *call_elem = (grpc_call_element *)arg; grpc_call_element *call_elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)call_elem->call_data; call_data *calld = (call_data *)call_elem->call_data;
channel_data *chand = (channel_data *)call_elem->channel_data; channel_data *chand = (channel_data *)call_elem->channel_data;
request_matcher_t *rm = calld->request_matcher; request_matcher *rm = calld->matcher;
grpc_server *server = rm->server; grpc_server *server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
@ -569,7 +569,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
static void finish_start_new_rpc( static void finish_start_new_rpc(
grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem, grpc_exec_ctx *exec_ctx, grpc_server *server, grpc_call_element *elem,
request_matcher_t *rm, request_matcher *rm,
grpc_server_register_method_payload_handling payload_handling) { grpc_server_register_method_payload_handling payload_handling) {
call_data *calld = (call_data *)elem->call_data; call_data *calld = (call_data *)elem->call_data;
@ -583,7 +583,7 @@ static void finish_start_new_rpc(
return; return;
} }
calld->request_matcher = rm; calld->matcher = rm;
switch (payload_handling) { switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE: case GRPC_SRM_PAYLOAD_NONE:
@ -629,7 +629,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue; continue;
} }
finish_start_new_rpc(exec_ctx, server, elem, finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling); rm->server_registered_method->payload_handling);
return; return;
} }
@ -647,7 +647,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue; continue;
} }
finish_start_new_rpc(exec_ctx, server, elem, finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling); rm->server_registered_method->payload_handling);
return; return;
} }
@ -689,11 +689,11 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
request_matcher_zombify_all_pending_calls( request_matcher_zombify_all_pending_calls(
exec_ctx, &server->unregistered_request_matcher); exec_ctx, &server->unregistered_request_matcher);
for (registered_method_t *rm = server->registered_methods; rm; for (registered_method *rm = server->registered_methods; rm;
rm = rm->next) { rm = rm->next) {
request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, request_matcher_kill_requests(exec_ctx, server, &rm->matcher,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher);
} }
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -1036,7 +1036,7 @@ void *grpc_server_register_method(
grpc_server *server, const char *method, const char *host, grpc_server *server, const char *method, const char *host,
grpc_server_register_method_payload_handling payload_handling, grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) { uint32_t flags) {
registered_method_t *m; registered_method *m;
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_server_register_method(server=%p, method=%s, host=%s, " "grpc_server_register_method(server=%p, method=%s, host=%s, "
"flags=0x%08x)", "flags=0x%08x)",
@ -1058,7 +1058,7 @@ void *grpc_server_register_method(
flags); flags);
return NULL; return NULL;
} }
m = (registered_method_t *)gpr_zalloc(sizeof(registered_method_t)); m = (registered_method *)gpr_zalloc(sizeof(registered_method));
m->method = gpr_strdup(method); m->method = gpr_strdup(method);
m->host = gpr_strdup(host); m->host = gpr_strdup(host);
m->next = server->registered_methods; m->next = server->registered_methods;
@ -1113,9 +1113,8 @@ void grpc_server_start(grpc_server *server) {
} }
request_matcher_init(&server->unregistered_request_matcher, request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server); (size_t)server->max_requested_calls_per_cq, server);
for (registered_method_t *rm = server->registered_methods; rm; for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
rm = rm->next) { request_matcher_init(&rm->matcher,
request_matcher_init(&rm->request_matcher,
(size_t)server->max_requested_calls_per_cq, server); (size_t)server->max_requested_calls_per_cq, server);
} }
@ -1142,7 +1141,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
const grpc_channel_args *args) { const grpc_channel_args *args) {
size_t num_registered_methods; size_t num_registered_methods;
size_t alloc; size_t alloc;
registered_method_t *rm; registered_method *rm;
channel_registered_method *crm; channel_registered_method *crm;
grpc_channel *channel; grpc_channel *channel;
channel_data *chand; channel_data *chand;
@ -1371,7 +1370,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
grpc_server *server, size_t cq_idx, grpc_server *server, size_t cq_idx,
requested_call *rc) { requested_call *rc) {
call_data *calld = NULL; call_data *calld = NULL;
request_matcher_t *rm = NULL; request_matcher *rm = NULL;
int request_id; int request_id;
if (gpr_atm_acq_load(&server->shutdown_flag)) { if (gpr_atm_acq_load(&server->shutdown_flag)) {
fail_call(exec_ctx, server, cq_idx, rc, fail_call(exec_ctx, server, cq_idx, rc,
@ -1392,7 +1391,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &server->unregistered_request_matcher; rm = &server->unregistered_request_matcher;
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
rm = &rc->data.registered.registered_method->request_matcher; rm = &rc->data.registered.method->matcher;
break; break;
} }
server->requested_calls_per_cq[cq_idx][request_id] = *rc; server->requested_calls_per_cq[cq_idx][request_id] = *rc;
@ -1483,7 +1482,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_call_error error; grpc_call_error error;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc)); requested_call *rc = (requested_call *)gpr_malloc(sizeof(*rc));
registered_method_t *rm = (registered_method_t *)rmp; registered_method *rm = (registered_method *)rmp;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx); GRPC_STATS_INC_SERVER_REQUESTED_CALLS(&exec_ctx);
GRPC_API_TRACE( GRPC_API_TRACE(
"grpc_server_request_registered_call(" "grpc_server_request_registered_call("
@ -1521,7 +1520,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->tag = tag; rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call; rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call; rc->call = call;
rc->data.registered.registered_method = rm; rc->data.registered.method = rm;
rc->data.registered.deadline = deadline; rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata; rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload; rc->data.registered.optional_payload = optional_payload;

Loading…
Cancel
Save