Merge pull request #8979 from markdroth/handshaker_cleanup

Address comments from #8913.  Also make changes needed for import.
pull/9019/head
Mark D. Roth 8 years ago committed by GitHub
commit be37dff6d3
  1. 18
      src/core/ext/transport/chttp2/client/chttp2_connector.c
  2. 10
      src/core/ext/transport/chttp2/client/chttp2_connector.h
  3. 2
      src/core/ext/transport/chttp2/client/insecure/channel_create.c
  4. 11
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
  5. 10
      src/core/ext/transport/chttp2/server/chttp2_server.c
  6. 4
      src/core/ext/transport/chttp2/server/chttp2_server.h
  7. 6
      src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c
  8. 18
      src/core/lib/channel/handshaker.c
  9. 10
      src/core/lib/channel/handshaker.h
  10. 17
      src/core/lib/http/httpcli_security_connector.c
  11. 3
      src/core/lib/iomgr/tcp_server.h
  12. 16
      src/core/lib/iomgr/tcp_server_posix.c
  13. 9
      src/core/lib/iomgr/tcp_server_uv.c
  14. 8
      src/core/lib/iomgr/tcp_server_windows.c
  15. 59
      src/core/lib/security/transport/security_connector.c
  16. 18
      src/core/lib/security/transport/security_connector.h
  17. 20
      src/core/lib/security/transport/security_handshaker.c
  18. 13
      src/core/lib/security/transport/security_handshaker.h
  19. 1
      test/core/client_channel/set_initial_connect_string_test.c
  20. 1
      test/core/end2end/bad_server_response_test.c
  21. 1
      test/core/end2end/fixtures/http_proxy.c
  22. 1
      test/core/iomgr/tcp_server_posix_test.c
  23. 3
      test/core/security/ssl_server_fuzzer.c
  24. 2
      test/core/surface/concurrent_connectivity_test.c
  25. 1
      test/core/util/reconnect_server.c

@ -59,8 +59,8 @@ typedef struct {
bool connecting; bool connecting;
char *server_name; char *server_name;
grpc_chttp2_create_handshakers_func create_handshakers; grpc_chttp2_add_handshakers_func add_handshakers;
void *create_handshakers_user_data; void *add_handshakers_user_data;
grpc_closure *notify; grpc_closure *notify;
grpc_connect_in_args args; grpc_connect_in_args args;
@ -160,9 +160,9 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx,
grpc_http_connect_handshaker_create(proxy_name, c->server_name)); grpc_http_connect_handshaker_create(proxy_name, c->server_name));
gpr_free(proxy_name); gpr_free(proxy_name);
} }
if (c->create_handshakers != NULL) { if (c->add_handshakers != NULL) {
c->create_handshakers(exec_ctx, c->create_handshakers_user_data, c->add_handshakers(exec_ctx, c->add_handshakers_user_data,
c->handshake_mgr); c->handshake_mgr);
} }
grpc_handshake_manager_do_handshake( grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args,
@ -255,15 +255,15 @@ static const grpc_connector_vtable chttp2_connector_vtable = {
grpc_connector *grpc_chttp2_connector_create( grpc_connector *grpc_chttp2_connector_create(
grpc_exec_ctx *exec_ctx, const char *server_name, grpc_exec_ctx *exec_ctx, const char *server_name,
grpc_chttp2_create_handshakers_func create_handshakers, grpc_chttp2_add_handshakers_func add_handshakers,
void *create_handshakers_user_data) { void *add_handshakers_user_data) {
chttp2_connector *c = gpr_malloc(sizeof(*c)); chttp2_connector *c = gpr_malloc(sizeof(*c));
memset(c, 0, sizeof(*c)); memset(c, 0, sizeof(*c));
c->base.vtable = &chttp2_connector_vtable; c->base.vtable = &chttp2_connector_vtable;
gpr_mu_init(&c->mu); gpr_mu_init(&c->mu);
gpr_ref_init(&c->refs, 1); gpr_ref_init(&c->refs, 1);
c->server_name = gpr_strdup(server_name); c->server_name = gpr_strdup(server_name);
c->create_handshakers = create_handshakers; c->add_handshakers = add_handshakers;
c->create_handshakers_user_data = create_handshakers_user_data; c->add_handshakers_user_data = add_handshakers_user_data;
return &c->base; return &c->base;
} }

@ -38,15 +38,15 @@
#include "src/core/lib/channel/handshaker.h" #include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/exec_ctx.h"
typedef void (*grpc_chttp2_create_handshakers_func)( typedef void (*grpc_chttp2_add_handshakers_func)(
grpc_exec_ctx* exec_ctx, void* user_data, grpc_exec_ctx* exec_ctx, void* user_data,
grpc_handshake_manager* handshake_mgr); grpc_handshake_manager* handshake_mgr);
/// If \a create_handshakers is non-NULL, it will be called with /// If \a add_handshakers is non-NULL, it will be called with
/// \a create_handshakers_user_data to add handshakers. /// \a add_handshakers_user_data to add handshakers.
grpc_connector* grpc_chttp2_connector_create( grpc_connector* grpc_chttp2_connector_create(
grpc_exec_ctx* exec_ctx, const char* server_name, grpc_exec_ctx* exec_ctx, const char* server_name,
grpc_chttp2_create_handshakers_func create_handshakers, grpc_chttp2_add_handshakers_func add_handshakers,
void* create_handshakers_user_data); void* add_handshakers_user_data);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */ #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_CLIENT_CHTTP2_CONNECTOR_H */

@ -54,7 +54,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const grpc_subchannel_args *args) { const grpc_subchannel_args *args) {
grpc_connector *connector = grpc_chttp2_connector_create( grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, NULL /* create_handshakers */, exec_ctx, args->server_name, NULL /* add_handshakers */,
NULL /* user_data */); NULL /* user_data */);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector); grpc_connector_unref(exec_ctx, connector);

@ -69,11 +69,10 @@ static void client_channel_factory_unref(
} }
} }
static void create_handshakers(grpc_exec_ctx *exec_ctx, static void add_handshakers(grpc_exec_ctx *exec_ctx, void *security_connector,
void *security_connector, grpc_handshake_manager *handshake_mgr) {
grpc_handshake_manager *handshake_mgr) { grpc_channel_security_connector_add_handshakers(exec_ctx, security_connector,
grpc_channel_security_connector_create_handshakers( handshake_mgr);
exec_ctx, security_connector, handshake_mgr);
} }
static grpc_subchannel *client_channel_factory_create_subchannel( static grpc_subchannel *client_channel_factory_create_subchannel(
@ -81,7 +80,7 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
const grpc_subchannel_args *args) { const grpc_subchannel_args *args) {
client_channel_factory *f = (client_channel_factory *)cc_factory; client_channel_factory *f = (client_channel_factory *)cc_factory;
grpc_connector *connector = grpc_chttp2_connector_create( grpc_connector *connector = grpc_chttp2_connector_create(
exec_ctx, args->server_name, create_handshakers, f->security_connector); exec_ctx, args->server_name, add_handshakers, f->security_connector);
grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args); grpc_subchannel *s = grpc_subchannel_create(exec_ctx, connector, args);
grpc_connector_unref(exec_ctx, connector); grpc_connector_unref(exec_ctx, connector);
return s; return s;

@ -53,13 +53,13 @@
#include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/server.h" #include "src/core/lib/surface/server.h"
void grpc_chttp2_server_handshaker_factory_create_handshakers( void grpc_chttp2_server_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
if (handshaker_factory != NULL) { if (handshaker_factory != NULL) {
handshaker_factory->vtable->create_handshakers(exec_ctx, handshaker_factory, handshaker_factory->vtable->add_handshakers(exec_ctx, handshaker_factory,
handshake_mgr); handshake_mgr);
} }
} }
@ -172,6 +172,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
gpr_mu_unlock(&connection_state->server_state->mu); gpr_mu_unlock(&connection_state->server_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server);
gpr_free(connection_state->acceptor);
gpr_free(connection_state); gpr_free(connection_state);
} }
@ -183,6 +184,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
if (state->shutdown) { if (state->shutdown) {
gpr_mu_unlock(&state->mu); gpr_mu_unlock(&state->mu);
grpc_endpoint_destroy(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp);
gpr_free(acceptor);
return; return;
} }
grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create();
@ -195,7 +197,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
connection_state->accepting_pollset = accepting_pollset; connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor; connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr; connection_state->handshake_mgr = handshake_mgr;
grpc_chttp2_server_handshaker_factory_create_handshakers( grpc_chttp2_server_handshaker_factory_add_handshakers(
exec_ctx, state->handshaker_factory, connection_state->handshake_mgr); exec_ctx, state->handshaker_factory, connection_state->handshake_mgr);
// TODO(roth): We should really get this timeout value from channel // TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it. // args instead of hard-coding it.

@ -45,7 +45,7 @@ typedef struct grpc_chttp2_server_handshaker_factory
grpc_chttp2_server_handshaker_factory; grpc_chttp2_server_handshaker_factory;
typedef struct { typedef struct {
void (*create_handshakers)( void (*add_handshakers)(
grpc_exec_ctx *exec_ctx, grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);
@ -57,7 +57,7 @@ struct grpc_chttp2_server_handshaker_factory {
const grpc_chttp2_server_handshaker_factory_vtable *vtable; const grpc_chttp2_server_handshaker_factory_vtable *vtable;
}; };
void grpc_chttp2_server_handshaker_factory_create_handshakers( void grpc_chttp2_server_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_exec_ctx *exec_ctx,
grpc_chttp2_server_handshaker_factory *handshaker_factory, grpc_chttp2_server_handshaker_factory *handshaker_factory,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);

@ -54,12 +54,12 @@ typedef struct {
grpc_server_security_connector *security_connector; grpc_server_security_connector *security_connector;
} server_security_handshaker_factory; } server_security_handshaker_factory;
static void server_security_handshaker_factory_create_handshakers( static void server_security_handshaker_factory_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf, grpc_exec_ctx *exec_ctx, grpc_chttp2_server_handshaker_factory *hf,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
server_security_handshaker_factory *handshaker_factory = server_security_handshaker_factory *handshaker_factory =
(server_security_handshaker_factory *)hf; (server_security_handshaker_factory *)hf;
grpc_server_security_connector_create_handshakers( grpc_server_security_connector_add_handshakers(
exec_ctx, handshaker_factory->security_connector, handshake_mgr); exec_ctx, handshaker_factory->security_connector, handshake_mgr);
} }
@ -74,7 +74,7 @@ static void server_security_handshaker_factory_destroy(
static const grpc_chttp2_server_handshaker_factory_vtable static const grpc_chttp2_server_handshaker_factory_vtable
server_security_handshaker_factory_vtable = { server_security_handshaker_factory_vtable = {
server_security_handshaker_factory_create_handshakers, server_security_handshaker_factory_add_handshakers,
server_security_handshaker_factory_destroy}; server_security_handshaker_factory_destroy};
int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,

@ -49,21 +49,21 @@ void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
handshaker->vtable = vtable; handshaker->vtable = vtable;
} }
static void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx, void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) { grpc_handshaker* handshaker) {
handshaker->vtable->destroy(exec_ctx, handshaker); handshaker->vtable->destroy(exec_ctx, handshaker);
} }
static void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx, void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker) { grpc_handshaker* handshaker) {
handshaker->vtable->shutdown(exec_ctx, handshaker); handshaker->vtable->shutdown(exec_ctx, handshaker);
} }
static void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx, void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker, grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor, grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done, grpc_closure* on_handshake_done,
grpc_handshaker_args* args) { grpc_handshaker_args* args) {
handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor, handshaker->vtable->do_handshake(exec_ctx, handshaker, acceptor,
on_handshake_done, args); on_handshake_done, args);
} }

@ -108,6 +108,16 @@ struct grpc_handshaker {
void grpc_handshaker_init(const grpc_handshaker_vtable* vtable, void grpc_handshaker_init(const grpc_handshaker_vtable* vtable,
grpc_handshaker* handshaker); grpc_handshaker* handshaker);
void grpc_handshaker_destroy(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_shutdown(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker);
void grpc_handshaker_do_handshake(grpc_exec_ctx* exec_ctx,
grpc_handshaker* handshaker,
grpc_tcp_server_acceptor* acceptor,
grpc_closure* on_handshake_done,
grpc_handshaker_args* args);
/// ///
/// grpc_handshake_manager /// grpc_handshake_manager
/// ///

@ -60,9 +60,9 @@ static void httpcli_ssl_destroy(grpc_security_connector *sc) {
gpr_free(sc); gpr_free(sc);
} }
static void httpcli_ssl_create_handshakers( static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
grpc_httpcli_ssl_channel_security_connector *c = grpc_httpcli_ssl_channel_security_connector *c =
(grpc_httpcli_ssl_channel_security_connector *)sc; (grpc_httpcli_ssl_channel_security_connector *)sc;
tsi_handshaker *handshaker = NULL; tsi_handshaker *handshaker = NULL;
@ -74,8 +74,9 @@ static void httpcli_ssl_create_handshakers(
tsi_result_to_string(result)); tsi_result_to_string(result));
} }
} }
grpc_security_create_handshakers(exec_ctx, handshaker, &sc->base, grpc_handshake_manager_add(
handshake_mgr); handshake_mgr,
grpc_security_handshaker_create(exec_ctx, handshaker, &sc->base));
} }
static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx,
@ -132,7 +133,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create(
*sc = NULL; *sc = NULL;
return GRPC_SECURITY_ERROR; return GRPC_SECURITY_ERROR;
} }
c->base.create_handshakers = httpcli_ssl_create_handshakers; c->base.add_handshakers = httpcli_ssl_add_handshakers;
*sc = &c->base; *sc = &c->base;
return GRPC_SECURITY_OK; return GRPC_SECURITY_OK;
} }
@ -185,8 +186,8 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(httpcli_ssl_channel_security_connector_create( GPR_ASSERT(httpcli_ssl_channel_security_connector_create(
pem_root_certs, pem_root_certs_size, host, &sc) == pem_root_certs, pem_root_certs_size, host, &sc) ==
GRPC_SECURITY_OK); GRPC_SECURITY_OK);
grpc_channel_security_connector_create_handshakers(exec_ctx, sc, grpc_channel_security_connector_add_handshakers(exec_ctx, sc,
c->handshake_mgr); c->handshake_mgr);
grpc_handshake_manager_do_handshake( grpc_handshake_manager_do_handshake(
exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline, exec_ctx, c->handshake_mgr, tcp, NULL /* channel_args */, deadline,
NULL /* acceptor */, on_handshake_done, c /* user_data */); NULL /* acceptor */, on_handshake_done, c /* user_data */);

@ -52,7 +52,8 @@ typedef struct grpc_tcp_server_acceptor {
unsigned fd_index; unsigned fd_index;
} grpc_tcp_server_acceptor; } grpc_tcp_server_acceptor;
/* Called for newly connected TCP connections. */ /* Called for newly connected TCP connections.
Takes ownership of acceptor. */
typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg, typedef void (*grpc_tcp_server_cb)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_endpoint *ep, grpc_endpoint *ep,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,

@ -381,16 +381,12 @@ error:
/* event manager callback when reads are ready */ /* event manager callback when reads are ready */
static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_tcp_listener *sp = arg; grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
grpc_pollset *read_notifier_pollset = NULL;
grpc_fd *fdobj;
if (err != GRPC_ERROR_NONE) { if (err != GRPC_ERROR_NONE) {
goto error; goto error;
} }
read_notifier_pollset = grpc_pollset *read_notifier_pollset =
sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add( sp->server->pollsets[(size_t)gpr_atm_no_barrier_fetch_add(
&sp->server->next_pollset_to_assign, 1) % &sp->server->next_pollset_to_assign, 1) %
sp->server->pollset_count]; sp->server->pollset_count];
@ -426,7 +422,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
} }
fdobj = grpc_fd_create(fd, name); grpc_fd *fdobj = grpc_fd_create(fd, name);
if (read_notifier_pollset == NULL) { if (read_notifier_pollset == NULL) {
gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
@ -435,11 +431,17 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = sp->fd_index;
sp->server->on_accept_cb( sp->server->on_accept_cb(
exec_ctx, sp->server->on_accept_cb_arg, exec_ctx, sp->server->on_accept_cb_arg,
grpc_tcp_create(fdobj, sp->server->resource_quota, grpc_tcp_create(fdobj, sp->server->resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
read_notifier_pollset, &acceptor); read_notifier_pollset, acceptor);
gpr_free(name); gpr_free(name);
gpr_free(addr_str); gpr_free(addr_str);

@ -188,7 +188,6 @@ static void accepted_connection_close_cb(uv_handle_t *handle) {
static void on_connect(uv_stream_t *server, int status) { static void on_connect(uv_stream_t *server, int status) {
grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
uv_tcp_t *client; uv_tcp_t *client;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@ -201,6 +200,7 @@ static void on_connect(uv_stream_t *server, int status) {
uv_strerror(status)); uv_strerror(status));
return; return;
} }
client = gpr_malloc(sizeof(uv_tcp_t)); client = gpr_malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client); uv_tcp_init(uv_default_loop(), client);
// UV documentation says this is guaranteed to succeed // UV documentation says this is guaranteed to succeed
@ -220,8 +220,13 @@ static void on_connect(uv_stream_t *server, int status) {
gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
} }
ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor); acceptor);
grpc_exec_ctx_finish(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx);
} }
} }

@ -323,7 +323,6 @@ failure:
/* Event manager callback when reads are ready. */ /* Event manager callback when reads are ready. */
static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_tcp_listener *sp = arg; grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket; SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL; grpc_endpoint *ep = NULL;
@ -396,8 +395,13 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
/* The only time we should call our callback, is where we successfully /* The only time we should call our callback, is where we successfully
managed to accept a connection, and created an endpoint. */ managed to accept a connection, and created an endpoint. */
if (ep) { if (ep) {
// Create acceptor.
grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
acceptor->from_server = sp->server;
acceptor->port_index = sp->port_index;
acceptor->fd_index = 0;
sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
&acceptor); acceptor);
} }
/* As we were notified from the IOCP of one and exactly one accept, /* As we were notified from the IOCP of one and exactly one accept,
the former socked we created has now either been destroy or assigned the former socked we created has now either been destroy or assigned

@ -43,6 +43,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/alpn/alpn.h" #include "src/core/ext/transport/chttp2/alpn/alpn.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/load_file.h" #include "src/core/lib/iomgr/load_file.h"
#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/credentials.h"
@ -111,19 +112,19 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL; return NULL;
} }
void grpc_channel_security_connector_create_handshakers( void grpc_channel_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
if (connector != NULL) { if (connector != NULL) {
connector->create_handshakers(exec_ctx, connector, handshake_mgr); connector->add_handshakers(exec_ctx, connector, handshake_mgr);
} }
} }
void grpc_server_security_connector_create_handshakers( void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector, grpc_exec_ctx *exec_ctx, grpc_server_security_connector *connector,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
if (connector != NULL) { if (connector != NULL) {
connector->create_handshakers(exec_ctx, connector, handshake_mgr); connector->add_handshakers(exec_ctx, connector, handshake_mgr);
} }
} }
@ -285,20 +286,24 @@ static void fake_channel_check_call_host(grpc_exec_ctx *exec_ctx,
cb(exec_ctx, user_data, GRPC_SECURITY_OK); cb(exec_ctx, user_data, GRPC_SECURITY_OK);
} }
static void fake_channel_create_handshakers( static void fake_channel_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
grpc_security_create_handshakers( grpc_handshake_manager_add(
exec_ctx, tsi_create_fake_handshaker(true /* is_client */), &sc->base, handshake_mgr,
handshake_mgr); grpc_security_handshaker_create(
exec_ctx, tsi_create_fake_handshaker(true /* is_client */),
&sc->base));
} }
static void fake_server_create_handshakers( static void fake_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
grpc_security_create_handshakers( grpc_handshake_manager_add(
exec_ctx, tsi_create_fake_handshaker(false /* is_client */), &sc->base, handshake_mgr,
handshake_mgr); grpc_security_handshaker_create(
exec_ctx, tsi_create_fake_handshaker(false /* is_client */),
&sc->base));
} }
static grpc_security_connector_vtable fake_channel_vtable = { static grpc_security_connector_vtable fake_channel_vtable = {
@ -316,7 +321,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
c->base.vtable = &fake_channel_vtable; c->base.vtable = &fake_channel_vtable;
c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds); c->request_metadata_creds = grpc_call_credentials_ref(request_metadata_creds);
c->check_call_host = fake_channel_check_call_host; c->check_call_host = fake_channel_check_call_host;
c->create_handshakers = fake_channel_create_handshakers; c->add_handshakers = fake_channel_add_handshakers;
return c; return c;
} }
@ -328,7 +333,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create(
gpr_ref_init(&c->base.refcount, 1); gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &fake_server_vtable; c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
c->create_handshakers = fake_server_create_handshakers; c->add_handshakers = fake_server_add_handshakers;
return c; return c;
} }
@ -382,9 +387,9 @@ static grpc_security_status ssl_create_handshaker(
return GRPC_SECURITY_OK; return GRPC_SECURITY_OK;
} }
static void ssl_channel_create_handshakers( static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
grpc_ssl_channel_security_connector *c = grpc_ssl_channel_security_connector *c =
(grpc_ssl_channel_security_connector *)sc; (grpc_ssl_channel_security_connector *)sc;
// Instantiate TSI handshaker. // Instantiate TSI handshaker.
@ -395,12 +400,13 @@ static void ssl_channel_create_handshakers(
: c->target_name, : c->target_name,
&tsi_hs); &tsi_hs);
// Create handshakers. // Create handshakers.
grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
exec_ctx, tsi_hs, &sc->base));
} }
static void ssl_server_create_handshakers( static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr) { grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c = grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc; (grpc_ssl_server_security_connector *)sc;
// Instantiate TSI handshaker. // Instantiate TSI handshaker.
@ -408,7 +414,8 @@ static void ssl_server_create_handshakers(
ssl_create_handshaker(c->handshaker_factory, false /* is_client */, ssl_create_handshaker(c->handshaker_factory, false /* is_client */,
NULL /* peer_name */, &tsi_hs); NULL /* peer_name */, &tsi_hs);
// Create handshakers. // Create handshakers.
grpc_security_create_handshakers(exec_ctx, tsi_hs, &sc->base, handshake_mgr); grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create(
exec_ctx, tsi_hs, &sc->base));
} }
static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) { static int ssl_host_matches_name(const tsi_peer *peer, const char *peer_name) {
@ -708,7 +715,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
c->base.request_metadata_creds = c->base.request_metadata_creds =
grpc_call_credentials_ref(request_metadata_creds); grpc_call_credentials_ref(request_metadata_creds);
c->base.check_call_host = ssl_channel_check_call_host; c->base.check_call_host = ssl_channel_check_call_host;
c->base.create_handshakers = ssl_channel_create_handshakers; c->base.add_handshakers = ssl_channel_add_handshakers;
gpr_split_host_port(target_name, &c->target_name, &port); gpr_split_host_port(target_name, &c->target_name, &port);
gpr_free(port); gpr_free(port);
if (overridden_target_name != NULL) { if (overridden_target_name != NULL) {
@ -783,7 +790,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL; *sc = NULL;
goto error; goto error;
} }
c->base.create_handshakers = ssl_server_create_handshakers; c->base.add_handshakers = ssl_server_add_handshakers;
*sc = &c->base; *sc = &c->base;
gpr_free((void *)alpn_protocol_strings); gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths); gpr_free(alpn_protocol_string_lengths);

@ -98,7 +98,7 @@ void grpc_security_connector_unref(grpc_security_connector *policy);
#endif #endif
/* Check the peer. Callee takes ownership of the peer object. /* Check the peer. Callee takes ownership of the peer object.
Sets *auth_context and invokes on_peer_checked when done. */ When done, sets *auth_context and invokes on_peer_checked. */
void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc, grpc_security_connector *sc,
tsi_peer peer, tsi_peer peer,
@ -133,9 +133,9 @@ struct grpc_channel_security_connector {
grpc_channel_security_connector *sc, const char *host, grpc_channel_security_connector *sc, const char *host,
grpc_auth_context *auth_context, grpc_auth_context *auth_context,
grpc_security_call_host_check_cb cb, void *user_data); grpc_security_call_host_check_cb cb, void *user_data);
void (*create_handshakers)(grpc_exec_ctx *exec_ctx, void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
grpc_channel_security_connector *sc, grpc_channel_security_connector *sc,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);
}; };
/* Checks that the host that will be set for a call is acceptable. */ /* Checks that the host that will be set for a call is acceptable. */
@ -145,7 +145,7 @@ void grpc_channel_security_connector_check_call_host(
grpc_security_call_host_check_cb cb, void *user_data); grpc_security_call_host_check_cb cb, void *user_data);
/* Registers handshakers with \a handshake_mgr. */ /* Registers handshakers with \a handshake_mgr. */
void grpc_channel_security_connector_create_handshakers( void grpc_channel_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector, grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *connector,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);
@ -158,12 +158,12 @@ typedef struct grpc_server_security_connector grpc_server_security_connector;
struct grpc_server_security_connector { struct grpc_server_security_connector {
grpc_security_connector base; grpc_security_connector base;
void (*create_handshakers)(grpc_exec_ctx *exec_ctx, void (*add_handshakers)(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);
}; };
void grpc_server_security_connector_create_handshakers( void grpc_server_security_connector_add_handshakers(
grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc, grpc_exec_ctx *exec_ctx, grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr); grpc_handshake_manager *handshake_mgr);

@ -131,6 +131,9 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx,
// Not shutting down, so the write failed. Clean up before // Not shutting down, so the write failed. Clean up before
// invoking the callback. // invoking the callback.
cleanup_args_for_failure_locked(h); cleanup_args_for_failure_locked(h);
// Set shutdown to true so that subsequent calls to
// security_handshaker_shutdown() do nothing.
h->shutdown = true;
} }
// Invoke callback. // Invoke callback.
grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL); grpc_exec_ctx_sched(exec_ctx, h->on_handshake_done, error, NULL);
@ -434,17 +437,14 @@ static grpc_handshaker *fail_handshaker_create() {
// exported functions // exported functions
// //
void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, grpc_handshaker *grpc_security_handshaker_create(
tsi_handshaker *handshaker, grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
grpc_security_connector *connector, grpc_security_connector *connector) {
grpc_handshake_manager *handshake_mgr) { // If no TSI handshaker was created, return a handshaker that always fails.
// If no TSI handshaker was created, add a handshaker that always fails. // Otherwise, return a real security handshaker.
// Otherwise, add a real security handshaker.
if (handshaker == NULL) { if (handshaker == NULL) {
grpc_handshake_manager_add(handshake_mgr, fail_handshaker_create()); return fail_handshaker_create();
} else { } else {
grpc_handshake_manager_add( return security_handshaker_create(exec_ctx, handshaker, connector);
handshake_mgr,
security_handshaker_create(exec_ctx, handshaker, connector));
} }
} }

@ -34,14 +34,13 @@
#ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H #ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
#define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H #define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H
#include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/security/transport/security_connector.h" #include "src/core/lib/security/transport/security_connector.h"
/// Creates any necessary security handshakers and adds them to /// Creates a security handshaker using \a handshaker.
/// \a handshake_mgr. grpc_handshaker *grpc_security_handshaker_create(
void grpc_security_create_handshakers(grpc_exec_ctx *exec_ctx, grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker,
tsi_handshaker *handshaker, grpc_security_connector *connector);
grpc_security_connector *connector,
grpc_handshake_manager *handshake_mgr);
#endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */ #endif /* GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_HANDSHAKER_H */

@ -92,6 +92,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) { grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
test_tcp_server *server = arg; test_tcp_server *server = arg;
grpc_closure_init(&on_read, handle_read, NULL); grpc_closure_init(&on_read, handle_read, NULL);
grpc_slice_buffer_init(&state.incoming_buffer); grpc_slice_buffer_init(&state.incoming_buffer);

@ -145,6 +145,7 @@ static void handle_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) { grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
test_tcp_server *server = arg; test_tcp_server *server = arg;
grpc_closure_init(&on_read, handle_read, NULL); grpc_closure_init(&on_read, handle_read, NULL);
grpc_closure_init(&on_write, done_write, NULL); grpc_closure_init(&on_write, done_write, NULL);

@ -367,6 +367,7 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, static void on_accept(grpc_exec_ctx* exec_ctx, void* arg,
grpc_endpoint* endpoint, grpc_pollset* accepting_pollset, grpc_endpoint* endpoint, grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor) { grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
grpc_end2end_http_proxy* proxy = arg; grpc_end2end_http_proxy* proxy = arg;
// Instantiate proxy_connection. // Instantiate proxy_connection.
proxy_connection* conn = gpr_malloc(sizeof(*conn)); proxy_connection* conn = gpr_malloc(sizeof(*conn));

@ -126,6 +126,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
on_connect_result temp_result; on_connect_result temp_result;
on_connect_result_set(&temp_result, acceptor); on_connect_result_set(&temp_result, acceptor);
gpr_free(acceptor);
gpr_mu_lock(g_mu); gpr_mu_lock(g_mu);
g_result = temp_result; g_result = temp_result;

@ -111,8 +111,7 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
struct handshake_state state; struct handshake_state state;
state.done_callback_called = false; state.done_callback_called = false;
grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create();
grpc_server_security_connector_create_handshakers(&exec_ctx, sc, grpc_server_security_connector_add_handshakers(&exec_ctx, sc, handshake_mgr);
handshake_mgr);
grpc_handshake_manager_do_handshake( grpc_handshake_manager_do_handshake(
&exec_ctx, handshake_mgr, mock_endpoint, NULL /* channel_args */, &exec_ctx, handshake_mgr, mock_endpoint, NULL /* channel_args */,
deadline, NULL /* acceptor */, on_handshake_done, &state); deadline, NULL /* acceptor */, on_handshake_done, &state);

@ -105,8 +105,8 @@ void server_thread(void *vargs) {
static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) { grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
struct server_thread_args *args = (struct server_thread_args *)vargs; struct server_thread_args *args = (struct server_thread_args *)vargs;
(void)acceptor;
grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_destroy(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL));

@ -73,6 +73,7 @@ static void pretty_print_backoffs(reconnect_server *server) {
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *accepting_pollset, grpc_pollset *accepting_pollset,
grpc_tcp_server_acceptor *acceptor) { grpc_tcp_server_acceptor *acceptor) {
gpr_free(acceptor);
char *peer; char *peer;
char *last_colon; char *last_colon;
reconnect_server *server = (reconnect_server *)arg; reconnect_server *server = (reconnect_server *)arg;

Loading…
Cancel
Save