Add grpc_error parameter to handshaker callback.

pull/7517/head
Mark D. Roth 9 years ago
parent b16b1f29c3
commit a228e5f231
  1. 5
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  2. 24
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  3. 35
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
  4. 28
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
  5. 16
      src/core/lib/channel/handshaker.c
  6. 9
      src/core/lib/channel/handshaker.h

@ -88,7 +88,8 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
} }
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data) { grpc_channel_args *args, void *user_data,
grpc_error *error) {
connector *c = user_data; connector *c = user_data;
c->result->transport = c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1); grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
@ -97,7 +98,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
c->result->channel_args = args; c->result->channel_args = args;
grpc_closure *notify = c->notify; grpc_closure *notify = c->notify;
c->notify = NULL; c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
} }
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {

@ -126,15 +126,23 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
} }
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data) { grpc_channel_args *args, void *user_data,
grpc_error *error) {
connector *c = user_data; connector *c = user_data;
// TODO(roth, jboeuf): Convert security connector handshaking to use new if (error != GRPC_ERROR_NONE) {
// handshake API, and then move the code from on_secure_handshake_done() grpc_closure *notify = c->notify;
// into this function. c->notify = NULL;
c->tmp_args = args; grpc_exec_ctx_sched(exec_ctx, notify, error, NULL);
grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector, } else {
endpoint, c->args.deadline, // TODO(roth, jboeuf): Convert security connector handshaking to use new
on_secure_handshake_done, c); // handshake API, and then move the code from on_secure_handshake_done()
// into this function.
c->tmp_args = args;
grpc_channel_security_connector_do_handshake(exec_ctx,
c->security_connector,
endpoint, c->args.deadline,
on_secure_handshake_done, c);
}
} }
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,

@ -55,21 +55,28 @@ typedef struct server_connect_state {
} server_connect_state; } server_connect_state;
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data) { grpc_channel_args *args, void *user_data,
grpc_error *error) {
server_connect_state *state = user_data; server_connect_state *state = user_data;
/* if (error != GRPC_ERROR_NONE) {
* Beware that the call to grpc_create_chttp2_transport() has to happen before const char *error_str = grpc_error_string(error);
* grpc_tcp_server_destroy(). This is fine here, but similar code gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
* asynchronously doing a handshake instead of calling grpc_tcp_server_start() grpc_error_free_string(error_str);
* (as in server_secure_chttp2.c) needs to add synchronization to avoid this GRPC_ERROR_UNREF(error);
* case. grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
*/ } else {
grpc_transport *transport = // Beware that the call to grpc_create_chttp2_transport() has to happen
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0); // before grpc_tcp_server_destroy(). This is fine here, but similar code
grpc_server_setup_transport(exec_ctx, state->server, transport, // asynchronously doing a handshake instead of calling
state->accepting_pollset, // grpc_tcp_server_start() (as in server_secure_chttp2.c) needs to add
grpc_server_get_channel_args(state->server)); // synchronization to avoid this case.
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); grpc_transport *transport =
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 0);
grpc_server_setup_transport(exec_ctx, state->server, transport,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
}
// Clean up. // Clean up.
grpc_channel_args_destroy(args); grpc_channel_args_destroy(args);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);

@ -59,7 +59,7 @@ typedef struct server_secure_state {
grpc_tcp_server *tcp; grpc_tcp_server *tcp;
grpc_server_security_connector *sc; grpc_server_security_connector *sc;
grpc_server_credentials *creds; grpc_server_credentials *creds;
int is_shutdown; bool is_shutdown;
gpr_mu mu; gpr_mu mu;
gpr_refcount refcount; gpr_refcount refcount;
grpc_closure destroy_closure; grpc_closure destroy_closure;
@ -96,12 +96,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
grpc_endpoint *secure_endpoint, grpc_endpoint *secure_endpoint,
grpc_auth_context *auth_context) { grpc_auth_context *auth_context) {
server_secure_connect *state = statep; server_secure_connect *state = statep;
grpc_transport *transport;
if (status == GRPC_SECURITY_OK) { if (status == GRPC_SECURITY_OK) {
if (secure_endpoint) { if (secure_endpoint) {
gpr_mu_lock(&state->state->mu); gpr_mu_lock(&state->state->mu);
if (!state->state->is_shutdown) { if (!state->state->is_shutdown) {
transport = grpc_create_chttp2_transport( grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->state->server), exec_ctx, grpc_server_get_channel_args(state->state->server),
secure_endpoint, 0); secure_endpoint, 0);
grpc_arg args_to_add[2]; grpc_arg args_to_add[2];
@ -129,13 +128,26 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
} }
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
grpc_channel_args *args, void *user_data) { grpc_channel_args *args, void *user_data,
grpc_error *error) {
server_secure_connect *state = user_data; server_secure_connect *state = user_data;
if (error != GRPC_ERROR_NONE) {
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
GRPC_ERROR_UNREF(error);
grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
grpc_channel_args_destroy(args);
state_unref(state->state);
gpr_free(state);
return;
}
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
state->handshake_mgr = NULL;
// TODO(roth, jboeuf): Convert security connector handshaking to use new // TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done() // handshake API, and then move the code from on_secure_handshake_done()
// into this function. // into this function.
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
state->handshake_mgr = NULL;
state->args = args; state->args = args;
grpc_server_security_connector_do_handshake( grpc_server_security_connector_do_handshake(
exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline, exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
@ -187,7 +199,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
server_secure_state *state = statep; server_secure_state *state = statep;
grpc_tcp_server *tcp; grpc_tcp_server *tcp;
gpr_mu_lock(&state->mu); gpr_mu_lock(&state->mu);
state->is_shutdown = 1; state->is_shutdown = true;
state->destroy_callback = callback; state->destroy_callback = callback;
tcp = state->tcp; tcp = state->tcp;
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
@ -251,7 +263,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state->tcp = tcp; state->tcp = tcp;
state->sc = sc; state->sc = sc;
state->creds = grpc_server_credentials_ref(creds); state->creds = grpc_server_credentials_ref(creds);
state->is_shutdown = 0; state->is_shutdown = false;
gpr_mu_init(&state->mu); gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1); gpr_ref_init(&state->refcount, 1);

@ -130,8 +130,6 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) { grpc_handshake_manager* mgr) {
// FIXME: maybe check which handshaker is currently in progress, and
// only shut down that one?
for (size_t i = 0; i < mgr->count; ++i) { for (size_t i = 0; i < mgr->count; ++i) {
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
} }
@ -145,10 +143,18 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
// handshakers together. // handshakers together.
static void call_next_handshaker(grpc_exec_ctx* exec_ctx, static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint, grpc_endpoint* endpoint,
grpc_channel_args* args, void* user_data) { grpc_channel_args* args, void* user_data,
grpc_error* error) {
grpc_handshake_manager* mgr = user_data; grpc_handshake_manager* mgr = user_data;
GPR_ASSERT(mgr->state != NULL); GPR_ASSERT(mgr->state != NULL);
GPR_ASSERT(mgr->state->index < mgr->count); GPR_ASSERT(mgr->state->index < mgr->count);
// If we got an error, skip all remaining handshakers and invoke the
// caller-supplied callback immediately.
if (error != GRPC_ERROR_NONE) {
mgr->state->final_cb(exec_ctx, endpoint, args, mgr->state->final_user_data,
error);
return;
}
grpc_handshaker_done_cb cb = call_next_handshaker; grpc_handshaker_done_cb cb = call_next_handshaker;
// If this is the last handshaker, use the caller-supplied callback // If this is the last handshaker, use the caller-supplied callback
// and user_data instead of chaining back to this function again. // and user_data instead of chaining back to this function again.
@ -177,7 +183,7 @@ void grpc_handshake_manager_do_handshake(
if (mgr->count == 0) { if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done // No handshakers registered, so we just immediately call the done
// callback with the passed-in endpoint. // callback with the passed-in endpoint.
cb(exec_ctx, endpoint, args_copy, user_data); cb(exec_ctx, endpoint, args_copy, user_data, GRPC_ERROR_NONE);
} else { } else {
GPR_ASSERT(mgr->state == NULL); GPR_ASSERT(mgr->state == NULL);
mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state)); mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
@ -186,6 +192,6 @@ void grpc_handshake_manager_do_handshake(
mgr->state->acceptor = acceptor; mgr->state->acceptor = acceptor;
mgr->state->final_cb = cb; mgr->state->final_cb = cb;
mgr->state->final_user_data = user_data; mgr->state->final_user_data = user_data;
call_next_handshaker(exec_ctx, endpoint, args_copy, mgr); call_next_handshaker(exec_ctx, endpoint, args_copy, mgr, GRPC_ERROR_NONE);
} }
} }

@ -60,7 +60,8 @@ typedef struct grpc_handshaker grpc_handshaker;
typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx, typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint, grpc_endpoint* endpoint,
grpc_channel_args* args, grpc_channel_args* args,
void* user_data); void* user_data,
grpc_error* error);
struct grpc_handshaker_vtable { struct grpc_handshaker_vtable {
/// Destroys the handshaker. /// Destroys the handshaker.
@ -115,7 +116,7 @@ typedef struct grpc_handshake_manager grpc_handshake_manager;
grpc_handshake_manager* grpc_handshake_manager_create(); grpc_handshake_manager* grpc_handshake_manager_create();
/// Adds a handshaker to the handshake manager. /// Adds a handshaker to the handshake manager.
/// Takes ownership of \a mgr. /// Takes ownership of \a handshaker.
void grpc_handshake_manager_add(grpc_handshaker* handshaker, void grpc_handshake_manager_add(grpc_handshaker* handshaker,
grpc_handshake_manager* mgr); grpc_handshake_manager* mgr);
@ -134,8 +135,8 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
/// Does NOT take ownership of \a args. Instead, makes a copy before /// Does NOT take ownership of \a args. Instead, makes a copy before
/// invoking the first handshaker. /// invoking the first handshaker.
/// \a acceptor will be NULL for client-side handshakers. /// \a acceptor will be NULL for client-side handshakers.
/// If successful, invokes \a cb with \a user_data after all handshakers /// Invokes \a cb with \a user_data after either a handshaker fails or
/// have completed. /// all handshakers have completed successfully.
void grpc_handshake_manager_do_handshake( void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* args, grpc_endpoint* endpoint, const grpc_channel_args* args,

Loading…
Cancel
Save