Plumb channel args through handshakers.

pull/7395/head
Mark D. Roth 9 years ago
parent ef31ba1e03
commit 45015dc8da
  1. 12
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  2. 21
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  3. 10
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
  4. 31
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
  5. 2
      src/core/lib/channel/channel_args.h
  6. 35
      src/core/lib/channel/handshaker.c
  7. 67
      src/core/lib/channel/handshaker.h

@ -88,13 +88,13 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
connector *c = arg;
grpc_channel_args* args, void *user_data) {
connector *c = user_data;
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, endpoint, 1);
grpc_create_chttp2_transport(exec_ctx, args, endpoint, 1);
GPR_ASSERT(c->result->transport);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0);
c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
c->result->channel_args = args;
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
@ -102,7 +102,6 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
if (tcp != NULL) {
if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
@ -116,12 +115,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
&c->initial_string_sent);
} else {
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp,
c->args.channel_args,
c->args.deadline, on_handshake_done,
c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}

@ -72,6 +72,9 @@ typedef struct {
grpc_closure connected_closure;
grpc_handshake_manager *handshake_mgr;
// TODO(roth): Remove once we eliminate on_secure_handshake_done().
grpc_channel_args* tmp_args;
} connector;
static void connector_ref(grpc_connector *con) {
@ -83,6 +86,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
grpc_channel_args_destroy(c->tmp_args);
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
@ -93,7 +97,6 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *secure_endpoint,
grpc_auth_context *auth_context) {
connector *c = arg;
grpc_closure *notify;
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
@ -113,19 +116,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
0);
auth_context_arg = grpc_auth_context_to_arg(auth_context);
c->result->channel_args = grpc_channel_args_copy_and_add(
c->args.channel_args, &auth_context_arg, 1);
c->tmp_args, &auth_context_arg, 1);
}
notify = c->notify;
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
connector *c = arg;
grpc_channel_args* args, void *user_data) {
connector *c = user_data;
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
c->tmp_args = args;
grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
endpoint, c->args.deadline,
on_secure_handshake_done, c);
@ -135,13 +139,13 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
connector *c = arg;
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr,
c->connecting_endpoint, c->args.deadline,
c->connecting_endpoint,
c->args.channel_args, c->args.deadline,
on_handshake_done, c);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
if (tcp != NULL) {
gpr_mu_lock(&c->mu);
@ -158,12 +162,13 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
&c->initial_string_sent);
} else {
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp,
c->args.channel_args,
c->args.deadline, on_handshake_done,
c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}

@ -37,7 +37,9 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -53,8 +55,8 @@ typedef struct server_connect_state {
} server_connect_state;
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
server_connect_state *state = arg;
grpc_channel_args* args, void *user_data) {
server_connect_state *state = user_data;
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@ -63,12 +65,13 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
* case.
*/
grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->server), endpoint, 0);
exec_ctx, args, endpoint, 0);
grpc_server_setup_transport(exec_ctx, state->server, transport,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
// Clean up.
grpc_channel_args_destroy(args);
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
gpr_free(state);
}
@ -86,6 +89,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp,
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp,
grpc_server_get_channel_args(server),
deadline, on_handshake_done, state);
}

@ -70,9 +70,11 @@ typedef struct server_secure_connect {
server_secure_state *state;
grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor;
gpr_timespec deadline; // FIXME: remove when we eliminate
// grpc_server_security_connector_do_handshake()
grpc_handshake_manager *handshake_mgr;
// TODO(roth): Remove the following two fields when we eliminate
// grpc_server_security_connector_do_handshake().
gpr_timespec deadline;
grpc_channel_args* args;
} server_secure_connect;
static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); }
@ -102,13 +104,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->state->server),
secure_endpoint, 0);
grpc_channel_args *args_copy;
grpc_arg args_to_add[2];
args_to_add[0] = grpc_server_credentials_to_arg(state->state->creds);
args_to_add[1] = grpc_auth_context_to_arg(auth_context);
args_copy = grpc_channel_args_copy_and_add(
grpc_server_get_channel_args(state->state->server), args_to_add,
GPR_ARRAY_SIZE(args_to_add));
grpc_channel_args *args_copy = grpc_channel_args_copy_and_add(
state->args, args_to_add, GPR_ARRAY_SIZE(args_to_add));
grpc_server_setup_transport(exec_ctx, state->state->server, transport,
state->accepting_pollset, args_copy);
grpc_channel_args_destroy(args_copy);
@ -123,18 +123,20 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
} else {
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
grpc_channel_args_destroy(state->args);
state_unref(state->state);
gpr_free(state);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
server_secure_connect *state = arg;
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
state->handshake_mgr = NULL;
grpc_channel_args* args, void *user_data) {
server_secure_connect *state = user_data;
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
state->handshake_mgr = NULL;
state->args = args;
grpc_server_security_connector_do_handshake(
exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
on_secure_handshake_done, state);
@ -148,14 +150,15 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
state_ref(state->state);
state->accepting_pollset = accepting_pollset;
state->acceptor = acceptor;
state->handshake_mgr = grpc_handshake_manager_create();
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(120, GPR_TIMESPAN));
state->handshake_mgr = grpc_handshake_manager_create();
grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp,
state->deadline, on_handshake_done,
state);
grpc_handshake_manager_do_handshake(
exec_ctx, state->handshake_mgr, tcp,
grpc_server_get_channel_args(state->state->server), state->deadline,
on_handshake_done, state);
}
/* Server callback: start listening on our ports */

@ -37,6 +37,8 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
// Channel args are intentionally immutable, to avoid the need for locking.
/** Copy the arguments in \a src into a new instance */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);

@ -36,6 +36,7 @@
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
//
@ -60,10 +61,11 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
gpr_timespec deadline,
grpc_handshaker_done_cb cb, void* arg) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, deadline, cb,
arg);
grpc_handshaker_done_cb cb, void* user_data) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, endpoint, args,
deadline, cb, user_data);
}
//
@ -76,9 +78,9 @@ struct grpc_handshaker_state {
size_t index;
// The deadline for all handshakers.
gpr_timespec deadline;
// The final callback and arg to invoke after the last handshaker.
// The final callback and user_data to invoke after the last handshaker.
grpc_handshaker_done_cb final_cb;
void* final_arg;
void* final_user_data;
};
struct grpc_handshake_manager {
@ -126,20 +128,23 @@ void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
// A function used as the handshaker-done callback when chaining
// handshakers together.
static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint, void* arg) {
grpc_handshake_manager* mgr = arg;
grpc_endpoint* endpoint,
grpc_channel_args* args,
void* user_data) {
grpc_handshake_manager* mgr = user_data;
GPR_ASSERT(mgr->state != NULL);
GPR_ASSERT(mgr->state->index < mgr->count);
grpc_handshaker_done_cb cb = call_next_handshaker;
// If this is the last handshaker, use the caller-supplied callback
// and arg instead of chaining back to this function again.
// and user_data instead of chaining back to this function again.
if (mgr->state->index == mgr->count - 1) {
cb = mgr->state->final_cb;
arg = mgr->state->final_arg;
user_data = mgr->state->final_user_data;
}
// Invoke handshaker.
grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->state->index],
endpoint, mgr->state->deadline, cb, arg);
endpoint, args, mgr->state->deadline, cb,
user_data);
++mgr->state->index;
// If this is the last handshaker, clean up state.
if (mgr->state->index == mgr->count) {
@ -151,20 +156,22 @@ static void call_next_handshaker(grpc_exec_ctx* exec_ctx,
void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr,
grpc_endpoint* endpoint,
const grpc_channel_args* args,
gpr_timespec deadline,
grpc_handshaker_done_cb cb,
void* arg) {
void* user_data) {
grpc_channel_args* args_copy = grpc_channel_args_copy(args);
if (mgr->count == 0) {
// No handshakers registered, so we just immediately call the done
// callback with the passed-in endpoint.
cb(exec_ctx, endpoint, arg);
cb(exec_ctx, endpoint, args_copy, user_data);
} else {
GPR_ASSERT(mgr->state == NULL);
mgr->state = gpr_malloc(sizeof(struct grpc_handshaker_state));
memset(mgr->state, 0, sizeof(*mgr->state));
mgr->state->deadline = deadline;
mgr->state->final_cb = cb;
mgr->state->final_arg = arg;
call_next_handshaker(exec_ctx, endpoint, mgr);
mgr->state->final_user_data = user_data;
call_next_handshaker(exec_ctx, endpoint, args_copy, mgr);
}
}

@ -34,44 +34,62 @@
#ifndef GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
#define GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/impl/codegen/time.h>
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
// FIXME: high-level documentation
/// Handshakers are used to perform initial handshakes on a connection
/// before the client sends the initial request. Some examples of what
/// a handshaker can be used for includes support for HTTP CONNECT on
/// the client side and various types of security initialization.
///
/// In general, handshakers should be used via a handshake manager.
//
// grpc_handshaker -- API for initial handshaking for a new connection
//
// FIXME: document
///
/// grpc_handshaker
///
typedef struct grpc_handshaker grpc_handshaker;
/// Callback type invoked when a handshaker is done.
/// Takes ownership of \a args.
typedef void (*grpc_handshaker_done_cb)(grpc_exec_ctx* exec_ctx,
grpc_endpoint* endpoint, void* arg);
grpc_endpoint* endpoint,
grpc_channel_args* args,
void* user_data);
struct grpc_handshaker_vtable {
/// Destroys the handshaker.
void (*destroy)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
/// Shuts down the handshaker (e.g., to clean up when the operation is
/// aborted in the middle).
void (*shutdown)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker);
/// Performs handshaking. When finished, calls \a cb with \a user_data.
/// Takes ownership of \a args.
void (*do_handshake)(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker,
grpc_endpoint* endpoint, gpr_timespec deadline,
grpc_handshaker_done_cb cb, void* arg);
grpc_endpoint* endpoint, grpc_channel_args* args,
gpr_timespec deadline, grpc_handshaker_done_cb cb,
void* user_data);
};
/// Base struct. To subclass, make this the first member of the
/// implementation struct.
struct grpc_handshaker {
const struct grpc_handshaker_vtable* vtable;
};
// Called by concrete implementations to initialize the base struct.
/// Called by concrete implementations to initialize the base struct.
void grpc_handshaker_init(const struct grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker);
// Convenient wrappers for invoking methods via the vtable.
/// Convenient wrappers for invoking methods via the vtable.
/// These probably do not need to be called from anywhere but
/// grpc_handshake_manager.
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
@ -79,31 +97,46 @@ void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_endpoint* endpoint,
grpc_channel_args* args,
gpr_timespec deadline,
grpc_handshaker_done_cb cb, void* arg);
grpc_handshaker_done_cb cb, void* user_data);
//
// grpc_handshake_manager -- manages a set of handshakers
//
///
/// grpc_handshake_manager
///
typedef struct grpc_handshake_manager grpc_handshake_manager;
/// Creates a new handshake manager. Caller takes ownership.
grpc_handshake_manager* grpc_handshake_manager_create();
// Handshakers will be invoked in the order added.
/// Adds a handshaker to the handshake manager.
/// Takes ownership of \a mgr.
void grpc_handshake_manager_add(grpc_handshaker* handshaker,
grpc_handshake_manager* mgr);
/// Destroys the handshake manager.
void grpc_handshake_manager_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
/// Shuts down the handshake manager (e.g., to clean up when the operation is
/// aborted in the middle).
/// The caller must still call grpc_handshake_manager_destroy() after
/// calling this function.
void grpc_handshake_manager_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr);
/// Invokes handshakers in the order they were added.
/// Does NOT take ownership of \a args. Instead, makes a copy before
/// invoking the first handshaker.
/// If successful, invokes \a cb with \a user_data after all handshakers
/// have completed.
void grpc_handshake_manager_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshake_manager* mgr,
grpc_endpoint* endpoint,
const grpc_channel_args* args,
gpr_timespec deadline,
grpc_handshaker_done_cb cb, void* arg);
grpc_handshaker_done_cb cb,
void* user_data);
#endif /* GRPC_CORE_LIB_CHANNEL_HANDSHAKER_H */

Loading…
Cancel
Save