Merge pull request #12532 from yashykt/ctocc4

Getting files C++ compilable
pull/12571/head
Yash Tibrewal 7 years ago committed by GitHub
commit 561f72f418
  1. 10
      src/core/ext/filters/client_channel/channel_connectivity.c
  2. 5
      src/core/ext/filters/client_channel/client_channel.c
  3. 7
      src/core/ext/filters/client_channel/client_channel_factory.c
  4. 3
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c
  5. 193
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
  6. 6
      src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c
  7. 6
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
  8. 9
      src/core/ext/filters/client_channel/lb_policy_factory.c
  9. 37
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c
  10. 7
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c
  11. 9
      src/core/ext/filters/client_channel/retry_throttle.c
  12. 12
      src/core/ext/filters/client_channel/subchannel_index.c
  13. 6
      src/core/ext/filters/http/server/http_server_filter.c
  14. 2
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  15. 18
      src/core/ext/transport/chttp2/server/chttp2_server.c
  16. 3
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  17. 16
      src/core/ext/transport/chttp2/transport/flow_control.c
  18. 17
      src/core/ext/transport/chttp2/transport/frame_settings.c
  19. 5
      src/core/ext/transport/chttp2/transport/incoming_metadata.c
  20. 6
      src/core/ext/transport/chttp2/transport/stream_map.c
  21. 101
      src/core/ext/transport/inproc/inproc_transport.c
  22. 28
      src/core/lib/surface/completion_queue.c
  23. 35
      src/core/lib/surface/server.c
  24. 32
      src/core/lib/transport/metadata_batch.c
  25. 5
      src/core/lib/transport/transport.c
  26. 12
      test/core/end2end/tests/resource_quota_server.c

@ -86,7 +86,7 @@ static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
grpc_cq_completion *ignored) { grpc_cq_completion *ignored) {
int delete = 0; bool should_delete = false;
state_watcher *w = (state_watcher *)pw; state_watcher *w = (state_watcher *)pw;
gpr_mu_lock(&w->mu); gpr_mu_lock(&w->mu);
switch (w->phase) { switch (w->phase) {
@ -94,12 +94,12 @@ static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
case READY_TO_CALL_BACK: case READY_TO_CALL_BACK:
GPR_UNREACHABLE_CODE(return ); GPR_UNREACHABLE_CODE(return );
case CALLING_BACK_AND_FINISHED: case CALLING_BACK_AND_FINISHED:
delete = 1; should_delete = true;
break; break;
} }
gpr_mu_unlock(&w->mu); gpr_mu_unlock(&w->mu);
if (delete) { if (should_delete) {
delete_state_watcher(exec_ctx, w); delete_state_watcher(exec_ctx, w);
} }
} }
@ -161,12 +161,12 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) { grpc_error *error) {
partly_done(exec_ctx, pw, true, GRPC_ERROR_REF(error)); partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error));
} }
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw,
grpc_error *error) { grpc_error *error) {
partly_done(exec_ctx, pw, false, GRPC_ERROR_REF(error)); partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error));
} }
int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) { int grpc_channel_num_external_connectivity_watchers(grpc_channel *channel) {

@ -85,7 +85,7 @@ static void method_parameters_unref(method_parameters *method_params) {
} }
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
method_parameters_unref(value); method_parameters_unref((method_parameters *)value);
} }
static bool parse_wait_for_ready(grpc_json *field, static bool parse_wait_for_ready(grpc_json *field,
@ -717,7 +717,8 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_CREATE_FROM_STATIC_STRING( return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"client channel factory arg must be a pointer"); "client channel factory arg must be a pointer");
} }
grpc_client_channel_factory_ref(arg->value.pointer.p); grpc_client_channel_factory_ref(
(grpc_client_channel_factory *)arg->value.pointer.p);
chand->client_channel_factory = chand->client_channel_factory =
(grpc_client_channel_factory *)arg->value.pointer.p; (grpc_client_channel_factory *)arg->value.pointer.p;
// Get server name to resolve, using proxy mapper if needed. // Get server name to resolve, using proxy mapper if needed.

@ -43,14 +43,13 @@ grpc_channel* grpc_client_channel_factory_create_channel(
} }
static void* factory_arg_copy(void* factory) { static void* factory_arg_copy(void* factory) {
grpc_client_channel_factory_ref(factory); grpc_client_channel_factory_ref((grpc_client_channel_factory*)factory);
return factory; return factory;
} }
static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) { static void factory_arg_destroy(grpc_exec_ctx* exec_ctx, void* factory) {
// TODO(roth): Remove local exec_ctx when grpc_client_channel_factory_unref(exec_ctx,
// https://github.com/grpc/grpc/pull/8705 is merged. (grpc_client_channel_factory*)factory);
grpc_client_channel_factory_unref(exec_ctx, factory);
} }
static int factory_arg_cmp(void* factory1, void* factory2) { static int factory_arg_cmp(void* factory1, void* factory2) {

@ -75,7 +75,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(args->context != NULL); GPR_ASSERT(args->context != NULL);
GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL);
calld->client_stats = grpc_grpclb_client_stats_ref( calld->client_stats = grpc_grpclb_client_stats_ref(
args->context[GRPC_GRPCLB_CLIENT_STATS].value); (grpc_grpclb_client_stats *)args->context[GRPC_GRPCLB_CLIENT_STATS]
.value);
// Record call started. // Record call started.
grpc_grpclb_client_stats_add_call_started(calld->client_stats); grpc_grpclb_client_stats_add_call_started(calld->client_stats);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;

@ -139,7 +139,7 @@ static grpc_error *initial_metadata_add_lb_token(
} }
static void destroy_client_stats(void *arg) { static void destroy_client_stats(void *arg) {
grpc_grpclb_client_stats_unref(arg); grpc_grpclb_client_stats_unref((grpc_grpclb_client_stats *)arg);
} }
typedef struct wrapped_rr_closure_arg { typedef struct wrapped_rr_closure_arg {
@ -287,7 +287,7 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) {
* glb_lb_policy * glb_lb_policy
*/ */
typedef struct rr_connectivity_data rr_connectivity_data; typedef struct rr_connectivity_data rr_connectivity_data;
static const grpc_lb_policy_vtable glb_lb_policy_vtable;
typedef struct glb_lb_policy { typedef struct glb_lb_policy {
/** base policy: must be first */ /** base policy: must be first */
grpc_lb_policy base; grpc_lb_policy base;
@ -784,7 +784,7 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy,
/* Allocate the data for the tracking of the new RR policy's connectivity. /* Allocate the data for the tracking of the new RR policy's connectivity.
* It'll be deallocated in glb_rr_connectivity_changed() */ * It'll be deallocated in glb_rr_connectivity_changed() */
rr_connectivity_data *rr_connectivity = rr_connectivity_data *rr_connectivity =
gpr_zalloc(sizeof(rr_connectivity_data)); (rr_connectivity_data *)gpr_zalloc(sizeof(rr_connectivity_data));
GRPC_CLOSURE_INIT(&rr_connectivity->on_change, GRPC_CLOSURE_INIT(&rr_connectivity->on_change,
glb_rr_connectivity_changed_locked, rr_connectivity, glb_rr_connectivity_changed_locked, rr_connectivity,
grpc_combiner_scheduler(glb_policy->base.combiner)); grpc_combiner_scheduler(glb_policy->base.combiner));
@ -924,7 +924,8 @@ static grpc_channel_args *build_lb_channel_args(
grpc_lb_addresses *lb_addresses = grpc_lb_addresses *lb_addresses =
grpc_lb_addresses_create(num_grpclb_addrs, NULL); grpc_lb_addresses_create(num_grpclb_addrs, NULL);
grpc_slice_hash_table_entry *targets_info_entries = grpc_slice_hash_table_entry *targets_info_entries =
gpr_zalloc(sizeof(*targets_info_entries) * num_grpclb_addrs); (grpc_slice_hash_table_entry *)gpr_zalloc(sizeof(*targets_info_entries) *
num_grpclb_addrs);
size_t lb_addresses_idx = 0; size_t lb_addresses_idx = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) { for (size_t i = 0; i < addresses->num_addresses; ++i) {
@ -966,97 +967,6 @@ static grpc_channel_args *build_lb_channel_args(
return result; return result;
} }
static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
void *arg,
grpc_error *error);
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
return NULL;
}
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
glb_policy->server_name);
}
grpc_uri_destroy(uri);
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
glb_policy->lb_call_timeout_ms =
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
INT_MAX});
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg =
grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
char *uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
return &glb_policy->base;
}
static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol; glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_picks == NULL);
@ -1385,7 +1295,8 @@ static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx,
static bool load_report_counters_are_zero(grpc_grpclb_request *request) { static bool load_report_counters_are_zero(grpc_grpclb_request *request) {
grpc_grpclb_dropped_call_counts *drop_entries = grpc_grpclb_dropped_call_counts *drop_entries =
request->client_stats.calls_finished_with_drop.arg; (grpc_grpclb_dropped_call_counts *)
request->client_stats.calls_finished_with_drop.arg;
return request->client_stats.num_calls_started == 0 && return request->client_stats.num_calls_started == 0 &&
request->client_stats.num_calls_finished == 0 && request->client_stats.num_calls_finished == 0 &&
request->client_stats.num_calls_finished_with_client_failed_to_send == request->client_stats.num_calls_finished_with_client_failed_to_send ==
@ -1757,7 +1668,7 @@ static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, static void lb_on_fallback_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
glb_lb_policy *glb_policy = arg; glb_lb_policy *glb_policy = (glb_lb_policy *)arg;
/* If we receive a serverlist after the timer fires but before this callback /* If we receive a serverlist after the timer fires but before this callback
* actually runs, don't do anything. */ * actually runs, don't do anything. */
if (glb_policy->serverlist != NULL) return; if (glb_policy->serverlist != NULL) return;
@ -1985,6 +1896,94 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_notify_on_state_change_locked, glb_notify_on_state_change_locked,
glb_update_locked}; glb_update_locked};
static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
return NULL;
}
grpc_lb_addresses *addresses = (grpc_lb_addresses *)arg->value.pointer.p;
size_t num_grpclb_addrs = 0;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) ++num_grpclb_addrs;
}
if (num_grpclb_addrs == 0) return NULL;
glb_lb_policy *glb_policy = (glb_lb_policy *)gpr_zalloc(sizeof(*glb_policy));
/* Get server name. */
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "Will use '%s' as the server name for LB request.",
glb_policy->server_name);
}
grpc_uri_destroy(uri);
glb_policy->cc_factory = args->client_channel_factory;
GPR_ASSERT(glb_policy->cc_factory != NULL);
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
glb_policy->lb_call_timeout_ms =
grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX});
arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
glb_policy->lb_fallback_timeout_ms = grpc_channel_arg_get_integer(
arg, (grpc_integer_options){GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0,
INT_MAX});
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
grpc_arg new_arg =
grpc_channel_arg_string_create(GRPC_ARG_LB_POLICY_NAME, "grpclb");
static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
glb_policy->args = grpc_channel_args_copy_and_add_and_remove(
args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
/* Extract the backend addresses (may be empty) from the resolver for
* fallback. */
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
/* Create a client channel over them to communicate with a LB service */
glb_policy->response_generator =
grpc_fake_resolver_response_generator_create();
grpc_channel_args *lb_channel_args = build_lb_channel_args(
exec_ctx, addresses, glb_policy->response_generator, args->args);
char *uri_str;
gpr_asprintf(&uri_str, "fake:///%s", glb_policy->server_name);
glb_policy->lb_channel = grpc_lb_policy_grpclb_create_lb_channel(
exec_ctx, uri_str, args->client_channel_factory, lb_channel_args);
/* Propagate initial resolution */
grpc_fake_resolver_response_generator_set_response(
exec_ctx, glb_policy->response_generator, lb_channel_args);
grpc_channel_args_destroy(exec_ctx, lb_channel_args);
gpr_free(uri_str);
if (glb_policy->lb_channel == NULL) {
gpr_free((void *)glb_policy->server_name);
grpc_channel_args_destroy(exec_ctx, glb_policy->args);
gpr_free(glb_policy);
return NULL;
}
grpc_subchannel_index_ref();
GRPC_CLOSURE_INIT(&glb_policy->lb_channel_on_connectivity_changed,
glb_lb_channel_on_connectivity_changed_cb, glb_policy,
grpc_combiner_scheduler(args->combiner));
grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner);
grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE,
"grpclb");
return &glb_policy->base;
}
static void glb_factory_ref(grpc_lb_policy_factory *factory) {} static void glb_factory_ref(grpc_lb_policy_factory *factory) {}
static void glb_factory_unref(grpc_lb_policy_factory *factory) {} static void glb_factory_unref(grpc_lb_policy_factory *factory) {}

@ -148,7 +148,8 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { void grpc_grpclb_request_destroy(grpc_grpclb_request *request) {
if (request->has_client_stats) { if (request->has_client_stats) {
grpc_grpclb_dropped_call_counts *drop_entries = grpc_grpclb_dropped_call_counts *drop_entries =
request->client_stats.calls_finished_with_drop.arg; (grpc_grpclb_dropped_call_counts *)
request->client_stats.calls_finished_with_drop.arg;
grpc_grpclb_dropped_call_counts_destroy(drop_entries); grpc_grpclb_dropped_call_counts_destroy(drop_entries);
} }
gpr_free(request); gpr_free(request);
@ -170,7 +171,8 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse(
if (!res.has_initial_response) return NULL; if (!res.has_initial_response) return NULL;
grpc_grpclb_initial_response *initial_res = grpc_grpclb_initial_response *initial_res =
gpr_malloc(sizeof(grpc_grpclb_initial_response)); (grpc_grpclb_initial_response *)gpr_malloc(
sizeof(grpc_grpclb_initial_response));
memcpy(initial_res, &res.initial_response, memcpy(initial_res, &res.initial_response,
sizeof(grpc_grpclb_initial_response)); sizeof(grpc_grpclb_initial_response));

@ -331,8 +331,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
(void *)p, (unsigned long)addresses->num_addresses); (void *)p, (unsigned long)addresses->num_addresses);
} }
grpc_subchannel_args *sc_args = grpc_subchannel_args *sc_args = (grpc_subchannel_args *)gpr_zalloc(
gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to /* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */ * subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@ -404,7 +404,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
} }
/* Create the subchannels for the new subchannel args/addresses. */ /* Create the subchannels for the new subchannel args/addresses. */
grpc_subchannel **new_subchannels = grpc_subchannel **new_subchannels =
gpr_zalloc(sizeof(*new_subchannels) * sc_args_count); (grpc_subchannel **)gpr_zalloc(sizeof(*new_subchannels) * sc_args_count);
size_t num_new_subchannels = 0; size_t num_new_subchannels = 0;
for (size_t i = 0; i < sc_args_count; i++) { for (size_t i = 0; i < sc_args_count; i++) {
grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel(

@ -126,13 +126,14 @@ void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx,
} }
static void* lb_addresses_copy(void* addresses) { static void* lb_addresses_copy(void* addresses) {
return grpc_lb_addresses_copy(addresses); return grpc_lb_addresses_copy((grpc_lb_addresses*)addresses);
} }
static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) {
grpc_lb_addresses_destroy(exec_ctx, addresses); grpc_lb_addresses_destroy(exec_ctx, (grpc_lb_addresses*)addresses);
} }
static int lb_addresses_cmp(void* addresses1, void* addresses2) { static int lb_addresses_cmp(void* addresses1, void* addresses2) {
return grpc_lb_addresses_cmp(addresses1, addresses2); return grpc_lb_addresses_cmp((grpc_lb_addresses*)addresses1,
(grpc_lb_addresses*)addresses2);
} }
static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = { static const grpc_arg_pointer_vtable lb_addresses_arg_vtable = {
lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp}; lb_addresses_copy, lb_addresses_destroy, lb_addresses_cmp};
@ -149,7 +150,7 @@ grpc_lb_addresses* grpc_lb_addresses_find_channel_arg(
grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES); grpc_channel_args_find(channel_args, GRPC_ARG_LB_ADDRESSES);
if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER) if (lb_addresses_arg == NULL || lb_addresses_arg->type != GRPC_ARG_POINTER)
return NULL; return NULL;
return lb_addresses_arg->value.pointer.p; return (grpc_lb_addresses*)lb_addresses_arg->value.pointer.p;
} }
void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) { void grpc_lb_policy_factory_ref(grpc_lb_policy_factory* factory) {

@ -38,7 +38,7 @@ typedef struct fd_node {
/** the owner of this fd node */ /** the owner of this fd node */
grpc_ares_ev_driver *ev_driver; grpc_ares_ev_driver *ev_driver;
/** the grpc_fd owned by this fd node */ /** the grpc_fd owned by this fd node */
grpc_fd *grpc_fd; grpc_fd *fd;
/** a closure wrapping on_readable_cb, which should be invoked when the /** a closure wrapping on_readable_cb, which should be invoked when the
grpc_fd in this node becomes readable. */ grpc_fd in this node becomes readable. */
grpc_closure read_closure; grpc_closure read_closure;
@ -96,15 +96,15 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver *ev_driver) {
} }
static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(!fdn->writable_registered);
gpr_mu_destroy(&fdn->mu); gpr_mu_destroy(&fdn->mu);
grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->fd);
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, grpc_fd_orphan(exec_ctx, fdn->fd, NULL, NULL, true /* already_closed */,
"c-ares query finished"); "c-ares query finished");
gpr_free(fdn); gpr_free(fdn);
} }
@ -150,9 +150,8 @@ void grpc_ares_ev_driver_shutdown(grpc_exec_ctx *exec_ctx,
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
fd_node *fn = ev_driver->fds; fd_node *fn = ev_driver->fds;
while (fn != NULL) { while (fn != NULL) {
grpc_fd_shutdown( grpc_fd_shutdown(exec_ctx, fn->fd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
exec_ctx, fn->grpc_fd, "grpc_ares_ev_driver_shutdown"));
GRPC_ERROR_CREATE_FROM_STATIC_STRING("grpc_ares_ev_driver_shutdown"));
fn = fn->next; fn = fn->next;
} }
gpr_mu_unlock(&ev_driver->mu); gpr_mu_unlock(&ev_driver->mu);
@ -165,7 +164,7 @@ static fd_node *pop_fd_node(fd_node **head, int fd) {
dummy_head.next = *head; dummy_head.next = *head;
fd_node *node = &dummy_head; fd_node *node = &dummy_head;
while (node->next != NULL) { while (node->next != NULL) {
if (grpc_fd_wrapped_fd(node->next->grpc_fd) == fd) { if (grpc_fd_wrapped_fd(node->next->fd) == fd) {
fd_node *ret = node->next; fd_node *ret = node->next;
node->next = node->next->next; node->next = node->next->next;
*head = dummy_head.next; *head = dummy_head.next;
@ -184,9 +183,9 @@ static void on_readable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->readable_registered = false; fdn->readable_registered = false;
gpr_mu_unlock(&fdn->mu); gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); gpr_log(GPR_DEBUG, "readable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->grpc_fd), ares_process_fd(ev_driver->channel, grpc_fd_wrapped_fd(fdn->fd),
ARES_SOCKET_BAD); ARES_SOCKET_BAD);
} else { } else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
@ -211,10 +210,10 @@ static void on_writable_cb(grpc_exec_ctx *exec_ctx, void *arg,
fdn->writable_registered = false; fdn->writable_registered = false;
gpr_mu_unlock(&fdn->mu); gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->grpc_fd)); gpr_log(GPR_DEBUG, "writable on %d", grpc_fd_wrapped_fd(fdn->fd));
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD,
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_wrapped_fd(fdn->fd));
} else { } else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled // timed out. The pending lookups made on this ev_driver will be cancelled
@ -253,7 +252,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i);
fdn = (fd_node *)gpr_malloc(sizeof(fd_node)); fdn = (fd_node *)gpr_malloc(sizeof(fd_node));
gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]);
fdn->grpc_fd = grpc_fd_create(socks[i], fd_name); fdn->fd = grpc_fd_create(socks[i], fd_name);
fdn->ev_driver = ev_driver; fdn->ev_driver = ev_driver;
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
@ -262,8 +261,7 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, grpc_pollset_set_add_fd(exec_ctx, ev_driver->pollset_set, fdn->fd);
fdn->grpc_fd);
gpr_free(fd_name); gpr_free(fd_name);
} }
fdn->next = new_list; fdn->next = new_list;
@ -274,9 +272,8 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_READABLE(socks_bitmask, i) && if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
!fdn->readable_registered) { !fdn->readable_registered) {
grpc_ares_ev_driver_ref(ev_driver); grpc_ares_ev_driver_ref(ev_driver);
gpr_log(GPR_DEBUG, "notify read on: %d", gpr_log(GPR_DEBUG, "notify read on: %d", grpc_fd_wrapped_fd(fdn->fd));
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_notify_on_read(exec_ctx, fdn->fd, &fdn->read_closure);
grpc_fd_notify_on_read(exec_ctx, fdn->grpc_fd, &fdn->read_closure);
fdn->readable_registered = true; fdn->readable_registered = true;
} }
// Register write_closure if the socket is writable and write_closure // Register write_closure if the socket is writable and write_closure
@ -284,9 +281,9 @@ static void grpc_ares_notify_on_event_locked(grpc_exec_ctx *exec_ctx,
if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) && if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
!fdn->writable_registered) { !fdn->writable_registered) {
gpr_log(GPR_DEBUG, "notify write on: %d", gpr_log(GPR_DEBUG, "notify write on: %d",
grpc_fd_wrapped_fd(fdn->grpc_fd)); grpc_fd_wrapped_fd(fdn->fd));
grpc_ares_ev_driver_ref(ev_driver); grpc_ares_ev_driver_ref(ev_driver);
grpc_fd_notify_on_write(exec_ctx, fdn->grpc_fd, &fdn->write_closure); grpc_fd_notify_on_write(exec_ctx, fdn->fd, &fdn->write_closure);
fdn->writable_registered = true; fdn->writable_registered = true;
} }
gpr_mu_unlock(&fdn->mu); gpr_mu_unlock(&fdn->mu);

@ -123,8 +123,8 @@ static void grpc_ares_request_unref(grpc_exec_ctx *exec_ctx,
static grpc_ares_hostbyname_request *create_hostbyname_request( static grpc_ares_hostbyname_request *create_hostbyname_request(
grpc_ares_request *parent_request, char *host, uint16_t port, grpc_ares_request *parent_request, char *host, uint16_t port,
bool is_balancer) { bool is_balancer) {
grpc_ares_hostbyname_request *hr = grpc_ares_hostbyname_request *hr = (grpc_ares_hostbyname_request *)gpr_zalloc(
gpr_zalloc(sizeof(grpc_ares_hostbyname_request)); sizeof(grpc_ares_hostbyname_request));
hr->parent_request = parent_request; hr->parent_request = parent_request;
hr->host = gpr_strdup(host); hr->host = gpr_strdup(host);
hr->port = port; hr->port = port;
@ -527,7 +527,8 @@ static void grpc_resolve_address_ares_impl(grpc_exec_ctx *exec_ctx,
grpc_closure *on_done, grpc_closure *on_done,
grpc_resolved_addresses **addrs) { grpc_resolved_addresses **addrs) {
grpc_resolve_address_ares_request *r = grpc_resolve_address_ares_request *r =
gpr_zalloc(sizeof(grpc_resolve_address_ares_request)); (grpc_resolve_address_ares_request *)gpr_zalloc(
sizeof(grpc_resolve_address_ares_request));
r->addrs_out = addrs; r->addrs_out = addrs;
r->on_resolve_address_done = on_done; r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,

@ -99,7 +99,7 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
int max_milli_tokens, int milli_token_ratio, int max_milli_tokens, int milli_token_ratio,
grpc_server_retry_throttle_data* old_throttle_data) { grpc_server_retry_throttle_data* old_throttle_data) {
grpc_server_retry_throttle_data* throttle_data = grpc_server_retry_throttle_data* throttle_data =
gpr_malloc(sizeof(*throttle_data)); (grpc_server_retry_throttle_data*)gpr_malloc(sizeof(*throttle_data));
memset(throttle_data, 0, sizeof(*throttle_data)); memset(throttle_data, 0, sizeof(*throttle_data));
gpr_ref_init(&throttle_data->refs, 1); gpr_ref_init(&throttle_data->refs, 1);
throttle_data->max_milli_tokens = max_milli_tokens; throttle_data->max_milli_tokens = max_milli_tokens;
@ -131,11 +131,11 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create(
// //
static void* copy_server_name(void* key, void* unused) { static void* copy_server_name(void* key, void* unused) {
return gpr_strdup(key); return gpr_strdup((const char*)key);
} }
static long compare_server_name(void* key1, void* key2, void* unused) { static long compare_server_name(void* key1, void* key2, void* unused) {
return strcmp(key1, key2); return strcmp((const char*)key1, (const char*)key2);
} }
static void destroy_server_retry_throttle_data(void* value, void* unused) { static void destroy_server_retry_throttle_data(void* value, void* unused) {
@ -177,7 +177,8 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server(
const char* server_name, int max_milli_tokens, int milli_token_ratio) { const char* server_name, int max_milli_tokens, int milli_token_ratio) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
grpc_server_retry_throttle_data* throttle_data = grpc_server_retry_throttle_data* throttle_data =
gpr_avl_get(g_avl, (char*)server_name, NULL); (grpc_server_retry_throttle_data*)gpr_avl_get(g_avl, (char*)server_name,
NULL);
if (throttle_data == NULL) { if (throttle_data == NULL) {
// Entry not found. Create a new one. // Entry not found. Create a new one.
throttle_data = grpc_server_retry_throttle_data_create( throttle_data = grpc_server_retry_throttle_data_create(

@ -90,24 +90,26 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx,
static void sck_avl_destroy(void *p, void *user_data) { static void sck_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
grpc_subchannel_key_destroy(exec_ctx, p); grpc_subchannel_key_destroy(exec_ctx, (grpc_subchannel_key *)p);
} }
static void *sck_avl_copy(void *p, void *unused) { static void *sck_avl_copy(void *p, void *unused) {
return subchannel_key_copy(p); return subchannel_key_copy((grpc_subchannel_key *)p);
} }
static long sck_avl_compare(void *a, void *b, void *unused) { static long sck_avl_compare(void *a, void *b, void *unused) {
return grpc_subchannel_key_compare(a, b); return grpc_subchannel_key_compare((grpc_subchannel_key *)a,
(grpc_subchannel_key *)b);
} }
static void scv_avl_destroy(void *p, void *user_data) { static void scv_avl_destroy(void *p, void *user_data) {
grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data;
GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, (grpc_subchannel *)p,
"subchannel_index");
} }
static void *scv_avl_copy(void *p, void *unused) { static void *scv_avl_copy(void *p, void *unused) {
GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel *)p, "subchannel_index");
return p; return p;
} }

@ -83,12 +83,12 @@ static grpc_error *server_filter_outgoing_metadata(grpc_exec_ctx *exec_ctx,
} }
static void add_error(const char *error_name, grpc_error **cumulative, static void add_error(const char *error_name, grpc_error **cumulative,
grpc_error *new) { grpc_error *new_err) {
if (new == GRPC_ERROR_NONE) return; if (new_err == GRPC_ERROR_NONE) return;
if (*cumulative == GRPC_ERROR_NONE) { if (*cumulative == GRPC_ERROR_NONE) {
*cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name); *cumulative = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_name);
} }
*cumulative = grpc_error_add_child(*cumulative, new); *cumulative = grpc_error_add_child(*cumulative, new_err);
} }
static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,

@ -161,7 +161,7 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error)); grpc_endpoint_shutdown(exec_ctx, c->endpoint, GRPC_ERROR_REF(error));
} }
gpr_mu_unlock(&c->mu); gpr_mu_unlock(&c->mu);
chttp2_connector_unref(exec_ctx, arg); chttp2_connector_unref(exec_ctx, (grpc_connector *)arg);
} else { } else {
GPR_ASSERT(c->endpoint != NULL); GPR_ASSERT(c->endpoint != NULL);
start_handshake_locked(exec_ctx, c); start_handshake_locked(exec_ctx, c);

@ -52,7 +52,7 @@ typedef struct {
} server_state; } server_state;
typedef struct { typedef struct {
server_state *server_state; server_state *svr_state;
grpc_pollset *accepting_pollset; grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor; grpc_tcp_server_acceptor *acceptor;
grpc_handshake_manager *handshake_mgr; grpc_handshake_manager *handshake_mgr;
@ -63,8 +63,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_handshaker_args *args = (grpc_handshaker_args *)arg; grpc_handshaker_args *args = (grpc_handshaker_args *)arg;
server_connection_state *connection_state = server_connection_state *connection_state =
(server_connection_state *)args->user_data; (server_connection_state *)args->user_data;
gpr_mu_lock(&connection_state->server_state->mu); gpr_mu_lock(&connection_state->svr_state->mu);
if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char *error_str = grpc_error_string(error); const char *error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
@ -89,7 +89,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport *transport = grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0); grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
grpc_server_setup_transport( grpc_server_setup_transport(
exec_ctx, connection_state->server_state->server, transport, exec_ctx, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args); connection_state->accepting_pollset, args->args);
grpc_chttp2_transport_start_reading(exec_ctx, transport, grpc_chttp2_transport_start_reading(exec_ctx, transport,
args->read_buffer); args->read_buffer);
@ -97,11 +97,11 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
} }
} }
grpc_handshake_manager_pending_list_remove( grpc_handshake_manager_pending_list_remove(
&connection_state->server_state->pending_handshake_mgrs, &connection_state->svr_state->pending_handshake_mgrs,
connection_state->handshake_mgr); connection_state->handshake_mgr);
gpr_mu_unlock(&connection_state->server_state->mu); gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor); gpr_free(connection_state->acceptor);
gpr_free(connection_state); gpr_free(connection_state);
} }
@ -124,8 +124,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
grpc_tcp_server_ref(state->tcp_server); grpc_tcp_server_ref(state->tcp_server);
server_connection_state *connection_state = server_connection_state *connection_state =
gpr_malloc(sizeof(*connection_state)); (server_connection_state *)gpr_malloc(sizeof(*connection_state));
connection_state->server_state = state; connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset; connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor; connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr; connection_state->handshake_mgr = handshake_mgr;

@ -2912,7 +2912,8 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) { uint32_t frame_size, uint32_t flags) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream = grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
gpr_malloc(sizeof(*incoming_byte_stream)); (grpc_chttp2_incoming_byte_stream *)gpr_malloc(
sizeof(*incoming_byte_stream));
incoming_byte_stream->base.length = frame_size; incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.flags = flags;

@ -60,24 +60,24 @@ static void pretrace(shadow_flow_control* shadow_fc,
#define TRACE_PADDING 30 #define TRACE_PADDING 30
static char* fmt_int64_diff_str(int64_t old, int64_t new) { static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
char* str; char* str;
if (old != new) { if (old_val != new_val) {
gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old, new); gpr_asprintf(&str, "%" PRId64 " -> %" PRId64 "", old_val, new_val);
} else { } else {
gpr_asprintf(&str, "%" PRId64 "", old); gpr_asprintf(&str, "%" PRId64 "", old_val);
} }
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str); gpr_free(str);
return str_lp; return str_lp;
} }
static char* fmt_uint32_diff_str(uint32_t old, uint32_t new) { static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
char* str; char* str;
if (new > 0 && old != new) { if (new_val > 0 && old_val != new_val) {
gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old, new); gpr_asprintf(&str, "%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
} else { } else {
gpr_asprintf(&str, "%" PRIu32 "", old); gpr_asprintf(&str, "%" PRIu32 "", old_val);
} }
char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING); char* str_lp = gpr_leftpad(str, ' ', TRACE_PADDING);
gpr_free(str); gpr_free(str);

@ -44,7 +44,8 @@ static uint8_t *fill_header(uint8_t *out, uint32_t length, uint8_t flags) {
return out; return out;
} }
grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, grpc_slice grpc_chttp2_settings_create(uint32_t *old_settings,
const uint32_t *new_settings,
uint32_t force_mask, size_t count) { uint32_t force_mask, size_t count) {
size_t i; size_t i;
uint32_t n = 0; uint32_t n = 0;
@ -52,21 +53,21 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new,
uint8_t *p; uint8_t *p;
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); n += (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0);
} }
output = GRPC_SLICE_MALLOC(9 + 6 * n); output = GRPC_SLICE_MALLOC(9 + 6 * n);
p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0);
for (i = 0; i < count; i++) { for (i = 0; i < count; i++) {
if (new[i] != old[i] || (force_mask & (1u << i)) != 0) { if (new_settings[i] != old_settings[i] || (force_mask & (1u << i)) != 0) {
*p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8); *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i] >> 8);
*p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]); *p++ = (uint8_t)(grpc_setting_id_to_wire_id[i]);
*p++ = (uint8_t)(new[i] >> 24); *p++ = (uint8_t)(new_settings[i] >> 24);
*p++ = (uint8_t)(new[i] >> 16); *p++ = (uint8_t)(new_settings[i] >> 16);
*p++ = (uint8_t)(new[i] >> 8); *p++ = (uint8_t)(new_settings[i] >> 8);
*p++ = (uint8_t)(new[i]); *p++ = (uint8_t)(new_settings[i]);
old[i] = new[i]; old_settings[i] = new_settings[i];
} }
} }

@ -42,8 +42,9 @@ grpc_error *grpc_chttp2_incoming_metadata_buffer_add(
grpc_mdelem elem) { grpc_mdelem elem) {
buffer->size += GRPC_MDELEM_LENGTH(elem); buffer->size += GRPC_MDELEM_LENGTH(elem);
return grpc_metadata_batch_add_tail( return grpc_metadata_batch_add_tail(
exec_ctx, &buffer->batch, exec_ctx, &buffer->batch, (grpc_linked_mdelem *)gpr_arena_alloc(
gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); buffer->arena, sizeof(grpc_linked_mdelem)),
elem);
} }
grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add(

@ -72,8 +72,10 @@ void grpc_chttp2_stream_map_add(grpc_chttp2_stream_map *map, uint32_t key,
/* resize when less than 25% of the table is free, because compaction /* resize when less than 25% of the table is free, because compaction
won't help much */ won't help much */
map->capacity = capacity = 3 * capacity / 2; map->capacity = capacity = 3 * capacity / 2;
map->keys = keys = gpr_realloc(keys, capacity * sizeof(uint32_t)); map->keys = keys =
map->values = values = gpr_realloc(values, capacity * sizeof(void *)); (uint32_t *)gpr_realloc(keys, capacity * sizeof(uint32_t));
map->values = values =
(void **)gpr_realloc(values, capacity * sizeof(void *));
} }
} }

@ -37,7 +37,6 @@
if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \ if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \
} while (0) } while (0)
static const grpc_transport_vtable inproc_vtable;
static grpc_slice g_empty_slice; static grpc_slice g_empty_slice;
static grpc_slice g_fake_path_key; static grpc_slice g_fake_path_key;
static grpc_slice g_fake_path_value; static grpc_slice g_fake_path_value;
@ -1166,6 +1165,55 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
unref_transport(exec_ctx, t); unref_transport(exec_ctx, t);
} }
/*******************************************************************************
* INTEGRATION GLUE
*/
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
// Nothing to do here
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
// Nothing to do here
}
static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return NULL;
}
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
void grpc_inproc_transport_init(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(NULL, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
g_fake_path_key = grpc_slice_intern(key_tmp);
grpc_slice_unref_internal(&exec_ctx, key_tmp);
g_fake_path_value = grpc_slice_from_static_string("/");
grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
g_fake_auth_key = grpc_slice_intern(auth_tmp);
grpc_slice_unref_internal(&exec_ctx, auth_tmp);
g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
grpc_exec_ctx_finish(&exec_ctx);
}
static const grpc_transport_vtable inproc_vtable = {
sizeof(inproc_stream), "inproc", init_stream,
set_pollset, set_pollset_set, perform_stream_op,
perform_transport_op, destroy_stream, destroy_transport,
get_endpoint};
/******************************************************************************* /*******************************************************************************
* Main inproc transport functions * Main inproc transport functions
*/ */
@ -1178,7 +1226,7 @@ static void inproc_transports_create(grpc_exec_ctx *exec_ctx,
inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st)); inproc_transport *st = (inproc_transport *)gpr_zalloc(sizeof(*st));
inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct)); inproc_transport *ct = (inproc_transport *)gpr_zalloc(sizeof(*ct));
// Share one lock between both sides since both sides get affected // Share one lock between both sides since both sides get affected
st->mu = ct->mu = gpr_malloc(sizeof(*st->mu)); st->mu = ct->mu = (shared_mu *)gpr_malloc(sizeof(*st->mu));
gpr_mu_init(&st->mu->mu); gpr_mu_init(&st->mu->mu);
gpr_ref_init(&st->mu->refs, 2); gpr_ref_init(&st->mu->refs, 2);
st->base.vtable = &inproc_vtable; st->base.vtable = &inproc_vtable;
@ -1240,55 +1288,6 @@ grpc_channel *grpc_inproc_channel_create(grpc_server *server,
return channel; return channel;
} }
/*******************************************************************************
* INTEGRATION GLUE
*/
static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {
// Nothing to do here
}
static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset_set *pollset_set) {
// Nothing to do here
}
static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
return NULL;
}
static const grpc_transport_vtable inproc_vtable = {
sizeof(inproc_stream), "inproc", init_stream,
set_pollset, set_pollset_set, perform_stream_op,
perform_transport_op, destroy_stream, destroy_transport,
get_endpoint};
/*******************************************************************************
* GLOBAL INIT AND DESTROY
*/
static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {}
void grpc_inproc_transport_init(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL,
grpc_schedule_on_exec_ctx);
g_empty_slice = grpc_slice_from_static_buffer(NULL, 0);
grpc_slice key_tmp = grpc_slice_from_static_string(":path");
g_fake_path_key = grpc_slice_intern(key_tmp);
grpc_slice_unref_internal(&exec_ctx, key_tmp);
g_fake_path_value = grpc_slice_from_static_string("/");
grpc_slice auth_tmp = grpc_slice_from_static_string(":authority");
g_fake_auth_key = grpc_slice_intern(auth_tmp);
grpc_slice_unref_internal(&exec_ctx, auth_tmp);
g_fake_auth_value = grpc_slice_from_static_string("inproc-fail");
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_inproc_transport_shutdown(void) { void grpc_inproc_transport_shutdown(void) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_slice_unref_internal(&exec_ctx, g_empty_slice); grpc_slice_unref_internal(&exec_ctx, g_empty_slice);

@ -580,12 +580,12 @@ static bool atm_inc_if_nonzero(gpr_atm *counter) {
} }
static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) { static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events); return atm_inc_if_nonzero(&cqd->pending_events);
} }
static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) { static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
return atm_inc_if_nonzero(&cqd->pending_events); return atm_inc_if_nonzero(&cqd->pending_events);
} }
@ -630,7 +630,7 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
} }
} }
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE); int is_success = (error == GRPC_ERROR_NONE);
storage->tag = tag; storage->tag = tag;
@ -691,7 +691,7 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
void *done_arg, void *done_arg,
grpc_cq_completion *storage), grpc_cq_completion *storage),
void *done_arg, grpc_cq_completion *storage) { void *done_arg, grpc_cq_completion *storage) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
int is_success = (error == GRPC_ERROR_NONE); int is_success = (error == GRPC_ERROR_NONE);
GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0); GPR_TIMER_BEGIN("cq_end_op_for_pluck", 0);
@ -774,7 +774,7 @@ typedef struct {
static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) { static bool cq_is_next_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg; cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq; grpc_completion_queue *cq = a->cq;
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL); GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever = gpr_atm current_last_seen_things_queued_ever =
@ -825,7 +825,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
void *reserved) { void *reserved) {
grpc_event ret; grpc_event ret;
gpr_timespec now; gpr_timespec now;
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@ -958,7 +958,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
this function */ this function */
static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) { grpc_completion_queue *cq) {
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0); GPR_ASSERT(gpr_atm_no_barrier_load(&cqd->pending_events) == 0);
@ -969,7 +969,7 @@ static void cq_finish_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) { grpc_completion_queue *cq) {
cq_next_data *cqd = DATA_FROM_CQ(cq); cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because: /* Need an extra ref for cq here because:
* We call cq_finish_shutdown_next() below, that would call pollset shutdown. * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
@ -999,7 +999,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
static int add_plucker(grpc_completion_queue *cq, void *tag, static int add_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) { grpc_pollset_worker **worker) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
return 0; return 0;
} }
@ -1011,7 +1011,7 @@ static int add_plucker(grpc_completion_queue *cq, void *tag,
static void del_plucker(grpc_completion_queue *cq, void *tag, static void del_plucker(grpc_completion_queue *cq, void *tag,
grpc_pollset_worker **worker) { grpc_pollset_worker **worker) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
for (int i = 0; i < cqd->num_pluckers; i++) { for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) { if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
cqd->num_pluckers--; cqd->num_pluckers--;
@ -1025,7 +1025,7 @@ static void del_plucker(grpc_completion_queue *cq, void *tag,
static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) { static bool cq_is_pluck_finished(grpc_exec_ctx *exec_ctx, void *arg) {
cq_is_finished_arg *a = (cq_is_finished_arg *)arg; cq_is_finished_arg *a = (cq_is_finished_arg *)arg;
grpc_completion_queue *cq = a->cq; grpc_completion_queue *cq = a->cq;
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(a->stolen_completion == NULL); GPR_ASSERT(a->stolen_completion == NULL);
gpr_atm current_last_seen_things_queued_ever = gpr_atm current_last_seen_things_queued_ever =
@ -1062,7 +1062,7 @@ static grpc_event cq_pluck(grpc_completion_queue *cq, void *tag,
grpc_cq_completion *prev; grpc_cq_completion *prev;
grpc_pollset_worker *worker = NULL; grpc_pollset_worker *worker = NULL;
gpr_timespec now; gpr_timespec now;
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
@ -1186,7 +1186,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx, static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) { grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
GPR_ASSERT(cqd->shutdown_called); GPR_ASSERT(cqd->shutdown_called);
GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown)); GPR_ASSERT(!gpr_atm_no_barrier_load(&cqd->shutdown));
@ -1200,7 +1200,7 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
* merging them is a bit tricky and probably not worth it */ * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx, static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) { grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq); cq_pluck_data *cqd = (cq_pluck_data *)DATA_FROM_CQ(cq);
/* Need an extra ref for cq here because: /* Need an extra ref for cq here because:
* We call cq_finish_shutdown_pluck() below, that would call pollset shutdown. * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.

@ -76,7 +76,7 @@ typedef struct requested_call {
grpc_call_details *details; grpc_call_details *details;
} batch; } batch;
struct { struct {
registered_method *registered_method; registered_method *method;
gpr_timespec *deadline; gpr_timespec *deadline;
grpc_byte_buffer **optional_payload; grpc_byte_buffer **optional_payload;
} registered; } registered;
@ -145,7 +145,7 @@ struct call_data {
uint32_t recv_initial_metadata_flags; uint32_t recv_initial_metadata_flags;
grpc_metadata_array initial_metadata; grpc_metadata_array initial_metadata;
request_matcher *request_matcher; request_matcher *matcher;
grpc_byte_buffer *payload; grpc_byte_buffer *payload;
grpc_closure got_initial_metadata; grpc_closure got_initial_metadata;
@ -171,7 +171,7 @@ struct registered_method {
grpc_server_register_method_payload_handling payload_handling; grpc_server_register_method_payload_handling payload_handling;
uint32_t flags; uint32_t flags;
/* one request matcher per method */ /* one request matcher per method */
request_matcher request_matcher; request_matcher matcher;
registered_method *next; registered_method *next;
}; };
@ -334,7 +334,7 @@ static void request_matcher_destroy(request_matcher *rm) {
static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem,
grpc_error *error) { grpc_error *error) {
grpc_call_unref(grpc_call_from_top_element(elem)); grpc_call_unref(grpc_call_from_top_element((grpc_call_element *)elem));
} }
static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
@ -387,7 +387,7 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
while ((rm = server->registered_methods) != NULL) { while ((rm = server->registered_methods) != NULL) {
server->registered_methods = rm->next; server->registered_methods = rm->next;
if (server->started) { if (server->started) {
request_matcher_destroy(&rm->request_matcher); request_matcher_destroy(&rm->matcher);
} }
gpr_free(rm->method); gpr_free(rm->method);
gpr_free(rm->host); gpr_free(rm->host);
@ -519,7 +519,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *call_elem = (grpc_call_element *)arg; grpc_call_element *call_elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)call_elem->call_data; call_data *calld = (call_data *)call_elem->call_data;
channel_data *chand = (channel_data *)call_elem->channel_data; channel_data *chand = (channel_data *)call_elem->channel_data;
request_matcher *rm = calld->request_matcher; request_matcher *rm = calld->matcher;
grpc_server *server = rm->server; grpc_server *server = rm->server;
if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) { if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
@ -583,7 +583,7 @@ static void finish_start_new_rpc(
return; return;
} }
calld->request_matcher = rm; calld->matcher = rm;
switch (payload_handling) { switch (payload_handling) {
case GRPC_SRM_PAYLOAD_NONE: case GRPC_SRM_PAYLOAD_NONE:
@ -629,7 +629,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue; continue;
} }
finish_start_new_rpc(exec_ctx, server, elem, finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling); rm->server_registered_method->payload_handling);
return; return;
} }
@ -647,7 +647,7 @@ static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
continue; continue;
} }
finish_start_new_rpc(exec_ctx, server, elem, finish_start_new_rpc(exec_ctx, server, elem,
&rm->server_registered_method->request_matcher, &rm->server_registered_method->matcher,
rm->server_registered_method->payload_handling); rm->server_registered_method->payload_handling);
return; return;
} }
@ -668,7 +668,7 @@ static int num_listeners(grpc_server *server) {
static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server, static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
grpc_cq_completion *completion) { grpc_cq_completion *completion) {
server_unref(exec_ctx, server); server_unref(exec_ctx, (grpc_server *)server);
} }
static int num_channels(grpc_server *server) { static int num_channels(grpc_server *server) {
@ -691,9 +691,9 @@ static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
exec_ctx, &server->unregistered_request_matcher); exec_ctx, &server->unregistered_request_matcher);
for (registered_method *rm = server->registered_methods; rm; for (registered_method *rm = server->registered_methods; rm;
rm = rm->next) { rm = rm->next) {
request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher, request_matcher_kill_requests(exec_ctx, server, &rm->matcher,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher); request_matcher_zombify_all_pending_calls(exec_ctx, &rm->matcher);
} }
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
@ -1114,7 +1114,7 @@ void grpc_server_start(grpc_server *server) {
request_matcher_init(&server->unregistered_request_matcher, request_matcher_init(&server->unregistered_request_matcher,
(size_t)server->max_requested_calls_per_cq, server); (size_t)server->max_requested_calls_per_cq, server);
for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { for (registered_method *rm = server->registered_methods; rm; rm = rm->next) {
request_matcher_init(&rm->request_matcher, request_matcher_init(&rm->matcher,
(size_t)server->max_requested_calls_per_cq, server); (size_t)server->max_requested_calls_per_cq, server);
} }
@ -1267,8 +1267,9 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* stay locked, and gather up some stuff to do */ /* stay locked, and gather up some stuff to do */
GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) { if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, grpc_cq_end_op(
NULL, gpr_malloc(sizeof(grpc_cq_completion))); &exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown, NULL,
(grpc_cq_completion *)gpr_malloc(sizeof(grpc_cq_completion)));
gpr_mu_unlock(&server->mu_global); gpr_mu_unlock(&server->mu_global);
goto done; goto done;
} }
@ -1390,7 +1391,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
rm = &server->unregistered_request_matcher; rm = &server->unregistered_request_matcher;
break; break;
case REGISTERED_CALL: case REGISTERED_CALL:
rm = &rc->data.registered.registered_method->request_matcher; rm = &rc->data.registered.method->matcher;
break; break;
} }
server->requested_calls_per_cq[cq_idx][request_id] = *rc; server->requested_calls_per_cq[cq_idx][request_id] = *rc;
@ -1519,7 +1520,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->tag = tag; rc->tag = tag;
rc->cq_bound_to_call = cq_bound_to_call; rc->cq_bound_to_call = cq_bound_to_call;
rc->call = call; rc->call = call;
rc->data.registered.registered_method = rm; rc->data.registered.method = rm;
rc->data.registered.deadline = deadline; rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata; rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload; rc->data.registered.optional_payload = optional_payload;

@ -233,32 +233,32 @@ void grpc_metadata_batch_remove(grpc_exec_ctx *exec_ctx,
void grpc_metadata_batch_set_value(grpc_exec_ctx *exec_ctx, void grpc_metadata_batch_set_value(grpc_exec_ctx *exec_ctx,
grpc_linked_mdelem *storage, grpc_linked_mdelem *storage,
grpc_slice value) { grpc_slice value) {
grpc_mdelem old = storage->md; grpc_mdelem old_mdelem = storage->md;
grpc_mdelem new = grpc_mdelem_from_slices( grpc_mdelem new_mdelem = grpc_mdelem_from_slices(
exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old)), value); exec_ctx, grpc_slice_ref_internal(GRPC_MDKEY(old_mdelem)), value);
storage->md = new; storage->md = new_mdelem;
GRPC_MDELEM_UNREF(exec_ctx, old); GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
} }
grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx, grpc_error *grpc_metadata_batch_substitute(grpc_exec_ctx *exec_ctx,
grpc_metadata_batch *batch, grpc_metadata_batch *batch,
grpc_linked_mdelem *storage, grpc_linked_mdelem *storage,
grpc_mdelem new) { grpc_mdelem new_mdelem) {
assert_valid_callouts(exec_ctx, batch); assert_valid_callouts(exec_ctx, batch);
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
grpc_mdelem old = storage->md; grpc_mdelem old_mdelem = storage->md;
if (!grpc_slice_eq(GRPC_MDKEY(new), GRPC_MDKEY(old))) { if (!grpc_slice_eq(GRPC_MDKEY(new_mdelem), GRPC_MDKEY(old_mdelem))) {
maybe_unlink_callout(batch, storage); maybe_unlink_callout(batch, storage);
storage->md = new; storage->md = new_mdelem;
error = maybe_link_callout(batch, storage); error = maybe_link_callout(batch, storage);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
unlink_storage(&batch->list, storage); unlink_storage(&batch->list, storage);
GRPC_MDELEM_UNREF(exec_ctx, storage->md); GRPC_MDELEM_UNREF(exec_ctx, storage->md);
} }
} else { } else {
storage->md = new; storage->md = new_mdelem;
} }
GRPC_MDELEM_UNREF(exec_ctx, old); GRPC_MDELEM_UNREF(exec_ctx, old_mdelem);
assert_valid_callouts(exec_ctx, batch); assert_valid_callouts(exec_ctx, batch);
return error; return error;
} }
@ -302,12 +302,12 @@ grpc_error *grpc_metadata_batch_filter(grpc_exec_ctx *exec_ctx,
grpc_error *error = GRPC_ERROR_NONE; grpc_error *error = GRPC_ERROR_NONE;
while (l) { while (l) {
grpc_linked_mdelem *next = l->next; grpc_linked_mdelem *next = l->next;
grpc_filtered_mdelem new = func(exec_ctx, user_data, l->md); grpc_filtered_mdelem new_mdelem = func(exec_ctx, user_data, l->md);
add_error(&error, new.error, composite_error_string); add_error(&error, new_mdelem.error, composite_error_string);
if (GRPC_MDISNULL(new.md)) { if (GRPC_MDISNULL(new_mdelem.md)) {
grpc_metadata_batch_remove(exec_ctx, batch, l); grpc_metadata_batch_remove(exec_ctx, batch, l);
} else if (new.md.payload != l->md.payload) { } else if (new_mdelem.md.payload != l->md.payload) {
grpc_metadata_batch_substitute(exec_ctx, batch, l, new.md); grpc_metadata_batch_substitute(exec_ctx, batch, l, new_mdelem.md);
} }
l = next; l = next;
} }

@ -102,8 +102,9 @@ static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) {
grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount,
void *buffer, size_t length) { void *buffer, size_t length) {
slice_stream_ref(&refcount->slice_refcount); slice_stream_ref(&refcount->slice_refcount);
return (grpc_slice){.refcount = &refcount->slice_refcount, return (grpc_slice){
.data.refcounted = {.bytes = buffer, .length = length}}; .refcount = &refcount->slice_refcount,
.data.refcounted = {.bytes = (uint8_t *)buffer, .length = length}};
} }
static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { static const grpc_slice_refcount_vtable stream_ref_slice_vtable = {

@ -136,20 +136,20 @@ void resource_quota_server(grpc_end2end_test_config config) {
grpc_call **server_calls = grpc_call **server_calls =
(grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS); (grpc_call **)malloc(sizeof(grpc_call *) * NUM_CALLS);
grpc_metadata_array *initial_metadata_recv = grpc_metadata_array *initial_metadata_recv =
malloc(sizeof(grpc_metadata_array) * NUM_CALLS); (grpc_metadata_array *)malloc(sizeof(grpc_metadata_array) * NUM_CALLS);
grpc_metadata_array *trailing_metadata_recv = grpc_metadata_array *trailing_metadata_recv =
malloc(sizeof(grpc_metadata_array) * NUM_CALLS); (grpc_metadata_array *)malloc(sizeof(grpc_metadata_array) * NUM_CALLS);
grpc_metadata_array *request_metadata_recv = grpc_metadata_array *request_metadata_recv =
malloc(sizeof(grpc_metadata_array) * NUM_CALLS); (grpc_metadata_array *)malloc(sizeof(grpc_metadata_array) * NUM_CALLS);
grpc_call_details *call_details = grpc_call_details *call_details =
malloc(sizeof(grpc_call_details) * NUM_CALLS); (grpc_call_details *)malloc(sizeof(grpc_call_details) * NUM_CALLS);
grpc_status_code *status = grpc_status_code *status =
(grpc_status_code *)malloc(sizeof(grpc_status_code) * NUM_CALLS); (grpc_status_code *)malloc(sizeof(grpc_status_code) * NUM_CALLS);
grpc_slice *details = (grpc_slice *)malloc(sizeof(grpc_slice) * NUM_CALLS); grpc_slice *details = (grpc_slice *)malloc(sizeof(grpc_slice) * NUM_CALLS);
grpc_byte_buffer **request_payload = grpc_byte_buffer **request_payload =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); (grpc_byte_buffer **)malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
grpc_byte_buffer **request_payload_recv = grpc_byte_buffer **request_payload_recv =
malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS); (grpc_byte_buffer **)malloc(sizeof(grpc_byte_buffer *) * NUM_CALLS);
int *was_cancelled = (int *)malloc(sizeof(int) * NUM_CALLS); int *was_cancelled = (int *)malloc(sizeof(int) * NUM_CALLS);
grpc_call_error error; grpc_call_error error;
int pending_client_calls = 0; int pending_client_calls = 0;

Loading…
Cancel
Save