From f9b56b93f76f50c85b1897f2732d1a5bbf6727b1 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 16 Nov 2016 14:58:05 -0800 Subject: [PATCH 1/2] Change grpc_handshaker_args to be owned by the handshake manager. Also clean up hand-off semantics of endpoints. --- .../chttp2/client/insecure/channel_create.c | 2 +- .../client/secure/secure_channel_create.c | 2 +- .../chttp2/server/insecure/server_chttp2.c | 3 +-- .../server/secure/server_secure_chttp2.c | 8 +++----- src/core/lib/channel/handshaker.c | 20 ++++++++++--------- src/core/lib/channel/handshaker.h | 19 +++++++++++++++--- 6 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index c4c1e638a97..e0bce57fc28 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -97,6 +97,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_handshaker_args *args = arg; connector *c = args->user_data; if (error != GRPC_ERROR_NONE) { + grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(args->args); gpr_free(args->read_buffer); } else { @@ -107,7 +108,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, args->read_buffer); c->result->channel_args = args->args; } - gpr_free(args); grpc_closure *notify = c->notify; c->notify = NULL; grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 5101737e111..4182aa730fb 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -135,6 +135,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connector *c = args->user_data; c->tmp_args = args->args; if (error != GRPC_ERROR_NONE) { + grpc_endpoint_destroy(exec_ctx, args->endpoint); gpr_free(args->read_buffer); grpc_closure *notify = c->notify; c->notify = NULL; @@ -147,7 +148,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, c->security_connector, args->endpoint, args->read_buffer, c->args.deadline, on_secure_handshake_done, c); } - gpr_free(args); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index 669a3869ab3..5a9d4f89280 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -62,7 +62,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); - grpc_handshake_manager_shutdown(exec_ctx, state->handshake_mgr); + grpc_endpoint_destroy(exec_ctx, args->endpoint); gpr_free(args->read_buffer); } else { // Beware that the call to grpc_create_chttp2_transport() has to happen @@ -79,7 +79,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } // Clean up. grpc_channel_args_destroy(args->args); - gpr_free(args); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); gpr_free(state); } diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index c7980780f33..1d1973be8bf 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -123,17 +123,14 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, const char *error_str = grpc_error_string(error); gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); grpc_error_free_string(error_str); + grpc_endpoint_destroy(exec_ctx, args->endpoint); grpc_channel_args_destroy(args->args); gpr_free(args->read_buffer); - gpr_free(args); - grpc_handshake_manager_shutdown(exec_ctx, connection_state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp); gpr_free(connection_state); return; } - grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); - connection_state->handshake_mgr = NULL; // TODO(roth, jboeuf): Convert security connector handshaking to use new // handshake API, and then move the code from on_secure_handshake_done() // into this function. @@ -142,7 +139,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, connection_state->server_state->sc, connection_state->acceptor, args->endpoint, args->read_buffer, connection_state->deadline, on_secure_handshake_done, connection_state); - gpr_free(args); + grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); + connection_state->handshake_mgr = NULL; } static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp, diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 89130e99511..905db118be1 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -88,6 +88,8 @@ struct grpc_handshake_manager { // The final callback and user_data to invoke after the last handshaker. grpc_closure on_handshake_done; void* user_data; + // Handshaker args. + grpc_handshaker_args args; }; grpc_handshake_manager* grpc_handshake_manager_create() { @@ -205,22 +207,22 @@ void grpc_handshake_manager_do_handshake( grpc_iomgr_cb_func on_handshake_done, void* user_data) { // Construct handshaker args. These will be passed through all // handshakers and eventually be freed by the on_handshake_done callback. - grpc_handshaker_args* args = gpr_malloc(sizeof(*args)); - args->endpoint = endpoint; - args->args = grpc_channel_args_copy(channel_args); - args->read_buffer = gpr_malloc(sizeof(*args->read_buffer)); - grpc_slice_buffer_init(args->read_buffer); + mgr->args.endpoint = endpoint; + mgr->args.args = grpc_channel_args_copy(channel_args); + mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer)); + grpc_slice_buffer_init(mgr->args.read_buffer); // Initialize state needed for calling handshakers. gpr_mu_lock(&mgr->mu); GPR_ASSERT(mgr->index == 0); mgr->acceptor = acceptor; - grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, args); - grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, args); + grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, + &mgr->args); + grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args); // While chaining between handshakers, we use args->user_data to // store a pointer to the handshake manager. This will be // changed to point to the caller-supplied user_data before calling // the on_handshake_done callback. - args->user_data = mgr; + mgr->args.user_data = mgr; mgr->user_data = user_data; // Start deadline timer, which owns a ref. gpr_ref(&mgr->refs); @@ -229,6 +231,6 @@ void grpc_handshake_manager_do_handshake( on_timeout, mgr, gpr_now(GPR_CLOCK_MONOTONIC)); // Start first handshaker, which also owns a ref. gpr_ref(&mgr->refs); - call_next_handshaker_locked(exec_ctx, mgr, args, GRPC_ERROR_NONE); + call_next_handshaker_locked(exec_ctx, mgr, &mgr->args, GRPC_ERROR_NONE); gpr_mu_unlock(&mgr->mu); } diff --git a/src/core/lib/channel/handshaker.h b/src/core/lib/channel/handshaker.h index 8cad81c444e..f0614c354b9 100644 --- a/src/core/lib/channel/handshaker.h +++ b/src/core/lib/channel/handshaker.h @@ -55,7 +55,14 @@ typedef struct grpc_handshaker grpc_handshaker; /// Arguments passed through handshakers and to the on_handshake_done callback. -/// All data members are owned by the struct. +/// +/// For handshakers, all members are input/output parameters; for +/// example, a handshaker may read from \a endpoint and then later +/// replace it with a wrapped endpoint. Similarly, a handshaker may +/// modify \a args. +/// +/// For the on_handshake_done callback, all members are input arguments, +/// which the callback takes ownership of. typedef struct { grpc_endpoint* endpoint; grpc_channel_args* args; @@ -117,11 +124,17 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr); /// Invokes handshakers in the order they were added. +/// Takes ownership of \a endpoint, and then passes that ownership to +/// the \a on_handshake_done callback. /// Does NOT take ownership of \a args. Instead, makes a copy before /// invoking the first handshaker. /// \a acceptor will be NULL for client-side handshakers. -/// When done, invokes \a on_handshake_done with an argument of a -/// grpc_handshaker_args object, which the callback takes ownership of. +/// +/// When done, invokes \a on_handshake_done with a grpc_handshaker_args +/// object as its argument. If the callback is invoked with error != +/// GRPC_ERROR_NONE, then handshaking failed and the resulting endpoint +/// will have already been shut down (although the caller will still be +/// responsible for destroying it). void grpc_handshake_manager_do_handshake( grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr, grpc_endpoint* endpoint, const grpc_channel_args* channel_args, From 30a1beb0065300311a989c62ad7c21f9ab40358a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 17 Nov 2016 08:14:29 -0800 Subject: [PATCH 2/2] Fix edge cases in HTTP CONNECT handshaker. --- src/core/ext/client_channel/http_connect_handshaker.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 407b8ce0232..971bbe8944c 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -56,6 +56,8 @@ typedef struct http_connect_handshaker { gpr_mu mu; // State saved while performing the handshake. + // args will be NULL when either there is no handshake in progress or + // when the handshaker is shutting down. grpc_handshaker_args* args; grpc_closure* on_handshake_done; @@ -84,7 +86,7 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; - if (error != GRPC_ERROR_NONE) { + if (error != GRPC_ERROR_NONE || handshaker->args == NULL) { // If the write failed, invoke the callback immediately with the error. gpr_mu_lock(&handshaker->mu); grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, @@ -96,7 +98,6 @@ static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg, // Otherwise, read the response. // The read callback inherits our ref to the handshaker. gpr_mu_lock(&handshaker->mu); - GPR_ASSERT(handshaker->args != NULL); grpc_endpoint_read(exec_ctx, handshaker->args->endpoint, handshaker->args->read_buffer, &handshaker->response_read_closure); @@ -109,8 +110,7 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { http_connect_handshaker* handshaker = arg; gpr_mu_lock(&handshaker->mu); - GPR_ASSERT(handshaker->args != NULL); - if (error != GRPC_ERROR_NONE) { + if (error != GRPC_ERROR_NONE || handshaker->args == NULL) { GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback. goto done; }