Use handshake_manager API in client and server.

pull/7395/head
Mark D. Roth 9 years ago
parent 7c2a58c107
commit dba5d2708c
  1. 34
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  2. 28
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  3. 42
      src/core/ext/transport/chttp2/server/insecure/server_chttp2.c
  4. 32
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
  5. 2
      src/core/lib/channel/handshaker.h

@ -45,6 +45,7 @@
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/surface/api_trace.h"
@ -63,6 +64,8 @@ typedef struct {
grpc_endpoint *tcp;
grpc_closure connected;
grpc_handshake_manager *handshake_mgr;
} connector;
static void connector_ref(grpc_connector *con) {
@ -74,6 +77,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
}
@ -83,6 +87,19 @@ static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
connector_unref(exec_ctx, arg);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
connector *c = arg;
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, endpoint, 1);
GPR_ASSERT(c->result->transport);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL, 0);
c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
grpc_closure *notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector *c = arg;
grpc_closure *notify;
@ -97,19 +114,17 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
connector_ref(arg);
grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp,
c->args.deadline, on_handshake_done,
c);
}
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1);
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
0);
GPR_ASSERT(c->result->transport);
c->result->channel_args = grpc_channel_args_copy(c->args.channel_args);
} else {
memset(c->result, 0, sizeof(*c->result));
notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
notify = c->notify;
c->notify = NULL;
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(error), NULL);
}
static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
@ -171,6 +186,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
gpr_ref_init(&c->refs, 1);
c->handshake_mgr = grpc_handshake_manager_create();
args->args = final_args;
s = grpc_subchannel_create(exec_ctx, &c->base, args);
grpc_connector_unref(exec_ctx, &c->base);

@ -44,6 +44,7 @@
#include "src/core/ext/client_config/resolver_registry.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
@ -69,6 +70,8 @@ typedef struct {
grpc_endpoint *newly_connecting_endpoint;
grpc_closure connected_closure;
grpc_handshake_manager *handshake_mgr;
} connector;
static void connector_ref(grpc_connector *con) {
@ -80,6 +83,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
connector *c = (connector *)con;
if (gpr_unref(&c->refs)) {
/* c->initial_string_buffer does not need to be destroyed */
grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr);
gpr_free(c);
}
}
@ -116,12 +120,23 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
connector *c = arg;
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
grpc_channel_security_connector_do_handshake(exec_ctx, c->security_connector,
endpoint, c->args.deadline,
on_secure_handshake_done, c);
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
connector *c = arg;
grpc_channel_security_connector_do_handshake(
exec_ctx, c->security_connector, c->connecting_endpoint, c->args.deadline,
on_secure_handshake_done, c);
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr,
c->connecting_endpoint, c->args.deadline,
on_handshake_done, c);
}
static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
@ -142,9 +157,9 @@ static void connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
&c->initial_string_sent);
} else {
grpc_channel_security_connector_do_handshake(
exec_ctx, c->security_connector, tcp, c->args.deadline,
on_secure_handshake_done, c);
grpc_handshake_manager_do_handshake(exec_ctx, c->handshake_mgr, tcp,
c->args.deadline, on_handshake_done,
c);
}
} else {
memset(c->result, 0, sizeof(*c->result));
@ -227,6 +242,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
memset(c, 0, sizeof(*c));
c->base.vtable = &connector_vtable;
c->security_connector = f->security_connector;
c->handshake_mgr = grpc_handshake_manager_create();
gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1);
args->args = final_args;

@ -38,15 +38,23 @@
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h"
static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
grpc_endpoint *tcp, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
typedef struct server_connect_state {
grpc_server *server;
grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor;
grpc_handshake_manager *handshake_mgr;
} server_connect_state;
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
server_connect_state *state = arg;
/*
* Beware that the call to grpc_create_chttp2_transport() has to happen before
* grpc_tcp_server_destroy(). This is fine here, but similar code
@ -55,17 +63,37 @@ static void new_transport(grpc_exec_ctx *exec_ctx, void *server,
* case.
*/
grpc_transport *transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(server), tcp, 0);
grpc_server_setup_transport(exec_ctx, server, transport, accepting_pollset,
grpc_server_get_channel_args(server));
exec_ctx, grpc_server_get_channel_args(state->server), endpoint, 0);
grpc_server_setup_transport(exec_ctx, state->server, transport,
state->accepting_pollset,
grpc_server_get_channel_args(state->server));
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
// Clean up.
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
gpr_free(state);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *server, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
server_connect_state *state = gpr_malloc(sizeof(server_connect_state));
state->server = server;
state->accepting_pollset = accepting_pollset;
state->acceptor = acceptor;
state->handshake_mgr = grpc_handshake_manager_create();
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
const gpr_timespec deadline = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(120, GPR_TIMESPAN));
grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp,
deadline, on_handshake_done, state);
}
/* Server callback: start listening on our ports */
static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_pollset **pollsets, size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, new_transport,
grpc_tcp_server_start(exec_ctx, tcp, pollsets, pollset_count, on_accept,
server);
}

@ -42,6 +42,7 @@
#include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -68,6 +69,10 @@ typedef struct server_secure_state {
typedef struct server_secure_connect {
server_secure_state *state;
grpc_pollset *accepting_pollset;
grpc_tcp_server_acceptor *acceptor;
gpr_timespec deadline; // FIXME: remove when we eliminate
// grpc_server_security_connector_do_handshake()
grpc_handshake_manager *handshake_mgr;
} server_secure_connect;
static void state_ref(server_secure_state *state) { gpr_ref(&state->refcount); }
@ -122,6 +127,19 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
gpr_free(state);
}
static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint,
void *arg) {
server_secure_connect *state = arg;
grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr);
state->handshake_mgr = NULL;
// TODO(roth, jboeuf): Convert security connector handshaking to use new
// handshake API, and then move the code from on_secure_handshake_done()
// into this function.
grpc_server_security_connector_do_handshake(
exec_ctx, state->state->sc, state->acceptor, endpoint, state->deadline,
on_secure_handshake_done, state);
}
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) {
@ -129,11 +147,15 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *statep, grpc_endpoint *tcp,
state->state = statep;
state_ref(state->state);
state->accepting_pollset = accepting_pollset;
grpc_server_security_connector_do_handshake(
exec_ctx, state->state->sc, acceptor, tcp,
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(120, GPR_TIMESPAN)),
on_secure_handshake_done, state);
state->acceptor = acceptor;
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
state->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(120, GPR_TIMESPAN));
state->handshake_mgr = grpc_handshake_manager_create();
grpc_handshake_manager_do_handshake(exec_ctx, state->handshake_mgr, tcp,
state->deadline, on_handshake_done,
state);
}
/* Server callback: start listening on our ports */

@ -40,6 +40,8 @@
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
// FIXME: high-level documentation
//
// grpc_handshaker -- API for initial handshaking for a new connection
//

Loading…
Cancel
Save