diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index c9861a5aed2..48990f9dacc 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -41,6 +41,7 @@ #include #include "src/core/ext/client_channel/uri_parser.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/support/env.h" @@ -55,9 +56,12 @@ typedef struct http_connect_handshaker { gpr_refcount refcount; gpr_mu mu; + bool shutdown; + // Endpoint and read buffer to destroy after a shutdown. + grpc_endpoint* endpoint_to_destroy; + grpc_slice_buffer* read_buffer_to_destroy; + // State saved while performing the handshake. - // args will be NULL when either there is no handshake in progress or - // when the handshaker is shutting down. grpc_handshaker_args* args; grpc_closure* on_handshake_done; @@ -70,9 +74,17 @@ typedef struct http_connect_handshaker { } http_connect_handshaker; // Unref and clean up handshaker. -static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { +static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { gpr_mu_destroy(&handshaker->mu); + if (handshaker->endpoint_to_destroy != NULL) { + grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy); + } + if (handshaker->read_buffer_to_destroy != NULL) { + grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy); + gpr_free(handshaker->read_buffer_to_destroy); + } gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); grpc_slice_buffer_destroy(&handshaker->write_buffer); @@ -82,18 +94,42 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { } } +// Set args fields to NULL, saving the endpoint and read buffer for +// later destruction. +static void cleanup_args_for_failure_locked( + http_connect_handshaker* handshaker) { + handshaker->endpoint_to_destroy = handshaker->args->endpoint; + handshaker->args->endpoint = NULL; + handshaker->read_buffer_to_destroy = handshaker->args->read_buffer; + handshaker->args->read_buffer = NULL; + grpc_channel_args_destroy(handshaker->args->args); + handshaker->args->args = NULL; +} + // Callback invoked when finished writing HTTP CONNECT request. static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; gpr_mu_lock(&handshaker->mu); - if (error != GRPC_ERROR_NONE || handshaker->args == NULL) { - // If the write failed, invoke the callback immediately with the error. - grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, - GRPC_ERROR_REF(error), NULL); - handshaker->args = NULL; + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the write failed or we're shutting down, clean up and invoke the + // callback with the error. + if (error == GRPC_ERROR_NONE) { + // If we were shut down after the write succeeded but before this + // callback was invoked, we need to generate our own error. + error = GRPC_ERROR_CREATE("Handshaker shutdown"); + } else { + GRPC_ERROR_REF(error); // Take ref for the handshake-done callback. + } + if (!handshaker->shutdown) { + // Not shutting down, so the write failed. Clean up before + // invoking the callback. + cleanup_args_for_failure_locked(handshaker); + } + // Invoke callback. + grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); gpr_mu_unlock(&handshaker->mu); - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } else { // Otherwise, read the response. // The read callback inherits our ref to the handshaker. @@ -109,8 +145,21 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; gpr_mu_lock(&handshaker->mu); - if (error != GRPC_ERROR_NONE || handshaker->args == NULL) { - GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. + if (error != GRPC_ERROR_NONE || handshaker->shutdown) { + // If the write failed or we're shutting down, clean up and invoke the + // callback with the error. + if (error == GRPC_ERROR_NONE) { + // If we were shut down after the write succeeded but before this + // callback was invoked, we need to generate our own error. + error = GRPC_ERROR_CREATE("Handshaker shutdown"); + } else { + GRPC_ERROR_REF(error); // Take ref for the handshake-done callback. + } + if (!handshaker->shutdown) { + // Not shutting down, so the write failed. Clean up before + // invoking the callback. + cleanup_args_for_failure_locked(handshaker); + } goto done; } // Add buffer to parser. @@ -172,10 +221,9 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, } done: // Invoke handshake-done callback. - handshaker->args = NULL; grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL); gpr_mu_unlock(&handshaker->mu); - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } // @@ -185,16 +233,17 @@ done: static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; gpr_mu_lock(&handshaker->mu); - if (handshaker->args != NULL) { + if (!handshaker->shutdown) { + handshaker->shutdown = true; grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint); - handshaker->args = NULL; + cleanup_args_for_failure_locked(handshaker); } gpr_mu_unlock(&handshaker->mu); } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index e0bce57fc28..00b272de270 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -96,11 +96,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; connector *c = args->user_data; - if (error != GRPC_ERROR_NONE) { - grpc_endpoint_destroy(exec_ctx, args->endpoint); - grpc_channel_args_destroy(args->args); - gpr_free(args->read_buffer); - } else { + if (error == GRPC_ERROR_NONE) { c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); GPR_ASSERT(c->result->transport); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 4182aa730fb..b4a30f94fc8 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -135,8 +135,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connector *c = args->user_data; c->tmp_args = args->args; if (error != GRPC_ERROR_NONE) { - grpc_endpoint_destroy(exec_ctx, args->endpoint); - gpr_free(args->read_buffer); grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 5a9d4f89280..1b38d4decdc 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -62,8 +62,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - grpc_endpoint_destroy(exec_ctx, args->endpoint); - gpr_free(args->read_buffer); } else { // Beware that the call to grpc_create_chttp2_transport() has to happen // before grpc_tcp_server_destroy(). This is fine here, but similar code @@ -76,9 +74,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, state->accepting_pollset, grpc_server_get_channel_args(state->server)); grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer); + grpc_channel_args_destroy(args->args); } // Clean up. - grpc_channel_args_destroy(args->args); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); gpr_free(state); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 1d1973be8bf..22af94199f2 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -123,9 +123,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - grpc_endpoint_destroy(exec_ctx, args->endpoint); - grpc_channel_args_destroy(args->args); - gpr_free(args->read_buffer); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); gpr_free(connection_state); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 3c125a22f37..f3bd91284ee 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -141,25 +141,20 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx, void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr) { gpr_mu_lock(&mgr->mu); - for (size_t i = 0; i < mgr->count; ++i) { - grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]); + if (mgr->index > 0) { + grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]); } gpr_mu_unlock(&mgr->mu); } -static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error); - // Helper function to call either the next handshaker or the // on_handshake_done callback. static void call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_error* error) { GPR_ASSERT(mgr->index <= mgr->count); - // If we got an error, skip all remaining handshakers and invoke the - // caller-supplied callback immediately. - // Otherwise, if this is the last handshaker, then call the on_handshake_done - // callback instead of chaining back to this function again. + // If we got an error or we've finished the last handshaker, invoke + // the on_handshake_done callback. Otherwise, call the next handshaker. if (error != GRPC_ERROR_NONE || mgr->index == mgr->count) { // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. @@ -202,6 +197,8 @@ void grpc_handshake_manager_do_handshake( grpc_endpoint* endpoint, const grpc_channel_args* channel_args, gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_iomgr_cb_func on_handshake_done, void* user_data) { + gpr_mu_lock(&mgr->mu); + GPR_ASSERT(mgr->index == 0); // Construct handshaker args. These will be passed through all // handshakers and eventually be freed by the on_handshake_done callback. mgr->args.endpoint = endpoint; @@ -210,8 +207,6 @@ void grpc_handshake_manager_do_handshake( mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer)); grpc_slice_buffer_init(mgr->args.read_buffer); // Initialize state needed for calling handshakers. - gpr_mu_lock(&mgr->mu); - GPR_ASSERT(mgr->index == 0); mgr->acceptor = acceptor; grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr); grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args); diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index f0614c354b9..2e1f5435121 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -57,17 +57,24 @@ typedef struct grpc_handshaker grpc_handshaker; /// Arguments passed through handshakers and to the on_handshake_done callback. /// /// For handshakers, all members are input/output parameters; for -/// example, a handshaker may read from \a endpoint and then later -/// replace it with a wrapped endpoint. Similarly, a handshaker may -/// modify \a args. +/// example, a handshaker may read from or write to \a endpoint and +/// then later replace it with a wrapped endpoint. Similarly, a +/// handshaker may modify \a args. +/// +/// A handshaker takes ownership of the members while a handshake is in +/// progress. Upon failure or shutdown of an in-progress handshaker, +/// the handshaker is responsible for destroying the members and setting +/// them to NULL before invoking the on_handshake_done callback. /// /// For the on_handshake_done callback, all members are input arguments, /// which the callback takes ownership of. typedef struct { grpc_endpoint* endpoint; grpc_channel_args* args; - void* user_data; grpc_slice_buffer* read_buffer; + // User data passed through the handshake manager. Not used by + // individual handshakers. + void* user_data; } grpc_handshaker_args; typedef struct { @@ -132,9 +139,9 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, /// /// When done, invokes \a on_handshake_done with a grpc_handshaker_args /// object as its argument. If the callback is invoked with error != -/// GRPC_ERROR_NONE, then handshaking failed and the resulting endpoint -/// will have already been shut down (although the caller will still be -/// responsible for destroying it). +/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done +/// the necessary clean-up. Otherwise, the callback takes ownership of +/// the arguments. void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args,