Merge branch 'master' of github.com:grpc/grpc into backoff_cpp

reviewable/pr13494/r9
David Garcia Quintas 7 years ago
commit b02fde2de3
  1. 1
      .gitignore
  2. 235
      src/core/ext/filters/client_channel/client_channel.cc
  3. 27
      src/core/ext/filters/client_channel/lb_policy.cc
  4. 19
      src/core/ext/filters/client_channel/lb_policy.h
  5. 49
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  6. 98
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  7. 128
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  8. 5
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  9. 7
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  10. 23
      src/core/ext/transport/inproc/inproc_transport.cc
  11. 8
      src/core/lib/channel/handshaker.cc
  12. 12
      src/core/lib/channel/handshaker.h
  13. 5
      src/core/lib/http/httpcli_security_connector.cc
  14. 100
      src/core/lib/iomgr/tcp_server_uv.cc
  15. 5
      src/csharp/generate_proto_csharp.sh
  16. 169
      src/python/grpcio/grpc/__init__.py
  17. 2
      src/python/grpcio/grpc/_auth.py
  18. 33
      src/python/grpcio/grpc/_credential_composition.py
  19. 13
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  20. 8
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  21. 82
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi
  22. 322
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  23. 127
      src/python/grpcio/grpc/_plugin_wrapping.py
  24. 20
      src/python/grpcio/grpc/_server.py
  25. 2
      src/python/grpcio_tests/tests/tests.json
  26. 6
      src/python/grpcio_tests/tests/unit/_auth_test.py
  27. 22
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
  28. 22
      src/python/grpcio_tests/tests/unit/_invocation_defects_test.py
  29. 5
      test/core/security/ssl_server_fuzzer.cc
  30. 2
      tools/run_tests/run_tests.py

1
.gitignore vendored

@ -121,6 +121,7 @@ gdb.txt
tags
# perf data
memory_usage.csv
perf.data
perf.data.old

@ -210,6 +210,14 @@ typedef struct client_channel_channel_data {
char* info_service_config_json;
} channel_data;
typedef struct {
channel_data* chand;
/** used as an identifier, don't dereference it because the LB policy may be
* non-existing when the callback is run */
grpc_lb_policy* lb_policy;
grpc_closure closure;
} reresolution_request_args;
/** We create one watcher for each new lb_policy that is returned from a
resolver, to watch for state changes from the lb_policy. When a state
change is seen, we update the channel, and create a new watcher. */
@ -258,21 +266,13 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx* exec_ctx,
static void on_lb_policy_state_changed_locked(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
lb_policy_connectivity_watcher* w = (lb_policy_connectivity_watcher*)arg;
grpc_connectivity_state publish_state = w->state;
/* check if the notification is for the latest policy */
if (w->lb_policy == w->chand->lb_policy) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand,
w->lb_policy, grpc_connectivity_state_name(w->state));
}
if (publish_state == GRPC_CHANNEL_SHUTDOWN &&
w->chand->resolver != nullptr) {
publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver);
GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel");
w->chand->lb_policy = nullptr;
}
set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state,
set_channel_connectivity_state_locked(exec_ctx, w->chand, w->state,
GRPC_ERROR_REF(error), "lb_changed");
if (w->state != GRPC_CHANNEL_SHUTDOWN) {
watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state);
@ -369,6 +369,27 @@ static void parse_retry_throttle_params(const grpc_json* field, void* arg) {
}
}
static void request_reresolution_locked(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
reresolution_request_args* args = (reresolution_request_args*)arg;
channel_data* chand = args->chand;
// If this invocation is for a stale LB policy, treat it as an LB shutdown
// signal.
if (args->lb_policy != chand->lb_policy || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "re-resolution");
gpr_free(args);
return;
}
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_DEBUG, "chand=%p: started name re-resolving", chand);
}
grpc_resolver_channel_saw_error_locked(exec_ctx, chand->resolver);
// Give back the closure to the LB policy.
grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, chand->lb_policy,
&args->closure);
}
static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
void* arg, grpc_error* error) {
channel_data* chand = (channel_data*)arg;
@ -385,100 +406,114 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
grpc_server_retry_throttle_data* retry_throttle_data = nullptr;
grpc_slice_hash_table* method_params_table = nullptr;
if (chand->resolver_result != nullptr) {
// Find LB policy name.
const char* lb_policy_name = nullptr;
const grpc_arg* channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver actually specified.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
grpc_lb_addresses* addresses =
(grpc_lb_addresses*)channel_arg->value.pointer.p;
bool found_balancer_address = false;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) {
found_balancer_address = true;
break;
if (chand->resolver != nullptr) {
// Find LB policy name.
const char* lb_policy_name = nullptr;
const grpc_arg* channel_arg = grpc_channel_args_find(
chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
lb_policy_name = channel_arg->value.string;
}
// Special case: If at least one balancer address is present, we use
// the grpclb policy, regardless of what the resolver actually specified.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
grpc_lb_addresses* addresses =
(grpc_lb_addresses*)channel_arg->value.pointer.p;
bool found_balancer_address = false;
for (size_t i = 0; i < addresses->num_addresses; ++i) {
if (addresses->addresses[i].is_balancer) {
found_balancer_address = true;
break;
}
}
if (found_balancer_address) {
if (lb_policy_name != nullptr &&
strcmp(lb_policy_name, "grpclb") != 0) {
gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy",
lb_policy_name);
}
lb_policy_name = "grpclb";
}
}
if (found_balancer_address) {
if (lb_policy_name != nullptr &&
strcmp(lb_policy_name, "grpclb") != 0) {
gpr_log(GPR_INFO,
"resolver requested LB policy %s but provided at least one "
"balancer address -- forcing use of grpclb LB policy",
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
lb_policy_name_changed =
chand->info_lb_policy_name == nullptr ||
gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
lb_policy_updated = true;
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy,
&lb_policy_args);
} else {
// Instantiate new LB policy.
new_lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (new_lb_policy == nullptr) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"",
lb_policy_name);
} else {
reresolution_request_args* args =
(reresolution_request_args*)gpr_zalloc(sizeof(*args));
args->chand = chand;
args->lb_policy = new_lb_policy;
GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
grpc_combiner_scheduler(chand->combiner));
GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, new_lb_policy,
&args->closure);
}
lb_policy_name = "grpclb";
}
}
// Use pick_first if nothing was specified and we didn't select grpclb
// above.
if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
grpc_lb_policy_args lb_policy_args;
lb_policy_args.args = chand->resolver_result;
lb_policy_args.client_channel_factory = chand->client_channel_factory;
lb_policy_args.combiner = chand->combiner;
// Check to see if we're already using the right LB policy.
// Note: It's safe to use chand->info_lb_policy_name here without
// taking a lock on chand->info_mu, because this function is the
// only thing that modifies its value, and it can only be invoked
// once at any given time.
lb_policy_name_changed =
chand->info_lb_policy_name == nullptr ||
gpr_stricmp(chand->info_lb_policy_name, lb_policy_name) != 0;
if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
// Continue using the same LB policy. Update with new addresses.
lb_policy_updated = true;
grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args);
} else {
// Instantiate new LB policy.
new_lb_policy =
grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args);
if (new_lb_policy == nullptr) {
gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
}
}
// Find service config.
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
service_config_json = gpr_strdup(channel_arg->value.string);
grpc_service_config* service_config =
grpc_service_config_create(service_config_json);
if (service_config != nullptr) {
channel_arg =
grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
GPR_ASSERT(channel_arg != nullptr);
// Find service config.
channel_arg = grpc_channel_args_find(chand->resolver_result,
GRPC_ARG_SERVICE_CONFIG);
if (channel_arg != nullptr) {
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
grpc_uri* uri =
grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path;
grpc_service_config_parse_global_params(
service_config, parse_retry_throttle_params, &parsing_state);
grpc_uri_destroy(uri);
retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
method_parameters_ref_wrapper, method_parameters_unref_wrapper);
grpc_service_config_destroy(service_config);
service_config_json = gpr_strdup(channel_arg->value.string);
grpc_service_config* service_config =
grpc_service_config_create(service_config_json);
if (service_config != nullptr) {
channel_arg = grpc_channel_args_find(chand->resolver_result,
GRPC_ARG_SERVER_URI);
GPR_ASSERT(channel_arg != nullptr);
GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
grpc_uri* uri =
grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
service_config_parsing_state parsing_state;
memset(&parsing_state, 0, sizeof(parsing_state));
parsing_state.server_name =
uri->path[0] == '/' ? uri->path + 1 : uri->path;
grpc_service_config_parse_global_params(
service_config, parse_retry_throttle_params, &parsing_state);
grpc_uri_destroy(uri);
retry_throttle_data = parsing_state.retry_throttle_data;
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
method_parameters_ref_wrapper, method_parameters_unref_wrapper);
grpc_service_config_destroy(service_config);
}
}
// Before we clean up, save a copy of lb_policy_name, since it might
// be pointing to data inside chand->resolver_result.
// The copy will be saved in chand->lb_policy_name below.
lb_policy_name_dup = gpr_strdup(lb_policy_name);
}
// Before we clean up, save a copy of lb_policy_name, since it might
// be pointing to data inside chand->resolver_result.
// The copy will be saved in chand->lb_policy_name below.
lb_policy_name_dup = gpr_strdup(lb_policy_name);
grpc_channel_args_destroy(exec_ctx, chand->resolver_result);
chand->resolver_result = nullptr;
}
@ -515,11 +550,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx* exec_ctx,
}
chand->method_params_table = method_params_table;
// If we have a new LB policy or are shutting down (in which case
// new_lb_policy will be NULL), swap out the LB policy, unreffing the
// old one and removing its fds from chand->interested_parties.
// Note that we do NOT do this if either (a) we updated the existing
// LB policy above or (b) we failed to create the new LB policy (in
// which case we want to continue using the most recent one we had).
// new_lb_policy will be NULL), swap out the LB policy, unreffing the old one
// and removing its fds from chand->interested_parties. Note that we do NOT do
// this if either (a) we updated the existing LB policy above or (b) we failed
// to create the new LB policy (in which case we want to continue using the
// most recent one we had).
if (new_lb_policy != nullptr || error != GRPC_ERROR_NONE ||
chand->resolver == nullptr) {
if (chand->lb_policy != nullptr) {

@ -161,3 +161,30 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx,
const grpc_lb_policy_args* lb_policy_args) {
policy->vtable->update_locked(exec_ctx, policy, lb_policy_args);
}
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* request_reresolution) {
policy->vtable->set_reresolve_closure_locked(exec_ctx, policy,
request_reresolution);
}
void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy,
grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error) {
if (policy->request_reresolution != nullptr) {
GRPC_CLOSURE_SCHED(exec_ctx, policy->request_reresolution, error);
policy->request_reresolution = nullptr;
if (grpc_lb_trace->enabled()) {
gpr_log(GPR_DEBUG,
"%s %p: scheduling re-resolution closure with error=%s.",
grpc_lb_trace->name(), policy, grpc_error_string(error));
}
} else {
if (grpc_lb_trace->enabled() && error == GRPC_ERROR_NONE) {
gpr_log(GPR_DEBUG, "%s %p: re-resolution already in progress.",
grpc_lb_trace->name(), policy);
}
}
}

@ -38,6 +38,8 @@ struct grpc_lb_policy {
grpc_pollset_set* interested_parties;
/* combiner under which lb_policy actions take place */
grpc_combiner* combiner;
/* callback to force a re-resolution */
grpc_closure* request_reresolution;
};
/** Extra arguments for an LB pick */
@ -96,6 +98,11 @@ struct grpc_lb_policy_vtable {
void (*update_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
const grpc_lb_policy_args* args);
/** \see grpc_lb_policy_set_reresolve_closure */
void (*set_reresolve_closure_locked)(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy,
grpc_closure* request_reresolution);
};
#ifndef NDEBUG
@ -202,4 +209,16 @@ void grpc_lb_policy_update_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy,
const grpc_lb_policy_args* lb_policy_args);
/** Set the re-resolution closure to \a request_reresolution. */
void grpc_lb_policy_set_reresolve_closure_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* request_reresolution);
/** Try to request a re-resolution. It's NOT a public API; it's only for use by
the LB policy implementations. */
void grpc_lb_policy_try_reresolve(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy,
grpc_core::TraceFlag* grpc_lb_trace,
grpc_error* error);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_H */

@ -637,7 +637,7 @@ static void update_lb_connectivity_status_locked(
/* Perform a pick over \a glb_policy->rr_policy. Given that a pick can return
* immediately (ignoring its completion callback), we need to perform the
* cleanups this callback would otherwise be resposible for.
* cleanups this callback would otherwise be responsible for.
* If \a force_async is true, then we will manually schedule the
* completion callback even if the pick is available immediately. */
static bool pick_from_internal_rr_locked(
@ -766,6 +766,9 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
glb_policy->rr_policy);
return;
}
grpc_lb_policy_set_reresolve_closure_locked(
exec_ctx, new_rr_policy, glb_policy->base.request_reresolution);
glb_policy->base.request_reresolution = nullptr;
glb_policy->rr_policy = new_rr_policy;
grpc_error* rr_state_error = nullptr;
const grpc_connectivity_state rr_state =
@ -991,6 +994,7 @@ static void glb_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
glb_policy->shutting_down = true;
/* We need a copy of the lb_call pointer because we can't cancell the call
@ -1021,6 +1025,9 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
glb_policy->pending_pings = nullptr;
if (glb_policy->rr_policy != nullptr) {
GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown");
} else {
grpc_lb_policy_try_reresolve(exec_ctx, pol, &grpc_lb_glb_trace,
GRPC_ERROR_CANCELLED);
}
// We destroy the LB channel here because
// glb_lb_channel_on_connectivity_changed_cb needs a valid glb_policy
@ -1030,28 +1037,27 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
grpc_channel_destroy(glb_policy->lb_channel);
glb_policy->lb_channel = nullptr;
}
grpc_connectivity_state_set(
exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "glb_shutdown");
grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"glb_shutdown");
while (pp != nullptr) {
pending_pick* next = pp->next;
*pp->target = nullptr;
GRPC_CLOSURE_SCHED(
exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure,
GRPC_ERROR_REF(error));
gpr_free(pp);
pp = next;
}
while (pping != nullptr) {
pending_ping* next = pping->next;
GRPC_CLOSURE_SCHED(
exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure,
GRPC_ERROR_REF(error));
gpr_free(pping);
pping = next;
}
GRPC_ERROR_UNREF(error);
}
// Cancel a specific pending pick.
@ -1754,8 +1760,8 @@ static void fallback_update_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_addresses_destroy(exec_ctx, glb_policy->fallback_backend_addresses);
glb_policy->fallback_backend_addresses =
extract_backend_addresses_locked(exec_ctx, addresses);
if (glb_policy->started_picking && glb_policy->lb_fallback_timeout_ms > 0 &&
!glb_policy->fallback_timer_active) {
if (glb_policy->lb_fallback_timeout_ms > 0 &&
glb_policy->rr_policy != nullptr) {
rr_handover_locked(exec_ctx, glb_policy);
}
}
@ -1853,7 +1859,7 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
grpc_call_cancel(glb_policy->lb_call, nullptr);
// lb_on_server_status_received() will pick up the cancel and reinit
// lb_call.
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
} else if (glb_policy->started_picking) {
if (glb_policy->retry_timer_active) {
grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer);
glb_policy->retry_timer_active = false;
@ -1870,6 +1876,20 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx* exec_ctx,
}
}
static void glb_set_reresolve_closure_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* request_reresolution) {
glb_lb_policy* glb_policy = (glb_lb_policy*)policy;
GPR_ASSERT(!glb_policy->shutting_down);
GPR_ASSERT(glb_policy->base.request_reresolution == nullptr);
if (glb_policy->rr_policy != nullptr) {
grpc_lb_policy_set_reresolve_closure_locked(exec_ctx, glb_policy->rr_policy,
request_reresolution);
} else {
glb_policy->base.request_reresolution = request_reresolution;
}
}
/* Code wiring the policy with the rest of the core */
static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_destroy,
@ -1881,7 +1901,8 @@ static const grpc_lb_policy_vtable glb_lb_policy_vtable = {
glb_exit_idle_locked,
glb_check_connectivity_locked,
glb_notify_on_state_change_locked,
glb_update_locked};
glb_update_locked,
glb_set_reresolve_closure_locked};
static grpc_lb_policy* glb_create(grpc_exec_ctx* exec_ctx,
grpc_lb_policy_factory* factory,

@ -70,8 +70,9 @@ static void pf_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
}
}
static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p,
grpc_error* error) {
static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_DEBUG, "Pick First %p Shutting down", p);
}
@ -96,14 +97,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, pick_first_lb_policy* p,
exec_ctx, p->latest_pending_subchannel_list, "pf_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_pick_first_trace,
GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
static void pf_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
shutdown_locked(exec_ctx, (pick_first_lb_policy*)pol,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"));
}
static void pf_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@ -157,10 +155,15 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
if (p->subchannel_list != nullptr &&
p->subchannel_list->num_subchannels > 0) {
p->subchannel_list->checking_subchannel = 0;
grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->subchannel_list, "connectivity_watch+start_picking");
grpc_lb_subchannel_data_start_connectivity_watch(
exec_ctx, &p->subchannel_list->subchannels[0]);
for (size_t i = 0; i < p->subchannel_list->num_subchannels; ++i) {
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(
p->subchannel_list, "connectivity_watch+start_picking");
grpc_lb_subchannel_data_start_connectivity_watch(
exec_ctx, &p->subchannel_list->subchannels[i]);
break;
}
}
}
}
@ -404,6 +407,9 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
if (sd->curr_connectivity_state != GRPC_CHANNEL_READY &&
p->latest_pending_subchannel_list != nullptr) {
p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "selected_not_ready+switch_to_update");
grpc_lb_subchannel_list_shutdown_and_unref(
exec_ctx, p->subchannel_list, "selected_not_ready+switch_to_update");
p->subchannel_list = p->latest_pending_subchannel_list;
@ -412,21 +418,35 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "selected_not_ready+switch_to_update");
} else {
if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
/* if the selected channel goes bad, we're done */
sd->curr_connectivity_state = GRPC_CHANNEL_SHUTDOWN;
// TODO(juanlishen): we re-resolve when the selected subchannel goes to
// TRANSIENT_FAILURE because we used to shut down in this case before
// re-resolution is introduced. But we need to investigate whether we
// really want to take any action instead of waiting for the selected
// subchannel reconnecting.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN ||
sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// If the selected channel goes bad, request a re-resolution.
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
"selected_changed+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(
exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
} else {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
sd->curr_connectivity_state,
GRPC_ERROR_REF(error), "selected_changed");
if (sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
// Renew notification.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
} else {
p->selected = nullptr;
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_selected_shutdown");
shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"pf_selected_shutdown");
}
}
return;
@ -531,24 +551,37 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
} while (sd->subchannel == nullptr && sd != original_sd);
if (sd == original_sd) {
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "pf_candidate_shutdown");
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Pick first exhausted channels", &error, 1));
break;
}
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
exec_ctx, sd->subchannel_list, "pf_exhausted_subchannels");
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_IDLE, GRPC_ERROR_NONE,
"exhausted_subchannels+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(
exec_ctx, &p->base, &grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
}
} else {
if (sd->subchannel_list == p->subchannel_list) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "subchannel_failed");
}
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
}
// Reuses the connectivity refs from the previous watch.
grpc_lb_subchannel_data_start_connectivity_watch(exec_ctx, sd);
break;
}
}
}
static void pf_set_reresolve_closure_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* request_reresolution) {
pick_first_lb_policy* p = (pick_first_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy,
pf_shutdown_locked,
@ -559,7 +592,8 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_exit_idle_locked,
pf_check_connectivity_locked,
pf_notify_on_state_change_locked,
pf_update_locked};
pf_update_locked,
pf_set_reresolve_closure_locked};
static void pick_first_factory_ref(grpc_lb_policy_factory* factory) {}

@ -20,9 +20,9 @@
*
* Before every pick, the \a get_next_ready_subchannel_index_locked function
* returns the p->subchannel_list->subchannels index for next subchannel,
* respecting the relative
* order of the addresses provided upon creation or updates. Note however that
* updates will start picking from the beginning of the updated list. */
* respecting the relative order of the addresses provided upon creation or
* updates. Note however that updates will start picking from the beginning of
* the updated list. */
#include <string.h>
@ -167,8 +167,9 @@ static void rr_destroy(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
gpr_free(p);
}
static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
grpc_error* error) {
static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown");
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_DEBUG, "[RR %p] Shutting down", p);
}
@ -194,15 +195,11 @@ static void shutdown_locked(grpc_exec_ctx* exec_ctx, round_robin_lb_policy* p,
"sl_shutdown_pending_rr_shutdown");
p->latest_pending_subchannel_list = nullptr;
}
grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_CANCELLED);
GRPC_ERROR_UNREF(error);
}
static void rr_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
shutdown_locked(exec_ctx, p,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"));
}
static void rr_cancel_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_connected_subchannel** target,
grpc_error* error) {
@ -255,10 +252,12 @@ static void start_picking_locked(grpc_exec_ctx* exec_ctx,
round_robin_lb_policy* p) {
p->started_picking = true;
for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
exec_ctx, &p->subchannel_list->subchannels[i]);
if (p->subchannel_list->subchannels[i].subchannel != nullptr) {
grpc_lb_subchannel_list_ref_for_connectivity_watch(p->subchannel_list,
"connectivity_watch");
grpc_lb_subchannel_data_start_connectivity_watch(
exec_ctx, &p->subchannel_list->subchannels[i]);
}
}
}
@ -346,71 +345,71 @@ static void update_state_counters_locked(grpc_lb_subchannel_data* sd) {
}
/** Sets the policy's connectivity status based on that of the passed-in \a sd
* (the grpc_lb_subchannel_data associted with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will only be
* used upon policy transition to TRANSIENT_FAILURE or SHUTDOWN. Returns the
* connectivity status set. */
static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_subchannel_data* sd, grpc_error* error) {
* (the grpc_lb_subchannel_data associated with the updated subchannel) and the
* subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used
* only if the policy transitions to state TRANSIENT_FAILURE. */
static void update_lb_connectivity_status_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_subchannel_data* sd,
grpc_error* error) {
/* In priority order. The first rule to match terminates the search (ie, if we
* are on rule n, all previous rules were unfulfilled).
*
* 1) RULE: ANY subchannel is READY => policy is READY.
* CHECK: At least one subchannel is ready iff p->ready_list is NOT empty.
* CHECK: subchannel_list->num_ready > 0.
*
* 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
* CHECK: sd->curr_connectivity_state == CONNECTING.
*
* 3) RULE: ALL subchannels are SHUTDOWN => policy is SHUTDOWN.
* CHECK: p->subchannel_list->num_shutdown ==
* p->subchannel_list->num_subchannels.
* 3) RULE: ALL subchannels are SHUTDOWN => policy is IDLE (and requests
* re-resolution).
* CHECK: subchannel_list->num_shutdown ==
* subchannel_list->num_subchannels.
*
* 4) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
* TRANSIENT_FAILURE.
* CHECK: p->num_transient_failures == p->subchannel_list->num_subchannels.
* TRANSIENT_FAILURE.
* CHECK: subchannel_list->num_transient_failures ==
* subchannel_list->num_subchannels.
*
* 5) RULE: ALL subchannels are IDLE => policy is IDLE.
* CHECK: p->num_idle == p->subchannel_list->num_subchannels.
* CHECK: subchannel_list->num_idle == subchannel_list->num_subchannels.
* (Note that all the subchannels will transition from IDLE to CONNECTING
* in batch when we start trying to connect.)
*/
grpc_connectivity_state new_state = sd->curr_connectivity_state;
// TODO(juanlishen): if the subchannel states are mixed by {SHUTDOWN,
// TRANSIENT_FAILURE}, we don't change the state. We may want to improve on
// this.
grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list;
round_robin_lb_policy* p = (round_robin_lb_policy*)subchannel_list->policy;
if (subchannel_list->num_ready > 0) { /* 1) READY */
if (subchannel_list->num_ready > 0) {
/* 1) READY */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_READY,
GRPC_ERROR_NONE, "rr_ready");
new_state = GRPC_CHANNEL_READY;
} else if (sd->curr_connectivity_state ==
GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */
} else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) {
/* 2) CONNECTING */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
"rr_connecting");
new_state = GRPC_CHANNEL_CONNECTING;
} else if (p->subchannel_list->num_shutdown ==
p->subchannel_list->num_subchannels) { /* 3) SHUTDOWN */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
p->shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
if (grpc_lb_round_robin_trace.enabled()) {
gpr_log(GPR_INFO,
"[RR %p] Shutting down: all subchannels have gone into shutdown",
(void*)p);
}
} else if (subchannel_list->num_shutdown ==
subchannel_list->num_subchannels) {
/* 3) IDLE and re-resolve */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE,
"rr_exhausted_subchannels+reresolve");
p->started_picking = false;
grpc_lb_policy_try_reresolve(exec_ctx, &p->base, &grpc_lb_round_robin_trace,
GRPC_ERROR_NONE);
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
subchannel_list->num_subchannels) {
/* 4) TRANSIENT_FAILURE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_REF(error), "rr_transient_failure");
new_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
} else if (subchannel_list->num_idle ==
p->subchannel_list->num_subchannels) { /* 5) IDLE */
} else if (subchannel_list->num_idle == subchannel_list->num_subchannels) {
/* 5) IDLE */
grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_IDLE,
GRPC_ERROR_NONE, "rr_idle");
new_state = GRPC_CHANNEL_IDLE;
}
GRPC_ERROR_UNREF(error);
return new_state;
}
static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
@ -454,21 +453,16 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
// state (which was set by the connectivity state watcher) to
// curr_connectivity_state, which is what we use inside of the combiner.
sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe;
// Update state counters and determine new overall state.
// Update state counters and new overall state.
update_state_counters_locked(sd);
const grpc_connectivity_state new_policy_connectivity_state =
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel, and if the new
// policy's state is SHUTDOWN, clean up.
update_lb_connectivity_status_locked(exec_ctx, sd, GRPC_ERROR_REF(error));
// If the sd's new state is SHUTDOWN, unref the subchannel.
if (sd->curr_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
grpc_lb_subchannel_data_stop_connectivity_watch(exec_ctx, sd);
grpc_lb_subchannel_data_unref_subchannel(exec_ctx, sd,
"rr_connectivity_shutdown");
grpc_lb_subchannel_list_unref_for_connectivity_watch(
exec_ctx, sd->subchannel_list, "rr_connectivity_shutdown");
if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) {
shutdown_locked(exec_ctx, p, GRPC_ERROR_REF(error));
}
} else { // sd not in SHUTDOWN
if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) {
if (sd->connected_subchannel == nullptr) {
@ -504,7 +498,7 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx* exec_ctx, void* arg,
}
/* at this point we know there's at least one suitable subchannel. Go
* ahead and pick one and notify the pending suitors in
* p->pending_picks. This preemtively replicates rr_pick()'s actions. */
* p->pending_picks. This preemptively replicates rr_pick()'s actions. */
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
GPR_ASSERT(next_ready_index < p->subchannel_list->num_subchannels);
grpc_lb_subchannel_data* selected =
@ -642,6 +636,15 @@ static void rr_update_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
}
}
static void rr_set_reresolve_closure_locked(
grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* request_reresolution) {
round_robin_lb_policy* p = (round_robin_lb_policy*)policy;
GPR_ASSERT(!p->shutdown);
GPR_ASSERT(policy->request_reresolution == nullptr);
policy->request_reresolution = request_reresolution;
}
static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_destroy,
rr_shutdown_locked,
@ -652,7 +655,8 @@ static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
rr_exit_idle_locked,
rr_check_connectivity_locked,
rr_notify_on_state_change_locked,
rr_update_locked};
rr_update_locked,
rr_set_reresolve_closure_locked};
static void round_robin_factory_ref(grpc_lb_policy_factory* factory) {}

@ -165,8 +165,9 @@ static void start_handshake_locked(grpc_exec_ctx* exec_ctx,
grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint,
c->args.interested_parties);
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
c->args.deadline, nullptr /* acceptor */, on_handshake_done, c);
exec_ctx, c->handshake_mgr, c->args.interested_parties, c->endpoint,
c->args.channel_args, c->args.deadline, nullptr /* acceptor */,
on_handshake_done, c);
c->endpoint = nullptr; // Endpoint handed off to handshake manager.
}

@ -197,9 +197,10 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
grpc_channel_arg_get_integer(timeout_arg,
{120 * GPR_MS_PER_SEC, 1, INT_MAX});
grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
tcp, state->args,
connection_state->deadline, acceptor,
on_handshake_done, connection_state);
nullptr /* interested_parties */, tcp,
state->args, connection_state->deadline,
acceptor, on_handshake_done,
connection_state);
}
/* Server callback: start listening on our ports */

@ -458,6 +458,14 @@ static void fail_helper_locked(grpc_exec_ctx* exec_ctx, inproc_stream* s,
} else {
err = GRPC_ERROR_REF(error);
}
if (s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available != nullptr) {
// Set to true unconditionally, because we're failing the call, so even
// if we haven't actually seen the send_trailing_metadata op from the
// other side, we're going to return trailing metadata anyway.
*s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available = true;
}
INPROC_LOG(GPR_DEBUG,
"fail_helper %p scheduling initial-metadata-ready %p %p", s,
error, err);
@ -670,6 +678,12 @@ static void op_state_machine(grpc_exec_ctx* exec_ctx, void* arg,
nullptr);
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
if (s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available != nullptr) {
*s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available =
(other != nullptr && other->send_trailing_md_op != nullptr);
}
grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_DEBUG,
@ -995,6 +1009,15 @@ static void perform_stream_op(grpc_exec_ctx* exec_ctx, grpc_transport* gt,
if (error != GRPC_ERROR_NONE) {
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
if (op->payload->recv_initial_metadata.trailing_metadata_available !=
nullptr) {
// Set to true unconditionally, because we're failing the call, so
// even if we haven't actually seen the send_trailing_metadata op
// from the other side, we're going to return trailing metadata
// anyway.
*op->payload->recv_initial_metadata.trailing_metadata_available =
true;
}
INPROC_LOG(
GPR_DEBUG,
"perform_stream_op error %p scheduling initial-metadata-ready %p",

@ -231,14 +231,16 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data) {
grpc_pollset_set* interested_parties, grpc_endpoint* endpoint,
const grpc_channel_args* channel_args, grpc_millis deadline,
grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done,
void* user_data) {
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
GPR_ASSERT(!mgr->shutdown);
// Construct handshaker args. These will be passed through all
// handshakers and eventually be freed by the on_handshake_done callback.
mgr->args.interested_parties = interested_parties;
mgr->args.endpoint = endpoint;
mgr->args.args = grpc_channel_args_copy(channel_args);
mgr->args.user_data = user_data;

@ -54,6 +54,7 @@ typedef struct grpc_handshaker grpc_handshaker;
/// For the on_handshake_done callback, all members are input arguments,
/// which the callback takes ownership of.
typedef struct {
grpc_pollset_set* interested_parties;
grpc_endpoint* endpoint;
grpc_channel_args* args;
grpc_slice_buffer* read_buffer;
@ -131,11 +132,13 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_error* why);
/// Invokes handshakers in the order they were added.
/// \a interested_parties may be non-nullptr to provide a pollset_set that
/// may be used during handshaking. Ownership is not taken.
/// Takes ownership of \a endpoint, and then passes that ownership to
/// the \a on_handshake_done callback.
/// Does NOT take ownership of \a channel_args. Instead, makes a copy before
/// invoking the first handshaker.
/// \a acceptor will be NULL for client-side handshakers.
/// \a acceptor will be nullptr for client-side handshakers.
///
/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
/// object as its argument. If the callback is invoked with error !=
@ -144,9 +147,10 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
/// the arguments.
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
grpc_millis deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data);
grpc_pollset_set* interested_parties, grpc_endpoint* endpoint,
const grpc_channel_args* channel_args, grpc_millis deadline,
grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done,
void* user_data);
/// Add \a mgr to the server side list of all pending handshake managers, the
/// list starts with \a *head.

@ -191,8 +191,9 @@ static void ssl_handshake(grpc_exec_ctx* exec_ctx, void* arg,
c->handshake_mgr = grpc_handshake_manager_create();
grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, &args, c->handshake_mgr);
grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, tcp, nullptr /* channel_args */, deadline,
nullptr /* acceptor */, on_handshake_done, c /* user_data */);
exec_ctx, c->handshake_mgr, nullptr /* interested_parties */, tcp,
nullptr /* channel_args */, deadline, nullptr /* acceptor */,
on_handshake_done, c /* user_data */);
GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &sc->base, "httpcli");
}

@ -260,15 +260,36 @@ static void on_connect(uv_stream_t* server, int status) {
grpc_exec_ctx_finish(&exec_ctx);
}
static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
const grpc_resolved_address* addr,
unsigned port_index,
grpc_tcp_listener** listener) {
static grpc_error* add_addr_to_server(grpc_tcp_server* s,
const grpc_resolved_address* addr,
unsigned port_index,
grpc_tcp_listener** listener) {
grpc_tcp_listener* sp = NULL;
int port = -1;
int status;
grpc_error* error;
grpc_resolved_address sockname_temp;
uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
int family = grpc_sockaddr_get_family(addr);
status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
if (family == AF_INET || family == AF_INET6) {
int fd;
uv_fileno((uv_handle_t*)handle, &fd);
int enable = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
}
#endif /* GPR_LINUX && SO_REUSEPORT */
if (status != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to initialize UV tcp handle");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(status)));
return error;
}
// The last argument to uv_tcp_bind is flags
status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0);
@ -325,20 +346,48 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, uv_tcp_t* handle,
return GRPC_ERROR_NONE;
}
static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
unsigned port_index,
int requested_port,
grpc_tcp_listener** listener) {
grpc_resolved_address wild4;
grpc_resolved_address wild6;
grpc_tcp_listener* sp = nullptr;
grpc_tcp_listener* sp2 = nullptr;
grpc_error* v6_err = GRPC_ERROR_NONE;
grpc_error* v4_err = GRPC_ERROR_NONE;
grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
/* Try listening on IPv6 first. */
if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) ==
GRPC_ERROR_NONE) {
*listener = sp;
return GRPC_ERROR_NONE;
}
if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) ==
GRPC_ERROR_NONE) {
*listener = sp2;
return GRPC_ERROR_NONE;
}
grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to add any wildcard listeners");
root_err = grpc_error_add_child(root_err, v6_err);
root_err = grpc_error_add_child(root_err, v4_err);
return root_err;
}
grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
const grpc_resolved_address* addr,
int* port) {
// This function is mostly copied from tcp_server_windows.c
grpc_tcp_listener* sp = NULL;
uv_tcp_t* handle;
grpc_resolved_address addr6_v4mapped;
grpc_resolved_address wildcard;
grpc_resolved_address* allocated_addr = NULL;
grpc_resolved_address sockname_temp;
unsigned port_index = 0;
int status;
grpc_error* error = GRPC_ERROR_NONE;
int family;
GRPC_UV_ASSERT_SAME_THREAD();
@ -367,38 +416,15 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
}
}
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
if (grpc_sockaddr_is_wildcard(addr, port)) {
grpc_sockaddr_make_wildcard6(*port, &wildcard);
addr = &wildcard;
}
handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
family = grpc_sockaddr_get_family(addr);
status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
if (family == AF_INET || family == AF_INET6) {
int fd;
uv_fileno((uv_handle_t*)handle, &fd);
int enable = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
}
#endif /* GPR_LINUX && SO_REUSEPORT */
if (status == 0) {
error = add_socket_to_server(s, handle, addr, port_index, &sp);
error = add_wildcard_addrs_to_server(s, port_index, *port, &sp);
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Failed to initialize UV tcp handle");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(status)));
if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
addr = &addr6_v4mapped;
}
error = add_addr_to_server(s, addr, port_index, &sp);
}
gpr_free(allocated_addr);

@ -33,6 +33,11 @@ $PROTOC --plugin=$PLUGIN --csharp_out=$HEALTHCHECK_DIR --grpc_out=$HEALTHCHECK_D
$PROTOC --plugin=$PLUGIN --csharp_out=$REFLECTION_DIR --grpc_out=$REFLECTION_DIR \
-I src/proto src/proto/grpc/reflection/v1alpha/reflection.proto
# Put grp/core/stats.proto in a subdirectory to avoid collision with grpc/testing/stats.proto
mkdir -p $TESTING_DIR/CoreStats
$PROTOC --plugin=$PLUGIN --csharp_out=$TESTING_DIR/CoreStats --grpc_out=$TESTING_DIR/CoreStats \
-I src/proto src/proto/grpc/core/stats.proto
# TODO(jtattermusch): following .proto files are a bit broken and import paths
# don't match the package names. Setting -I to the correct value src/proto
# breaks the code generation.

@ -348,26 +348,25 @@ class Call(six.with_metaclass(abc.ABCMeta, RpcContext)):
class ChannelCredentials(object):
"""An encapsulation of the data required to create a secure Channel.
This class has no supported interface - it exists to define the type of its
instances and its instances exist to be passed to other functions. For
example, ssl_channel_credentials returns an instance, and secure_channel
consumes an instance of this class.
"""
This class has no supported interface - it exists to define the type of its
instances and its instances exist to be passed to other functions. For
example, ssl_channel_credentials returns an instance of this class and
secure_channel requires an instance of this class.
"""
def __init__(self, credentials):
self._credentials = credentials
class CallCredentials(object):
"""An encapsulation of the data required to assert an identity over a
channel.
"""An encapsulation of the data required to assert an identity over a call.
A CallCredentials may be composed with ChannelCredentials to always assert
identity for every call over that Channel.
A CallCredentials may be composed with ChannelCredentials to always assert
identity for every call over that Channel.
This class has no supported interface - it exists to define the type of its
instances and its instances exist to be passed to other functions.
"""
This class has no supported interface - it exists to define the type of its
instances and its instances exist to be passed to other functions.
"""
def __init__(self, credentials):
self._credentials = credentials
@ -376,23 +375,22 @@ class CallCredentials(object):
class AuthMetadataContext(six.with_metaclass(abc.ABCMeta)):
"""Provides information to call credentials metadata plugins.
Attributes:
service_url: A string URL of the service being called into.
method_name: A string of the fully qualified method name being called.
"""
Attributes:
service_url: A string URL of the service being called into.
method_name: A string of the fully qualified method name being called.
"""
class AuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
"""Callback object received by a metadata plugin."""
def __call__(self, metadata, error):
"""Inform the gRPC runtime of the metadata to construct a
CallCredentials.
"""Passes to the gRPC runtime authentication metadata for an RPC.
Args:
metadata: The :term:`metadata` used to construct the CallCredentials.
error: An Exception to indicate error or None to indicate success.
"""
Args:
metadata: The :term:`metadata` used to construct the CallCredentials.
error: An Exception to indicate error or None to indicate success.
"""
raise NotImplementedError()
@ -402,14 +400,14 @@ class AuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)):
def __call__(self, context, callback):
"""Implements authentication by passing metadata to a callback.
Implementations of this method must not block.
Implementations of this method must not block.
Args:
context: An AuthMetadataContext providing information on the RPC that the
plugin is being called to authenticate.
callback: An AuthMetadataPluginCallback to be invoked either synchronously
or asynchronously.
"""
Args:
context: An AuthMetadataContext providing information on the RPC that
the plugin is being called to authenticate.
callback: An AuthMetadataPluginCallback to be invoked either
synchronously or asynchronously.
"""
raise NotImplementedError()
@ -1138,99 +1136,86 @@ def ssl_channel_credentials(root_certificates=None,
certificate_chain=None):
"""Creates a ChannelCredentials for use with an SSL-enabled Channel.
Args:
root_certificates: The PEM-encoded root certificates as a byte string,
or None to retrieve them from a default location chosen by gRPC runtime.
private_key: The PEM-encoded private key as a byte string, or None if no
private key should be used.
certificate_chain: The PEM-encoded certificate chain as a byte string
to use or or None if no certificate chain should be used.
Args:
root_certificates: The PEM-encoded root certificates as a byte string,
or None to retrieve them from a default location chosen by gRPC
runtime.
private_key: The PEM-encoded private key as a byte string, or None if no
private key should be used.
certificate_chain: The PEM-encoded certificate chain as a byte string
to use or or None if no certificate chain should be used.
Returns:
A ChannelCredentials for use with an SSL-enabled Channel.
"""
if private_key is not None or certificate_chain is not None:
pair = _cygrpc.SslPemKeyCertPair(private_key, certificate_chain)
else:
pair = None
Returns:
A ChannelCredentials for use with an SSL-enabled Channel.
"""
return ChannelCredentials(
_cygrpc.channel_credentials_ssl(root_certificates, pair))
_cygrpc.SSLChannelCredentials(root_certificates, private_key,
certificate_chain))
def metadata_call_credentials(metadata_plugin, name=None):
"""Construct CallCredentials from an AuthMetadataPlugin.
Args:
metadata_plugin: An AuthMetadataPlugin to use for authentication.
name: An optional name for the plugin.
Args:
metadata_plugin: An AuthMetadataPlugin to use for authentication.
name: An optional name for the plugin.
Returns:
A CallCredentials.
"""
Returns:
A CallCredentials.
"""
from grpc import _plugin_wrapping # pylint: disable=cyclic-import
if name is None:
try:
effective_name = metadata_plugin.__name__
except AttributeError:
effective_name = metadata_plugin.__class__.__name__
else:
effective_name = name
return CallCredentials(
_plugin_wrapping.call_credentials_metadata_plugin(metadata_plugin,
effective_name))
return _plugin_wrapping.metadata_plugin_call_credentials(metadata_plugin,
name)
def access_token_call_credentials(access_token):
"""Construct CallCredentials from an access token.
Args:
access_token: A string to place directly in the http request
authorization header, for example
"authorization: Bearer <access_token>".
Args:
access_token: A string to place directly in the http request
authorization header, for example
"authorization: Bearer <access_token>".
Returns:
A CallCredentials.
"""
Returns:
A CallCredentials.
"""
from grpc import _auth # pylint: disable=cyclic-import
return metadata_call_credentials(
_auth.AccessTokenCallCredentials(access_token))
from grpc import _plugin_wrapping # pylint: disable=cyclic-import
return _plugin_wrapping.metadata_plugin_call_credentials(
_auth.AccessTokenAuthMetadataPlugin(access_token), None)
def composite_call_credentials(*call_credentials):
"""Compose multiple CallCredentials to make a new CallCredentials.
Args:
*call_credentials: At least two CallCredentials objects.
Args:
*call_credentials: At least two CallCredentials objects.
Returns:
A CallCredentials object composed of the given CallCredentials objects.
"""
from grpc import _credential_composition # pylint: disable=cyclic-import
cygrpc_call_credentials = tuple(
single_call_credentials._credentials
for single_call_credentials in call_credentials)
Returns:
A CallCredentials object composed of the given CallCredentials objects.
"""
return CallCredentials(
_credential_composition.call(cygrpc_call_credentials))
_cygrpc.CompositeCallCredentials(
tuple(single_call_credentials._credentials
for single_call_credentials in call_credentials)))
def composite_channel_credentials(channel_credentials, *call_credentials):
"""Compose a ChannelCredentials and one or more CallCredentials objects.
Args:
channel_credentials: A ChannelCredentials object.
*call_credentials: One or more CallCredentials objects.
Args:
channel_credentials: A ChannelCredentials object.
*call_credentials: One or more CallCredentials objects.
Returns:
A ChannelCredentials composed of the given ChannelCredentials and
CallCredentials objects.
"""
from grpc import _credential_composition # pylint: disable=cyclic-import
cygrpc_call_credentials = tuple(
single_call_credentials._credentials
for single_call_credentials in call_credentials)
Returns:
A ChannelCredentials composed of the given ChannelCredentials and
CallCredentials objects.
"""
return ChannelCredentials(
_credential_composition.channel(channel_credentials._credentials,
cygrpc_call_credentials))
_cygrpc.CompositeChannelCredentials(
tuple(single_call_credentials._credentials
for single_call_credentials in call_credentials),
channel_credentials._credentials))
def ssl_server_credentials(private_key_certificate_chain_pairs,

@ -63,7 +63,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin):
self._pool.shutdown(wait=False)
class AccessTokenCallCredentials(grpc.AuthMetadataPlugin):
class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin):
"""Metadata wrapper for raw access token credentials."""
def __init__(self, access_token):

@ -1,33 +0,0 @@
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from grpc._cython import cygrpc
def _call(call_credentialses):
call_credentials_iterator = iter(call_credentialses)
composition = next(call_credentials_iterator)
for additional_call_credentials in call_credentials_iterator:
composition = cygrpc.call_credentials_composite(
composition, additional_call_credentials)
return composition
def call(call_credentialses):
return _call(call_credentialses)
def channel(channel_credentials, call_credentialses):
return cygrpc.channel_credentials_composite(channel_credentials,
_call(call_credentialses))

@ -72,13 +72,12 @@ cdef class Call:
result = grpc_call_cancel(self.c_call, NULL)
return result
def set_credentials(
self, CallCredentials call_credentials not None):
cdef grpc_call_error result
with nogil:
result = grpc_call_set_credentials(
self.c_call, call_credentials.c_credentials)
return result
def set_credentials(self, CallCredentials call_credentials not None):
cdef grpc_call_credentials *c_call_credentials = call_credentials.c()
cdef grpc_call_error call_error = grpc_call_set_credentials(
self.c_call, c_call_credentials)
grpc_call_credentials_release(c_call_credentials)
return call_error
def peer(self):
cdef char *peer = NULL

@ -33,10 +33,10 @@ cdef class Channel:
self.c_channel = grpc_insecure_channel_create(c_target, c_arguments,
NULL)
else:
with nogil:
self.c_channel = grpc_secure_channel_create(
channel_credentials.c_credentials, c_target, c_arguments, NULL)
self.references.append(channel_credentials)
c_channel_credentials = channel_credentials.c()
self.c_channel = grpc_secure_channel_create(
c_channel_credentials, c_target, c_arguments, NULL)
grpc_channel_credentials_release(c_channel_credentials)
self.references.append(target)
self.references.append(arguments)

@ -12,20 +12,66 @@
# See the License for the specific language governing permissions and
# limitations under the License.
cimport cpython
cdef class CallCredentials:
cdef grpc_call_credentials *c(self)
# TODO(https://github.com/grpc/grpc/issues/12531): remove.
cdef grpc_call_credentials *c_credentials
cdef int _get_metadata(
void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb, void *user_data,
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil
cdef void _destroy(void *state) with gil
cdef class MetadataPluginCallCredentials(CallCredentials):
cdef readonly object _metadata_plugin
cdef readonly bytes _name
cdef grpc_call_credentials *c(self)
cdef grpc_call_credentials *_composition(call_credentialses)
cdef class CompositeCallCredentials(CallCredentials):
cdef readonly tuple _call_credentialses
cdef grpc_call_credentials *c(self)
cdef class ChannelCredentials:
cdef grpc_channel_credentials *c(self)
# TODO(https://github.com/grpc/grpc/issues/12531): remove.
cdef grpc_channel_credentials *c_credentials
cdef grpc_ssl_pem_key_cert_pair c_ssl_pem_key_cert_pair
cdef list references
cdef class CallCredentials:
cdef class SSLChannelCredentials(ChannelCredentials):
cdef grpc_call_credentials *c_credentials
cdef list references
cdef readonly object _pem_root_certificates
cdef readonly object _private_key
cdef readonly object _certificate_chain
cdef grpc_channel_credentials *c(self)
cdef class CompositeChannelCredentials(ChannelCredentials):
cdef readonly tuple _call_credentialses
cdef readonly ChannelCredentials _channel_credentials
cdef grpc_channel_credentials *c(self)
cdef class ServerCertificateConfig:
@ -49,27 +95,3 @@ cdef class ServerCredentials:
cdef object cert_config_fetcher
# whether C-core has asked for the initial_cert_config
cdef bint initial_cert_config_fetched
cdef class CredentialsMetadataPlugin:
cdef object plugin_callback
cdef bytes plugin_name
cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin)
cdef class AuthMetadataContext:
cdef grpc_auth_metadata_context context
cdef int plugin_get_metadata(
void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb, void *user_data,
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil
cdef void plugin_destroy_c_plugin_state(void *state) with gil

@ -16,47 +16,123 @@ cimport cpython
import grpc
import threading
import traceback
cdef class ChannelCredentials:
cdef class CallCredentials:
def __cinit__(self):
grpc_init()
self.c_credentials = NULL
self.c_ssl_pem_key_cert_pair.private_key = NULL
self.c_ssl_pem_key_cert_pair.certificate_chain = NULL
self.references = []
cdef grpc_call_credentials *c(self):
raise NotImplementedError()
# The object *can* be invalid in Python if we fail to make the credentials
# (and the core thus returns NULL credentials). Used primarily for debugging.
@property
def is_valid(self):
return self.c_credentials != NULL
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_channel_credentials_release(self.c_credentials)
grpc_shutdown()
cdef int _get_metadata(
void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb, void *user_data,
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil:
def callback(Metadata metadata, grpc_status_code status, bytes error_details):
if status is StatusCode.ok:
cb(user_data, metadata.c_metadata, metadata.c_count, status, NULL)
else:
cb(user_data, NULL, 0, status, error_details)
args = context.service_url, context.method_name, callback,
threading.Thread(target=<object>state, args=args).start()
return 0 # Asynchronous return
cdef class CallCredentials:
cdef void _destroy(void *state) with gil:
cpython.Py_DECREF(<object>state)
def __cinit__(self):
grpc_init()
self.c_credentials = NULL
self.references = []
# The object *can* be invalid in Python if we fail to make the credentials
# (and the core thus returns NULL credentials). Used primarily for debugging.
@property
def is_valid(self):
return self.c_credentials != NULL
cdef class MetadataPluginCallCredentials(CallCredentials):
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_call_credentials_release(self.c_credentials)
grpc_shutdown()
def __cinit__(self, metadata_plugin, name):
self._metadata_plugin = metadata_plugin
self._name = name
cdef grpc_call_credentials *c(self):
cdef grpc_metadata_credentials_plugin c_metadata_plugin
c_metadata_plugin.get_metadata = _get_metadata
c_metadata_plugin.destroy = _destroy
c_metadata_plugin.state = <void *>self._metadata_plugin
c_metadata_plugin.type = self._name
cpython.Py_INCREF(self._metadata_plugin)
return grpc_metadata_credentials_create_from_plugin(c_metadata_plugin, NULL)
cdef grpc_call_credentials *_composition(call_credentialses):
call_credentials_iterator = iter(call_credentialses)
cdef CallCredentials composition = next(call_credentials_iterator)
cdef grpc_call_credentials *c_composition = composition.c()
cdef CallCredentials additional_call_credentials
cdef grpc_call_credentials *c_additional_call_credentials
cdef grpc_call_credentials *c_next_composition
for additional_call_credentials in call_credentials_iterator:
c_additional_call_credentials = additional_call_credentials.c()
c_next_composition = grpc_composite_call_credentials_create(
c_composition, c_additional_call_credentials, NULL)
grpc_call_credentials_release(c_composition)
grpc_call_credentials_release(c_additional_call_credentials)
c_composition = c_next_composition
return c_composition
cdef class CompositeCallCredentials(CallCredentials):
def __cinit__(self, call_credentialses):
self._call_credentialses = call_credentialses
cdef grpc_call_credentials *c(self):
return _composition(self._call_credentialses)
cdef class ChannelCredentials:
cdef grpc_channel_credentials *c(self):
raise NotImplementedError()
cdef class SSLChannelCredentials(ChannelCredentials):
def __cinit__(self, pem_root_certificates, private_key, certificate_chain):
self._pem_root_certificates = pem_root_certificates
self._private_key = private_key
self._certificate_chain = certificate_chain
cdef grpc_channel_credentials *c(self):
cdef const char *c_pem_root_certificates
cdef grpc_ssl_pem_key_cert_pair c_pem_key_certificate_pair
if self._pem_root_certificates is None:
c_pem_root_certificates = NULL
else:
c_pem_root_certificates = self._pem_root_certificates
if self._private_key is None and self._certificate_chain is None:
return grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL)
else:
c_pem_key_certificate_pair.private_key = self._private_key
c_pem_key_certificate_pair.certificate_chain = self._certificate_chain
return grpc_ssl_credentials_create(
c_pem_root_certificates, &c_pem_key_certificate_pair, NULL)
cdef class CompositeChannelCredentials(ChannelCredentials):
def __cinit__(self, call_credentialses, channel_credentials):
self._call_credentialses = call_credentialses
self._channel_credentials = channel_credentials
cdef grpc_channel_credentials *c(self):
cdef grpc_channel_credentials *c_channel_credentials
c_channel_credentials = self._channel_credentials.c()
cdef grpc_call_credentials *c_call_credentials_composition = _composition(
self._call_credentialses)
cdef grpc_channel_credentials *composition
c_composition = grpc_composite_channel_credentials_create(
c_channel_credentials, c_call_credentials_composition, NULL)
grpc_channel_credentials_release(c_channel_credentials)
grpc_call_credentials_release(c_call_credentials_composition)
return c_composition
cdef class ServerCertificateConfig:
@ -89,190 +165,6 @@ cdef class ServerCredentials:
grpc_server_credentials_release(self.c_credentials)
grpc_shutdown()
cdef class CredentialsMetadataPlugin:
def __cinit__(self, object plugin_callback, bytes name):
"""
Args:
plugin_callback (callable): Callback accepting a service URL (str/bytes)
and callback object (accepting a MetadataArray,
grpc_status_code, and a str/bytes error message). This argument
when called should be non-blocking and eventually call the callback
object with the appropriate status code/details and metadata (if
successful).
name (bytes): Plugin name.
"""
grpc_init()
if not callable(plugin_callback):
raise ValueError('expected callable plugin_callback')
self.plugin_callback = plugin_callback
self.plugin_name = name
def __dealloc__(self):
grpc_shutdown()
cdef grpc_metadata_credentials_plugin _c_plugin(CredentialsMetadataPlugin plugin):
cdef grpc_metadata_credentials_plugin c_plugin
c_plugin.get_metadata = plugin_get_metadata
c_plugin.destroy = plugin_destroy_c_plugin_state
c_plugin.state = <void *>plugin
c_plugin.type = plugin.plugin_name
cpython.Py_INCREF(plugin)
return c_plugin
cdef class AuthMetadataContext:
def __cinit__(self):
grpc_init()
self.context.service_url = NULL
self.context.method_name = NULL
@property
def service_url(self):
return self.context.service_url
@property
def method_name(self):
return self.context.method_name
def __dealloc__(self):
grpc_shutdown()
cdef int plugin_get_metadata(
void *state, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb, void *user_data,
grpc_metadata creds_md[GRPC_METADATA_CREDENTIALS_PLUGIN_SYNC_MAX],
size_t *num_creds_md, grpc_status_code *status,
const char **error_details) with gil:
called_flag = [False]
def python_callback(
Metadata metadata, grpc_status_code status,
bytes error_details):
cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
called_flag[0] = True
cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
cdef AuthMetadataContext cy_context = AuthMetadataContext()
cy_context.context = context
def async_callback():
try:
self.plugin_callback(cy_context, python_callback)
except Exception as error:
if not called_flag[0]:
cb(user_data, NULL, 0, StatusCode.unknown,
traceback.format_exc().encode())
threading.Thread(group=None, target=async_callback).start()
return 0 # Asynchronous return
cdef void plugin_destroy_c_plugin_state(void *state) with gil:
cpython.Py_DECREF(<CredentialsMetadataPlugin>state)
def channel_credentials_google_default():
cdef ChannelCredentials credentials = ChannelCredentials();
with nogil:
credentials.c_credentials = grpc_google_default_credentials_create()
return credentials
def channel_credentials_ssl(pem_root_certificates,
SslPemKeyCertPair ssl_pem_key_cert_pair):
pem_root_certificates = str_to_bytes(pem_root_certificates)
cdef ChannelCredentials credentials = ChannelCredentials()
cdef const char *c_pem_root_certificates = NULL
if pem_root_certificates is not None:
c_pem_root_certificates = pem_root_certificates
credentials.references.append(pem_root_certificates)
if ssl_pem_key_cert_pair is not None:
with nogil:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, &ssl_pem_key_cert_pair.c_pair, NULL)
credentials.references.append(ssl_pem_key_cert_pair)
else:
with nogil:
credentials.c_credentials = grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL)
return credentials
def channel_credentials_composite(
ChannelCredentials credentials_1 not None,
CallCredentials credentials_2 not None):
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef ChannelCredentials credentials = ChannelCredentials()
with nogil:
credentials.c_credentials = grpc_composite_channel_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def call_credentials_composite(
CallCredentials credentials_1 not None,
CallCredentials credentials_2 not None):
if not credentials_1.is_valid or not credentials_2.is_valid:
raise ValueError("passed credentials must both be valid")
cdef CallCredentials credentials = CallCredentials()
with nogil:
credentials.c_credentials = grpc_composite_call_credentials_create(
credentials_1.c_credentials, credentials_2.c_credentials, NULL)
credentials.references.append(credentials_1)
credentials.references.append(credentials_2)
return credentials
def call_credentials_google_compute_engine():
cdef CallCredentials credentials = CallCredentials()
with nogil:
credentials.c_credentials = (
grpc_google_compute_engine_credentials_create(NULL))
return credentials
def call_credentials_service_account_jwt_access(
json_key, Timespec token_lifetime not None):
json_key = str_to_bytes(json_key)
cdef CallCredentials credentials = CallCredentials()
cdef char *json_key_c_string = json_key
with nogil:
credentials.c_credentials = (
grpc_service_account_jwt_access_credentials_create(
json_key_c_string, token_lifetime.c_time, NULL))
credentials.references.append(json_key)
return credentials
def call_credentials_google_refresh_token(json_refresh_token):
json_refresh_token = str_to_bytes(json_refresh_token)
cdef CallCredentials credentials = CallCredentials()
cdef char *json_refresh_token_c_string = json_refresh_token
with nogil:
credentials.c_credentials = grpc_google_refresh_token_credentials_create(
json_refresh_token_c_string, NULL)
credentials.references.append(json_refresh_token)
return credentials
def call_credentials_google_iam(authorization_token, authority_selector):
authorization_token = str_to_bytes(authorization_token)
authority_selector = str_to_bytes(authority_selector)
cdef CallCredentials credentials = CallCredentials()
cdef char *authorization_token_c_string = authorization_token
cdef char *authority_selector_c_string = authority_selector
with nogil:
credentials.c_credentials = grpc_google_iam_credentials_create(
authorization_token_c_string, authority_selector_c_string, NULL)
credentials.references.append(authorization_token)
credentials.references.append(authority_selector)
return credentials
def call_credentials_metadata_plugin(CredentialsMetadataPlugin plugin):
cdef CallCredentials credentials = CallCredentials()
cdef grpc_metadata_credentials_plugin c_plugin = _c_plugin(plugin)
with nogil:
credentials.c_credentials = (
grpc_metadata_credentials_create_from_plugin(c_plugin, NULL))
# TODO(atash): the following held reference is *probably* never necessary
credentials.references.append(plugin)
return credentials
cdef const char* _get_c_pem_root_certs(pem_root_certs):
if pem_root_certs is None:
return NULL

@ -13,6 +13,7 @@
# limitations under the License.
import collections
import logging
import threading
import grpc
@ -20,89 +21,79 @@ from grpc import _common
from grpc._cython import cygrpc
class AuthMetadataContext(
class _AuthMetadataContext(
collections.namedtuple('AuthMetadataContext', (
'service_url', 'method_name',)), grpc.AuthMetadataContext):
pass
class AuthMetadataPluginCallback(grpc.AuthMetadataContext):
class _CallbackState(object):
def __init__(self, callback):
self._callback = callback
def __call__(self, metadata, error):
self._callback(metadata, error)
def __init__(self):
self.lock = threading.Lock()
self.called = False
self.exception = None
class _WrappedCygrpcCallback(object):
class _AuthMetadataPluginCallback(grpc.AuthMetadataPluginCallback):
def __init__(self, cygrpc_callback):
self.is_called = False
self.error = None
self.is_called_lock = threading.Lock()
self.cygrpc_callback = cygrpc_callback
def _invoke_failure(self, error):
# TODO(atash) translate different Exception superclasses into different
# status codes.
self.cygrpc_callback(_common.EMPTY_METADATA, cygrpc.StatusCode.internal,
_common.encode(str(error)))
def _invoke_success(self, metadata):
try:
cygrpc_metadata = _common.to_cygrpc_metadata(metadata)
except Exception as exception: # pylint: disable=broad-except
self._invoke_failure(exception)
return
self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, b'')
def __init__(self, state, callback):
self._state = state
self._callback = callback
def __call__(self, metadata, error):
with self.is_called_lock:
if self.is_called:
raise RuntimeError('callback should only ever be invoked once')
if self.error:
self._invoke_failure(self.error)
return
self.is_called = True
with self._state.lock:
if self._state.exception is None:
if self._state.called:
raise RuntimeError(
'AuthMetadataPluginCallback invoked more than once!')
else:
self._state.called = True
else:
raise RuntimeError(
'AuthMetadataPluginCallback raised exception "{}"!'.format(
self._state.exception))
if error is None:
self._invoke_success(metadata)
self._callback(
_common.to_cygrpc_metadata(metadata), cygrpc.StatusCode.ok,
None)
else:
self._invoke_failure(error)
def notify_failure(self, error):
with self.is_called_lock:
if not self.is_called:
self.error = error
self._callback(None, cygrpc.StatusCode.internal,
_common.encode(str(error)))
class _WrappedPlugin(object):
class _Plugin(object):
def __init__(self, plugin):
self.plugin = plugin
def __init__(self, metadata_plugin):
self._metadata_plugin = metadata_plugin
def __call__(self, context, cygrpc_callback):
wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
wrapped_context = AuthMetadataContext(
_common.decode(context.service_url),
_common.decode(context.method_name))
def __call__(self, service_url, method_name, callback):
context = _AuthMetadataContext(
_common.decode(service_url), _common.decode(method_name))
callback_state = _CallbackState()
try:
self._metadata_plugin(
context, _AuthMetadataPluginCallback(callback_state, callback))
except Exception as exception: # pylint: disable=broad-except
logging.exception(
'AuthMetadataPluginCallback "%s" raised exception!',
self._metadata_plugin)
with callback_state.lock:
callback_state.exception = exception
if callback_state.called:
return
callback(None, cygrpc.StatusCode.internal,
_common.encode(str(exception)))
def metadata_plugin_call_credentials(metadata_plugin, name):
if name is None:
try:
self.plugin(wrapped_context,
AuthMetadataPluginCallback(wrapped_cygrpc_callback))
except Exception as error:
wrapped_cygrpc_callback.notify_failure(error)
raise
def call_credentials_metadata_plugin(plugin, name):
"""
Args:
plugin: A callable accepting a grpc.AuthMetadataContext
object and a callback (itself accepting a list of metadata key/value
2-tuples and a None-able exception value). The callback must be eventually
called, but need not be called in plugin's invocation.
plugin's invocation must be non-blocking.
"""
return cygrpc.call_credentials_metadata_plugin(
cygrpc.CredentialsMetadataPlugin(
_WrappedPlugin(plugin), _common.encode(name)))
effective_name = metadata_plugin.__name__
except AttributeError:
effective_name = metadata_plugin.__class__.__name__
else:
effective_name = name
return grpc.CallCredentials(
cygrpc.MetadataPluginCallCredentials(
_Plugin(metadata_plugin), _common.encode(effective_name)))

@ -374,10 +374,10 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _Context(rpc_event, state, request_deserializer)
try:
return behavior(argument, context), True
except Exception as e: # pylint: disable=broad-except
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception calling application: {}'.format(e)
if exception not in state.rpc_errors:
details = 'Exception calling application: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
@ -389,10 +389,10 @@ def _take_response_from_response_iterator(rpc_event, state, response_iterator):
return next(response_iterator), True
except StopIteration:
return None, True
except Exception as e: # pylint: disable=broad-except
except Exception as exception: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(e)
if exception not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(exception)
logging.exception(details)
_abort(state, rpc_event.operation_call,
cygrpc.StatusCode.unknown, _common.encode(details))
@ -591,7 +591,13 @@ def _handle_call(rpc_event, generic_handlers, thread_pool,
if not rpc_event.success:
return None, None
if rpc_event.request_call_details.method is not None:
method_handler = _find_method_handler(rpc_event, generic_handlers)
try:
method_handler = _find_method_handler(rpc_event, generic_handlers)
except Exception as exception: # pylint: disable=broad-except
details = 'Exception servicing handler: {}'.format(exception)
logging.exception(details)
return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
b'Error in service handler!'), None
if method_handler is None:
return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
b'Method not found!'), None

@ -22,7 +22,7 @@
"unit._api_test.ChannelConnectivityTest",
"unit._api_test.ChannelTest",
"unit._auth_context_test.AuthContextTest",
"unit._auth_test.AccessTokenCallCredentialsTest",
"unit._auth_test.AccessTokenAuthMetadataPluginTest",
"unit._auth_test.GoogleCallCredentialsTest",
"unit._channel_args_test.ChannelArgsTest",
"unit._channel_connectivity_test.ChannelConnectivityTest",

@ -61,7 +61,7 @@ class GoogleCallCredentialsTest(unittest.TestCase):
self.assertTrue(callback_event.wait(1.0))
class AccessTokenCallCredentialsTest(unittest.TestCase):
class AccessTokenAuthMetadataPluginTest(unittest.TestCase):
def test_google_call_credentials_success(self):
callback_event = threading.Event()
@ -71,8 +71,8 @@ class AccessTokenCallCredentialsTest(unittest.TestCase):
self.assertIsNone(error)
callback_event.set()
call_creds = _auth.AccessTokenCallCredentials('token')
call_creds(None, mock_callback)
metadata_plugin = _auth.AccessTokenAuthMetadataPlugin('token')
metadata_plugin(None, mock_callback)
self.assertTrue(callback_event.wait(1.0))

@ -28,7 +28,7 @@ _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
_EMPTY_FLAGS = 0
def _metadata_plugin_callback(context, callback):
def _metadata_plugin(context, callback):
callback(
cygrpc.Metadata([
cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY,
@ -105,17 +105,9 @@ class TypeSmokeTest(unittest.TestCase):
channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([]))
del channel
def testCredentialsMetadataPluginUpDown(self):
plugin = cygrpc.CredentialsMetadataPlugin(
lambda ignored_a, ignored_b: None, b'')
del plugin
def testCallCredentialsFromPluginUpDown(self):
plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback,
b'')
call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
del plugin
del call_credentials
def test_metadata_plugin_call_credentials_up_down(self):
cygrpc.MetadataPluginCallCredentials(_metadata_plugin,
b'test plugin name!')
def testServerStartNoExplicitShutdown(self):
server = cygrpc.Server(cygrpc.ChannelArgs([]))
@ -205,7 +197,7 @@ class ServerClientMixin(object):
return test_utilities.SimpleFuture(performer)
def testEcho(self):
def test_echo(self):
DEADLINE = time.time() + 5
DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = b'key'
@ -439,8 +431,8 @@ class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
cygrpc.SslPemKeyCertPair(resources.private_key(),
resources.certificate_chain())
], False)
client_credentials = cygrpc.channel_credentials_ssl(
resources.test_root_certificates(), None)
client_credentials = cygrpc.SSLChannelCredentials(
resources.test_root_certificates(), None, None)
self.setUpMixin(server_credentials, client_credentials,
_SSL_HOST_OVERRIDE)

@ -32,6 +32,7 @@ _UNARY_UNARY = '/test/UnaryUnary'
_UNARY_STREAM = '/test/UnaryStream'
_STREAM_UNARY = '/test/StreamUnary'
_STREAM_STREAM = '/test/StreamStream'
_DEFECTIVE_GENERIC_RPC_HANDLER = '/test/DefectiveGenericRpcHandler'
class _Callback(object):
@ -95,6 +96,9 @@ class _Handler(object):
yield request
self._control.control()
def defective_generic_rpc_handler(self):
raise test_control.Defect()
class _MethodHandler(grpc.RpcMethodHandler):
@ -132,6 +136,8 @@ class _GenericHandler(grpc.GenericRpcHandler):
elif handler_call_details.method == _STREAM_STREAM:
return _MethodHandler(True, True, None, None, None, None, None,
self._handler.handle_stream_stream)
elif handler_call_details.method == _DEFECTIVE_GENERIC_RPC_HANDLER:
return self._handler.defective_generic_rpc_handler()
else:
return None
@ -176,6 +182,10 @@ def _stream_stream_multi_callable(channel):
return channel.stream_stream(_STREAM_STREAM)
def _defective_handler_multi_callable(channel):
return channel.unary_unary(_DEFECTIVE_GENERIC_RPC_HANDLER)
class InvocationDefectsTest(unittest.TestCase):
def setUp(self):
@ -235,6 +245,18 @@ class InvocationDefectsTest(unittest.TestCase):
for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
next(response_iterator)
def testDefectiveGenericRpcHandlerUnaryResponse(self):
request = b'\x07\x08'
multi_callable = _defective_handler_multi_callable(self._channel)
with self.assertRaises(grpc.RpcError) as exception_context:
response = multi_callable(
request,
metadata=(('test', 'DefectiveGenericRpcHandlerUnary'),))
self.assertIs(grpc.StatusCode.UNKNOWN,
exception_context.exception.code())
if __name__ == '__main__':
unittest.main(verbosity=2)

@ -92,8 +92,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_handshake_manager* handshake_mgr = grpc_handshake_manager_create();
grpc_server_security_connector_add_handshakers(&exec_ctx, sc, handshake_mgr);
grpc_handshake_manager_do_handshake(
&exec_ctx, handshake_mgr, mock_endpoint, nullptr /* channel_args */,
deadline, nullptr /* acceptor */, on_handshake_done, &state);
&exec_ctx, handshake_mgr, nullptr /* interested_parties */, mock_endpoint,
nullptr /* channel_args */, deadline, nullptr /* acceptor */,
on_handshake_done, &state);
grpc_exec_ctx_flush(&exec_ctx);
// If the given string happens to be part of the correct client hello, the

@ -1231,7 +1231,7 @@ if not args.disable_auto_set_flakes:
if test.flaky: flaky_tests.add(test.name)
if test.cpu > 0: shortname_to_cpu[test.name] = test.cpu
except:
print("Unexpected error getting flaky tests:", sys.exc_info()[0])
print("Unexpected error getting flaky tests: %s" % traceback.format_exc())
if args.force_default_poller:
_POLLING_STRATEGIES = {}

Loading…
Cancel
Save