move pending tcp management from server to connector

pull/4175/head
yang-g 9 years ago
parent 75b53d6a5d
commit 5e7f08a063
  1. 10
      src/core/httpcli/httpcli_security_connector.c
  2. 6
      src/core/security/handshake.c
  3. 183
      src/core/security/security_connector.c
  4. 4
      src/core/security/security_connector.h
  5. 78
      src/core/security/server_secure_chttp2.c
  6. 3
      src/core/surface/secure_channel_create.c

@ -68,7 +68,7 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
tsi_result result = TSI_OK;
tsi_handshaker *handshaker;
if (c->handshaker_factory == NULL) {
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL);
return;
}
result = tsi_ssl_handshaker_factory_create_handshaker(
@ -76,7 +76,7 @@ static void httpcli_ssl_do_handshake(grpc_exec_ctx *exec_ctx,
if (result != TSI_OK) {
gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.",
tsi_result_to_string(result));
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
user_data);
@ -102,8 +102,11 @@ static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc,
return status;
}
static void httpcli_ssl_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static grpc_security_connector_vtable httpcli_ssl_vtable = {
httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer};
httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer,
httpcli_ssl_shutdown};
static grpc_security_status httpcli_ssl_channel_security_connector_create(
const unsigned char *pem_root_certs, size_t pem_root_certs_size,
@ -149,7 +152,6 @@ typedef struct {
static void on_secure_transport_setup_done(grpc_exec_ctx *exec_ctx, void *rp,
grpc_security_status status,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
on_done_closure *c = rp;
if (status != GRPC_SECURITY_OK) {

@ -68,8 +68,7 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx,
grpc_security_handshake *h,
int is_success) {
if (is_success) {
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->wrapped_endpoint,
h->secure_endpoint);
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint);
} else {
if (h->secure_endpoint != NULL) {
grpc_endpoint_shutdown(exec_ctx, h->secure_endpoint);
@ -77,8 +76,7 @@ static void security_handshake_done(grpc_exec_ctx *exec_ctx,
} else {
grpc_endpoint_destroy(exec_ctx, h->wrapped_endpoint);
}
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, h->wrapped_endpoint,
NULL);
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_ERROR, NULL);
}
if (h->handshaker != NULL) tsi_handshaker_destroy(h->handshaker);
if (h->handshake_buffer != NULL) gpr_free(h->handshake_buffer);

@ -102,13 +102,18 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
return NULL;
}
void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *connector) {
connector->vtable->shutdown(exec_ctx, connector);
}
void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
if (sc == NULL || nonsecure_endpoint == NULL) {
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, nonsecure_endpoint, NULL);
cb(exec_ctx, user_data, GRPC_SECURITY_ERROR, NULL);
} else {
sc->vtable->do_handshake(exec_ctx, sc, nonsecure_endpoint, cb, user_data);
}
@ -210,6 +215,17 @@ typedef struct {
int call_host_check_is_async;
} grpc_fake_channel_security_connector;
typedef struct tcp_endpoint_list {
grpc_endpoint *tcp_endpoint;
struct tcp_endpoint_list *next;
} tcp_endpoint_list;
typedef struct {
grpc_security_connector base;
gpr_mu mu;
tcp_endpoint_list *handshaking_tcp_endpoints;
} grpc_fake_server_security_connector;
static void fake_channel_destroy(grpc_security_connector *sc) {
grpc_channel_security_connector *c = (grpc_channel_security_connector *)sc;
grpc_call_credentials_unref(c->request_metadata_creds);
@ -280,20 +296,99 @@ static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
nonsecure_endpoint, cb, user_data);
}
typedef struct callback_data {
grpc_security_connector *sc;
grpc_endpoint *tcp;
grpc_security_handshake_done_cb cb;
void *user_data;
} callback_data;
static tcp_endpoint_list *remove_tcp_from_list(tcp_endpoint_list *head,
grpc_endpoint *tcp) {
tcp_endpoint_list *node = head;
tcp_endpoint_list *tmp = NULL;
if (head && head->tcp_endpoint == tcp) {
head = head->next;
gpr_free(node);
return head;
}
while (node) {
if (node->next->tcp_endpoint == tcp) {
tmp = node->next;
node->next = node->next->next;
gpr_free(tmp);
return head;
}
node = node->next;
}
return head;
}
static void fake_remove_tcp_and_call_user_cb(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
callback_data *d = (callback_data *)user_data;
grpc_fake_server_security_connector *sc =
(grpc_fake_server_security_connector *)d->sc;
grpc_security_handshake_done_cb cb = d->cb;
void *data = d->user_data;
gpr_mu_lock(&sc->mu);
sc->handshaking_tcp_endpoints =
remove_tcp_from_list(sc->handshaking_tcp_endpoints, d->tcp);
gpr_mu_unlock(&sc->mu);
gpr_free(d);
cb(exec_ctx, data, status, secure_endpoint);
}
static void fake_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
grpc_fake_server_security_connector *c =
(grpc_fake_server_security_connector *)sc;
tcp_endpoint_list *node = gpr_malloc(sizeof(tcp_endpoint_list));
callback_data *wrapped_data;
node->tcp_endpoint = nonsecure_endpoint;
gpr_mu_lock(&c->mu);
node->next = c->handshaking_tcp_endpoints;
c->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&c->mu);
wrapped_data = gpr_malloc(sizeof(callback_data));
wrapped_data->sc = &c->base;
wrapped_data->tcp = nonsecure_endpoint;
wrapped_data->cb = cb;
wrapped_data->user_data = user_data;
grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), sc,
nonsecure_endpoint, cb, user_data);
nonsecure_endpoint,
fake_remove_tcp_and_call_user_cb, wrapped_data);
}
static void fake_channel_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static void fake_server_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_fake_server_security_connector *c =
(grpc_fake_server_security_connector *)sc;
gpr_mu_lock(&c->mu);
while (c->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(exec_ctx,
c->handshaking_tcp_endpoints->tcp_endpoint);
c->handshaking_tcp_endpoints =
remove_tcp_from_list(c->handshaking_tcp_endpoints,
c->handshaking_tcp_endpoints->tcp_endpoint);
}
gpr_mu_unlock(&c->mu);
}
static grpc_security_connector_vtable fake_channel_vtable = {
fake_channel_destroy, fake_channel_do_handshake, fake_check_peer};
fake_channel_destroy, fake_channel_do_handshake, fake_check_peer,
fake_channel_shutdown};
static grpc_security_connector_vtable fake_server_vtable = {
fake_server_destroy, fake_server_do_handshake, fake_check_peer};
fake_server_destroy, fake_server_do_handshake, fake_check_peer,
fake_server_shutdown};
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
grpc_call_credentials *request_metadata_creds,
@ -313,13 +408,15 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
}
grpc_security_connector *grpc_fake_server_security_connector_create(void) {
grpc_security_connector *c = gpr_malloc(sizeof(grpc_security_connector));
memset(c, 0, sizeof(grpc_security_connector));
gpr_ref_init(&c->refcount, 1);
c->is_client_side = 0;
c->vtable = &fake_server_vtable;
c->url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
return c;
grpc_fake_server_security_connector *c =
gpr_malloc(sizeof(grpc_fake_server_security_connector));
memset(c, 0, sizeof(grpc_fake_server_security_connector));
gpr_ref_init(&c->base.refcount, 1);
c->base.is_client_side = 0;
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
gpr_mu_init(&c->mu);
return &c->base;
}
/* --- Ssl implementation. --- */
@ -334,6 +431,8 @@ typedef struct {
typedef struct {
grpc_security_connector base;
gpr_mu mu;
tcp_endpoint_list *handshaking_tcp_endpoints;
tsi_ssl_handshaker_factory *handshaker_factory;
} grpc_ssl_server_security_connector;
@ -354,6 +453,7 @@ static void ssl_channel_destroy(grpc_security_connector *sc) {
static void ssl_server_destroy(grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
if (c->handshaker_factory != NULL) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
@ -390,13 +490,30 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
: c->target_name,
&handshaker);
if (status != GRPC_SECURITY_OK) {
cb(exec_ctx, user_data, status, nonsecure_endpoint, NULL);
cb(exec_ctx, user_data, status, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
user_data);
}
}
static void ssl_remove_tcp_and_call_user_cb(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
callback_data *d = (callback_data *)user_data;
grpc_ssl_server_security_connector *sc =
(grpc_ssl_server_security_connector *)d->sc;
grpc_security_handshake_done_cb cb = d->cb;
void *data = d->user_data;
gpr_mu_lock(&sc->mu);
sc->handshaking_tcp_endpoints =
remove_tcp_from_list(sc->handshaking_tcp_endpoints, d->tcp);
gpr_mu_unlock(&sc->mu);
gpr_free(d);
cb(exec_ctx, data, status, secure_endpoint);
}
static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
@ -405,13 +522,26 @@ static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
tsi_handshaker *handshaker;
callback_data *wrapped_data;
tcp_endpoint_list *node;
grpc_security_status status =
ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker);
if (status != GRPC_SECURITY_OK) {
cb(exec_ctx, user_data, status, nonsecure_endpoint, NULL);
cb(exec_ctx, user_data, status, NULL);
} else {
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
user_data);
node = gpr_malloc(sizeof(tcp_endpoint_list));
node->tcp_endpoint = nonsecure_endpoint;
gpr_mu_lock(&c->mu);
node->next = c->handshaking_tcp_endpoints;
c->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&c->mu);
wrapped_data = gpr_malloc(sizeof(callback_data));
wrapped_data->sc = &c->base;
wrapped_data->tcp = nonsecure_endpoint;
wrapped_data->cb = cb;
wrapped_data->user_data = user_data;
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint,
ssl_remove_tcp_and_call_user_cb, wrapped_data);
}
}
@ -536,11 +666,29 @@ static grpc_security_status ssl_channel_check_call_host(
}
}
static void ssl_channel_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static void ssl_server_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
gpr_mu_lock(&c->mu);
while (c->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(exec_ctx,
c->handshaking_tcp_endpoints->tcp_endpoint);
c->handshaking_tcp_endpoints =
remove_tcp_from_list(c->handshaking_tcp_endpoints,
c->handshaking_tcp_endpoints->tcp_endpoint);
}
gpr_mu_unlock(&c->mu);
}
static grpc_security_connector_vtable ssl_channel_vtable = {
ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer};
ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer,
ssl_channel_shutdown};
static grpc_security_connector_vtable ssl_server_vtable = {
ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer};
ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer,
ssl_server_shutdown};
static gpr_slice default_pem_root_certs;
@ -691,6 +839,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL;
goto error;
}
gpr_mu_init(&c->mu);
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);

@ -67,7 +67,6 @@ typedef void (*grpc_security_check_cb)(grpc_exec_ctx *exec_ctx, void *user_data,
typedef void (*grpc_security_handshake_done_cb)(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_security_status status,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint);
typedef struct {
@ -78,6 +77,7 @@ typedef struct {
grpc_security_status (*check_peer)(grpc_security_connector *sc, tsi_peer peer,
grpc_security_check_cb cb,
void *user_data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc);
} grpc_security_connector_vtable;
struct grpc_security_connector {
@ -115,6 +115,8 @@ void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_handshake_done_cb cb,
void *user_data);
void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *connector);
/* Check the peer.
Implementations can choose to check the peer either synchronously or
asynchronously. In the first case, a successful call will return

@ -52,17 +52,11 @@
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
typedef struct tcp_endpoint_list {
grpc_endpoint *tcp_endpoint;
struct tcp_endpoint_list *next;
} tcp_endpoint_list;
typedef struct grpc_server_secure_state {
grpc_server *server;
grpc_tcp_server *tcp;
grpc_security_connector *sc;
grpc_server_credentials *creds;
tcp_endpoint_list *handshaking_tcp_endpoints;
int is_shutdown;
gpr_mu mu;
gpr_refcount refcount;
@ -103,54 +97,31 @@ static void setup_transport(grpc_exec_ctx *exec_ctx, void *statep,
grpc_channel_args_destroy(args_copy);
}
static int remove_tcp_from_list_locked(grpc_server_secure_state *state,
grpc_endpoint *tcp) {
tcp_endpoint_list *node = state->handshaking_tcp_endpoints;
tcp_endpoint_list *tmp = NULL;
if (node && node->tcp_endpoint == tcp) {
state->handshaking_tcp_endpoints = state->handshaking_tcp_endpoints->next;
gpr_free(node);
return 0;
}
while (node) {
if (node->next->tcp_endpoint == tcp) {
tmp = node->next;
node->next = node->next->next;
gpr_free(tmp);
return 0;
}
node = node->next;
}
return -1;
}
static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
grpc_security_status status,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
grpc_server_secure_state *state = statep;
grpc_transport *transport;
grpc_mdctx *mdctx;
if (status == GRPC_SECURITY_OK) {
gpr_mu_lock(&state->mu);
remove_tcp_from_list_locked(state, wrapped_endpoint);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->server),
secure_endpoint, mdctx, 0);
setup_transport(exec_ctx, state, transport, mdctx);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
} else {
/* We need to consume this here, because the server may already have gone
* away. */
grpc_endpoint_destroy(exec_ctx, secure_endpoint);
if (secure_endpoint) {
gpr_mu_lock(&state->mu);
if (!state->is_shutdown) {
mdctx = grpc_mdctx_create();
transport = grpc_create_chttp2_transport(
exec_ctx, grpc_server_get_channel_args(state->server),
secure_endpoint, mdctx, 0);
setup_transport(exec_ctx, state, transport, mdctx);
grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
} else {
/* We need to consume this here, because the server may already have
* gone
* away. */
grpc_endpoint_destroy(exec_ctx, secure_endpoint);
}
gpr_mu_unlock(&state->mu);
}
gpr_mu_unlock(&state->mu);
} else {
gpr_mu_lock(&state->mu);
remove_tcp_from_list_locked(state, wrapped_endpoint);
gpr_mu_unlock(&state->mu);
gpr_log(GPR_ERROR, "Secure transport failed with error %d", status);
}
state_unref(state);
@ -159,14 +130,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *statep,
static void on_accept(grpc_exec_ctx *exec_ctx, void *statep,
grpc_endpoint *tcp) {
grpc_server_secure_state *state = statep;
tcp_endpoint_list *node;
state_ref(state);
node = gpr_malloc(sizeof(tcp_endpoint_list));
node->tcp_endpoint = tcp;
gpr_mu_lock(&state->mu);
node->next = state->handshaking_tcp_endpoints;
state->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&state->mu);
grpc_security_connector_do_handshake(exec_ctx, state->sc, tcp,
on_secure_handshake_done, state);
}
@ -183,14 +147,7 @@ static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
grpc_server_secure_state *state = statep;
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
success);
gpr_mu_lock(&state->mu);
while (state->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(exec_ctx,
state->handshaking_tcp_endpoints->tcp_endpoint);
remove_tcp_from_list_locked(state,
state->handshaking_tcp_endpoints->tcp_endpoint);
}
gpr_mu_unlock(&state->mu);
grpc_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
}
@ -280,7 +237,6 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr,
state->sc = sc;
state->creds = grpc_server_credentials_ref(creds);
state->handshaking_tcp_endpoints = NULL;
state->is_shutdown = 0;
gpr_mu_init(&state->mu);
gpr_ref_init(&state->refcount, 1);

@ -86,7 +86,6 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_security_status status,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
grpc_closure *notify;
@ -95,13 +94,11 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
memset(c->result, 0, sizeof(*c->result));
gpr_mu_unlock(&c->mu);
} else if (status != GRPC_SECURITY_OK) {
GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status);
memset(c->result, 0, sizeof(*c->result));
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
} else {
GPR_ASSERT(c->connecting_endpoint == wrapped_endpoint);
c->connecting_endpoint = NULL;
gpr_mu_unlock(&c->mu);
c->result->transport = grpc_create_chttp2_transport(

Loading…
Cancel
Save