Make handshaker responsible for destroying endpoint on shutdown or failure.

reviewable/pr8782/r1
Mark D. Roth 8 years ago
parent 447569490d
commit 30f698f1bc
  1. 81
      src/core/ext/client_channel/http_connect_handshaker.c
  2. 6
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  3. 2
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  4. 4
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
  5. 3
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
  6. 17
      src/core/lib/channel/handshaker.c
  7. 21
      src/core/lib/channel/handshaker.h

@ -41,6 +41,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/http/format_request.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/support/env.h"
@ -55,9 +56,12 @@ typedef struct http_connect_handshaker {
gpr_refcount refcount;
gpr_mu mu;
bool shutdown;
// Endpoint and read buffer to destroy after a shutdown.
grpc_endpoint* endpoint_to_destroy;
grpc_slice_buffer* read_buffer_to_destroy;
// State saved while performing the handshake.
// args will be NULL when either there is no handshake in progress or
// when the handshaker is shutting down.
grpc_handshaker_args* args;
grpc_closure* on_handshake_done;
@ -70,9 +74,17 @@ typedef struct http_connect_handshaker {
} http_connect_handshaker;
// Unref and clean up handshaker.
static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx,
http_connect_handshaker* handshaker) {
if (gpr_unref(&handshaker->refcount)) {
gpr_mu_destroy(&handshaker->mu);
if (handshaker->endpoint_to_destroy != NULL) {
grpc_endpoint_destroy(exec_ctx, handshaker->endpoint_to_destroy);
}
if (handshaker->read_buffer_to_destroy != NULL) {
grpc_slice_buffer_destroy(handshaker->read_buffer_to_destroy);
gpr_free(handshaker->read_buffer_to_destroy);
}
gpr_free(handshaker->proxy_server);
gpr_free(handshaker->server_name);
grpc_slice_buffer_destroy(&handshaker->write_buffer);
@ -82,18 +94,42 @@ static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) {
}
}
// Set args fields to NULL, saving the endpoint and read buffer for
// later destruction.
static void cleanup_args_for_failure_locked(
http_connect_handshaker* handshaker) {
handshaker->endpoint_to_destroy = handshaker->args->endpoint;
handshaker->args->endpoint = NULL;
handshaker->read_buffer_to_destroy = handshaker->args->read_buffer;
handshaker->args->read_buffer = NULL;
grpc_channel_args_destroy(handshaker->args->args);
handshaker->args->args = NULL;
}
// Callback invoked when finished writing HTTP CONNECT request.
static void on_write_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
gpr_mu_lock(&handshaker->mu);
if (error != GRPC_ERROR_NONE || handshaker->args == NULL) {
// If the write failed, invoke the callback immediately with the error.
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done,
GRPC_ERROR_REF(error), NULL);
handshaker->args = NULL;
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
if (error == GRPC_ERROR_NONE) {
// If we were shut down after the write succeeded but before this
// callback was invoked, we need to generate our own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
} else {
GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
}
if (!handshaker->shutdown) {
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(handshaker);
}
// Invoke callback.
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
gpr_mu_unlock(&handshaker->mu);
http_connect_handshaker_unref(handshaker);
http_connect_handshaker_unref(exec_ctx, handshaker);
} else {
// Otherwise, read the response.
// The read callback inherits our ref to the handshaker.
@ -109,8 +145,21 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
http_connect_handshaker* handshaker = arg;
gpr_mu_lock(&handshaker->mu);
if (error != GRPC_ERROR_NONE || handshaker->args == NULL) {
GRPC_ERROR_REF(error); // Take ref to pass to the handshake-done callback.
if (error != GRPC_ERROR_NONE || handshaker->shutdown) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
if (error == GRPC_ERROR_NONE) {
// If we were shut down after the write succeeded but before this
// callback was invoked, we need to generate our own error.
error = GRPC_ERROR_CREATE("Handshaker shutdown");
} else {
GRPC_ERROR_REF(error); // Take ref for the handshake-done callback.
}
if (!handshaker->shutdown) {
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
cleanup_args_for_failure_locked(handshaker);
}
goto done;
}
// Add buffer to parser.
@ -172,10 +221,9 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg,
}
done:
// Invoke handshake-done callback.
handshaker->args = NULL;
grpc_exec_ctx_sched(exec_ctx, handshaker->on_handshake_done, error, NULL);
gpr_mu_unlock(&handshaker->mu);
http_connect_handshaker_unref(handshaker);
http_connect_handshaker_unref(exec_ctx, handshaker);
}
//
@ -185,16 +233,17 @@ done:
static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
http_connect_handshaker_unref(handshaker);
http_connect_handshaker_unref(exec_ctx, handshaker);
}
static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker_in) {
http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in;
gpr_mu_lock(&handshaker->mu);
if (handshaker->args != NULL) {
if (!handshaker->shutdown) {
handshaker->shutdown = true;
grpc_endpoint_shutdown(exec_ctx, handshaker->args->endpoint);
handshaker->args = NULL;
cleanup_args_for_failure_locked(handshaker);
}
gpr_mu_unlock(&handshaker->mu);
}

@ -96,11 +96,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_handshaker_args *args = arg;
connector *c = args->user_data;
if (error != GRPC_ERROR_NONE) {
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(args->args);
gpr_free(args->read_buffer);
} else {
if (error == GRPC_ERROR_NONE) {
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
GPR_ASSERT(c->result->transport);

@ -135,8 +135,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
connector *c = args->user_data;
c->tmp_args = args->args;
if (error != GRPC_ERROR_NONE) {
grpc_endpoint_destroy(exec_ctx, args->endpoint);
gpr_free(args->read_buffer);
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);

@ -62,8 +62,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
gpr_free(args->read_buffer);
} else {
// Beware that the call to grpc_create_chttp2_transport() has to happen
// before grpc_tcp_server_destroy(). This is fine here, but similar code
@ -76,9 +74,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, args->read_buffer);
grpc_channel_args_destroy(args->args);
}
// Clean up.
grpc_channel_args_destroy(args->args);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
gpr_free(state);
}

@ -123,9 +123,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
const char *error_str = grpc_error_string(error);
gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str);
grpc_error_free_string(error_str);
grpc_endpoint_destroy(exec_ctx, args->endpoint);
grpc_channel_args_destroy(args->args);
gpr_free(args->read_buffer);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp);
gpr_free(connection_state);

@ -141,25 +141,20 @@ void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr) {
gpr_mu_lock(&mgr->mu);
for (size_t i = 0; i < mgr->count; ++i) {
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[i]);
if (mgr->index > 0) {
grpc_handshaker_shutdown(exec_ctx, mgr->handshakers[mgr->index - 1]);
}
gpr_mu_unlock(&mgr->mu);
}
static void call_next_handshaker(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error);
// Helper function to call either the next handshaker or the
// on_handshake_done callback.
static void call_next_handshaker_locked(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr,
grpc_error* error) {
GPR_ASSERT(mgr->index <= mgr->count);
// If we got an error, skip all remaining handshakers and invoke the
// caller-supplied callback immediately.
// Otherwise, if this is the last handshaker, then call the on_handshake_done
// callback instead of chaining back to this function again.
// If we got an error or we've finished the last handshaker, invoke
// the on_handshake_done callback. Otherwise, call the next handshaker.
if (error != GRPC_ERROR_NONE || mgr->index == mgr->count) {
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
@ -202,6 +197,8 @@ void grpc_handshake_manager_do_handshake(
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,
gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor,
grpc_iomgr_cb_func on_handshake_done, void* user_data) {
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
// Construct handshaker args. These will be passed through all
// handshakers and eventually be freed by the on_handshake_done callback.
mgr->args.endpoint = endpoint;
@ -210,8 +207,6 @@ void grpc_handshake_manager_do_handshake(
mgr->args.read_buffer = gpr_malloc(sizeof(*mgr->args.read_buffer));
grpc_slice_buffer_init(mgr->args.read_buffer);
// Initialize state needed for calling handshakers.
gpr_mu_lock(&mgr->mu);
GPR_ASSERT(mgr->index == 0);
mgr->acceptor = acceptor;
grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr);
grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args);

@ -57,17 +57,24 @@ typedef struct grpc_handshaker grpc_handshaker;
/// Arguments passed through handshakers and to the on_handshake_done callback.
///
/// For handshakers, all members are input/output parameters; for
/// example, a handshaker may read from \a endpoint and then later
/// replace it with a wrapped endpoint. Similarly, a handshaker may
/// modify \a args.
/// example, a handshaker may read from or write to \a endpoint and
/// then later replace it with a wrapped endpoint. Similarly, a
/// handshaker may modify \a args.
///
/// A handshaker takes ownership of the members while a handshake is in
/// progress. Upon failure or shutdown of an in-progress handshaker,
/// the handshaker is responsible for destroying the members and setting
/// them to NULL before invoking the on_handshake_done callback.
///
/// For the on_handshake_done callback, all members are input arguments,
/// which the callback takes ownership of.
typedef struct {
grpc_endpoint* endpoint;
grpc_channel_args* args;
void* user_data;
grpc_slice_buffer* read_buffer;
// User data passed through the handshake manager. Not used by
// individual handshakers.
void* user_data;
} grpc_handshaker_args;
typedef struct {
@ -132,9 +139,9 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
///
/// When done, invokes \a on_handshake_done with a grpc_handshaker_args
/// object as its argument. If the callback is invoked with error !=
/// GRPC_ERROR_NONE, then handshaking failed and the resulting endpoint
/// will have already been shut down (although the caller will still be
/// responsible for destroying it).
/// GRPC_ERROR_NONE, then handshaking failed and the handshaker has done
/// the necessary clean-up. Otherwise, the callback takes ownership of
/// the arguments.
void grpc_handshake_manager_do_handshake(
grpc_exec_ctx* exec_ctx, grpc_handshake_manager* mgr,
grpc_endpoint* endpoint, const grpc_channel_args* channel_args,

Loading…
Cancel
Save