make security_connector manage pending handshaker, while handshaker owns tcp

pull/4175/head
yang-g 9 years ago
parent 5e7f08a063
commit 5e72a35498
  1. 5
      src/core/httpcli/httpcli_security_connector.c
  2. 43
      src/core/security/handshake.c
  3. 2
      src/core/security/handshake.h
  4. 186
      src/core/security/security_connector.c
  5. 14
      src/core/security/security_connector.h
  6. 2
      src/core/security/server_secure_chttp2.c

@ -102,11 +102,8 @@ static grpc_security_status httpcli_ssl_check_peer(grpc_security_connector *sc,
return status;
}
static void httpcli_ssl_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static grpc_security_connector_vtable httpcli_ssl_vtable = {
httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer,
httpcli_ssl_shutdown};
httpcli_ssl_destroy, httpcli_ssl_do_handshake, httpcli_ssl_check_peer};
static grpc_security_status httpcli_ssl_channel_security_connector_create(
const unsigned char *pem_root_certs, size_t pem_root_certs_size,

@ -64,9 +64,37 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
int success);
static void security_connector_remove_handshake(grpc_security_handshake *h) {
grpc_security_connector_handshake_list *node;
grpc_security_connector_handshake_list *tmp;
grpc_security_connector *sc = h->connector;
gpr_mu_lock(&sc->mu);
node = sc->handshaking_handshakes;
if (node && node->handshake == h) {
sc->handshaking_handshakes = node->next;
gpr_free(node);
gpr_mu_unlock(&sc->mu);
return;
}
while (node) {
if (node->next->handshake == h) {
tmp = node->next;
node->next = node->next->next;
gpr_free(tmp);
gpr_mu_unlock(&sc->mu);
return;
}
node = node->next;
}
gpr_mu_unlock(&sc->mu);
}
static void security_handshake_done(grpc_exec_ctx *exec_ctx,
grpc_security_handshake *h,
int is_success) {
if (!h->connector->is_client_side) {
security_connector_remove_handshake(h);
}
if (is_success) {
h->cb(exec_ctx, h->user_data, GRPC_SECURITY_OK, h->secure_endpoint);
} else {
@ -266,6 +294,7 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
grpc_security_connector_handshake_list *handshake_node;
grpc_security_handshake *h = gpr_malloc(sizeof(grpc_security_handshake));
memset(h, 0, sizeof(grpc_security_handshake));
h->handshaker = handshaker;
@ -282,5 +311,19 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_init(&h->left_overs);
gpr_slice_buffer_init(&h->outgoing);
gpr_slice_buffer_init(&h->incoming);
if (!connector->is_client_side) {
handshake_node = gpr_malloc(sizeof(grpc_security_connector_handshake_list));
handshake_node->handshake = h;
gpr_mu_lock(&connector->mu);
handshake_node->next = connector->handshaking_handshakes;
connector->handshaking_handshakes = handshake_node;
gpr_mu_unlock(&connector->mu);
}
send_handshake_bytes_to_peer(exec_ctx, h);
}
void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx,
void *handshake) {
grpc_security_handshake *h = handshake;
grpc_endpoint_shutdown(exec_ctx, h->wrapped_endpoint);
}

@ -45,4 +45,6 @@ void grpc_do_security_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_handshake_done_cb cb,
void *user_data);
void grpc_security_handshake_shutdown(grpc_exec_ctx *exec_ctx, void *handshake);
#endif /* GRPC_INTERNAL_CORE_SECURITY_HANDSHAKE_H */

@ -104,7 +104,18 @@ const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *connector) {
connector->vtable->shutdown(exec_ctx, connector);
grpc_security_connector_handshake_list *tmp;
if (!connector->is_client_side) {
gpr_mu_lock(&connector->mu);
while (connector->handshaking_handshakes) {
tmp = connector->handshaking_handshakes;
grpc_security_handshake_shutdown(
exec_ctx, connector->handshaking_handshakes->handshake);
connector->handshaking_handshakes = tmp->next;
gpr_free(tmp);
}
gpr_mu_unlock(&connector->mu);
}
}
void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
@ -215,17 +226,6 @@ typedef struct {
int call_host_check_is_async;
} grpc_fake_channel_security_connector;
typedef struct tcp_endpoint_list {
grpc_endpoint *tcp_endpoint;
struct tcp_endpoint_list *next;
} tcp_endpoint_list;
typedef struct {
grpc_security_connector base;
gpr_mu mu;
tcp_endpoint_list *handshaking_tcp_endpoints;
} grpc_fake_server_security_connector;
static void fake_channel_destroy(grpc_security_connector *sc) {
grpc_channel_security_connector *c = (grpc_channel_security_connector *)sc;
grpc_call_credentials_unref(c->request_metadata_creds);
@ -235,6 +235,7 @@ static void fake_channel_destroy(grpc_security_connector *sc) {
static void fake_server_destroy(grpc_security_connector *sc) {
GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector");
gpr_mu_destroy(&sc->mu);
gpr_free(sc);
}
@ -296,99 +297,20 @@ static void fake_channel_do_handshake(grpc_exec_ctx *exec_ctx,
nonsecure_endpoint, cb, user_data);
}
typedef struct callback_data {
grpc_security_connector *sc;
grpc_endpoint *tcp;
grpc_security_handshake_done_cb cb;
void *user_data;
} callback_data;
static tcp_endpoint_list *remove_tcp_from_list(tcp_endpoint_list *head,
grpc_endpoint *tcp) {
tcp_endpoint_list *node = head;
tcp_endpoint_list *tmp = NULL;
if (head && head->tcp_endpoint == tcp) {
head = head->next;
gpr_free(node);
return head;
}
while (node) {
if (node->next->tcp_endpoint == tcp) {
tmp = node->next;
node->next = node->next->next;
gpr_free(tmp);
return head;
}
node = node->next;
}
return head;
}
static void fake_remove_tcp_and_call_user_cb(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
callback_data *d = (callback_data *)user_data;
grpc_fake_server_security_connector *sc =
(grpc_fake_server_security_connector *)d->sc;
grpc_security_handshake_done_cb cb = d->cb;
void *data = d->user_data;
gpr_mu_lock(&sc->mu);
sc->handshaking_tcp_endpoints =
remove_tcp_from_list(sc->handshaking_tcp_endpoints, d->tcp);
gpr_mu_unlock(&sc->mu);
gpr_free(d);
cb(exec_ctx, data, status, secure_endpoint);
}
static void fake_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
grpc_security_handshake_done_cb cb,
void *user_data) {
grpc_fake_server_security_connector *c =
(grpc_fake_server_security_connector *)sc;
tcp_endpoint_list *node = gpr_malloc(sizeof(tcp_endpoint_list));
callback_data *wrapped_data;
node->tcp_endpoint = nonsecure_endpoint;
gpr_mu_lock(&c->mu);
node->next = c->handshaking_tcp_endpoints;
c->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&c->mu);
wrapped_data = gpr_malloc(sizeof(callback_data));
wrapped_data->sc = &c->base;
wrapped_data->tcp = nonsecure_endpoint;
wrapped_data->cb = cb;
wrapped_data->user_data = user_data;
grpc_do_security_handshake(exec_ctx, tsi_create_fake_handshaker(0), sc,
nonsecure_endpoint,
fake_remove_tcp_and_call_user_cb, wrapped_data);
}
static void fake_channel_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static void fake_server_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_fake_server_security_connector *c =
(grpc_fake_server_security_connector *)sc;
gpr_mu_lock(&c->mu);
while (c->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(exec_ctx,
c->handshaking_tcp_endpoints->tcp_endpoint);
c->handshaking_tcp_endpoints =
remove_tcp_from_list(c->handshaking_tcp_endpoints,
c->handshaking_tcp_endpoints->tcp_endpoint);
}
gpr_mu_unlock(&c->mu);
nonsecure_endpoint, cb, user_data);
}
static grpc_security_connector_vtable fake_channel_vtable = {
fake_channel_destroy, fake_channel_do_handshake, fake_check_peer,
fake_channel_shutdown};
fake_channel_destroy, fake_channel_do_handshake, fake_check_peer};
static grpc_security_connector_vtable fake_server_vtable = {
fake_server_destroy, fake_server_do_handshake, fake_check_peer,
fake_server_shutdown};
fake_server_destroy, fake_server_do_handshake, fake_check_peer};
grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
grpc_call_credentials *request_metadata_creds,
@ -408,15 +330,14 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create(
}
grpc_security_connector *grpc_fake_server_security_connector_create(void) {
grpc_fake_server_security_connector *c =
gpr_malloc(sizeof(grpc_fake_server_security_connector));
memset(c, 0, sizeof(grpc_fake_server_security_connector));
gpr_ref_init(&c->base.refcount, 1);
c->base.is_client_side = 0;
c->base.vtable = &fake_server_vtable;
c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
grpc_security_connector *c = gpr_malloc(sizeof(grpc_security_connector));
memset(c, 0, sizeof(grpc_security_connector));
gpr_ref_init(&c->refcount, 1);
c->is_client_side = 0;
c->vtable = &fake_server_vtable;
c->url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME;
gpr_mu_init(&c->mu);
return &c->base;
return c;
}
/* --- Ssl implementation. --- */
@ -431,8 +352,6 @@ typedef struct {
typedef struct {
grpc_security_connector base;
gpr_mu mu;
tcp_endpoint_list *handshaking_tcp_endpoints;
tsi_ssl_handshaker_factory *handshaker_factory;
} grpc_ssl_server_security_connector;
@ -458,6 +377,7 @@ static void ssl_server_destroy(grpc_security_connector *sc) {
tsi_ssl_handshaker_factory_destroy(c->handshaker_factory);
}
GRPC_AUTH_CONTEXT_UNREF(sc->auth_context, "connector");
gpr_mu_destroy(&sc->mu);
gpr_free(sc);
}
@ -497,23 +417,6 @@ static void ssl_channel_do_handshake(grpc_exec_ctx *exec_ctx,
}
}
static void ssl_remove_tcp_and_call_user_cb(grpc_exec_ctx *exec_ctx,
void *user_data,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
callback_data *d = (callback_data *)user_data;
grpc_ssl_server_security_connector *sc =
(grpc_ssl_server_security_connector *)d->sc;
grpc_security_handshake_done_cb cb = d->cb;
void *data = d->user_data;
gpr_mu_lock(&sc->mu);
sc->handshaking_tcp_endpoints =
remove_tcp_from_list(sc->handshaking_tcp_endpoints, d->tcp);
gpr_mu_unlock(&sc->mu);
gpr_free(d);
cb(exec_ctx, data, status, secure_endpoint);
}
static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc,
grpc_endpoint *nonsecure_endpoint,
@ -522,26 +425,13 @@ static void ssl_server_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
tsi_handshaker *handshaker;
callback_data *wrapped_data;
tcp_endpoint_list *node;
grpc_security_status status =
ssl_create_handshaker(c->handshaker_factory, 0, NULL, &handshaker);
if (status != GRPC_SECURITY_OK) {
cb(exec_ctx, user_data, status, NULL);
} else {
node = gpr_malloc(sizeof(tcp_endpoint_list));
node->tcp_endpoint = nonsecure_endpoint;
gpr_mu_lock(&c->mu);
node->next = c->handshaking_tcp_endpoints;
c->handshaking_tcp_endpoints = node;
gpr_mu_unlock(&c->mu);
wrapped_data = gpr_malloc(sizeof(callback_data));
wrapped_data->sc = &c->base;
wrapped_data->tcp = nonsecure_endpoint;
wrapped_data->cb = cb;
wrapped_data->user_data = user_data;
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint,
ssl_remove_tcp_and_call_user_cb, wrapped_data);
grpc_do_security_handshake(exec_ctx, handshaker, sc, nonsecure_endpoint, cb,
user_data);
}
}
@ -666,29 +556,11 @@ static grpc_security_status ssl_channel_check_call_host(
}
}
static void ssl_channel_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {}
static void ssl_server_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
gpr_mu_lock(&c->mu);
while (c->handshaking_tcp_endpoints != NULL) {
grpc_endpoint_shutdown(exec_ctx,
c->handshaking_tcp_endpoints->tcp_endpoint);
c->handshaking_tcp_endpoints =
remove_tcp_from_list(c->handshaking_tcp_endpoints,
c->handshaking_tcp_endpoints->tcp_endpoint);
}
gpr_mu_unlock(&c->mu);
}
static grpc_security_connector_vtable ssl_channel_vtable = {
ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer,
ssl_channel_shutdown};
ssl_channel_destroy, ssl_channel_do_handshake, ssl_channel_check_peer};
static grpc_security_connector_vtable ssl_server_vtable = {
ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer,
ssl_server_shutdown};
ssl_server_destroy, ssl_server_do_handshake, ssl_server_check_peer};
static gpr_slice default_pem_root_certs;
@ -839,7 +711,7 @@ grpc_security_status grpc_ssl_server_security_connector_create(
*sc = NULL;
goto error;
}
gpr_mu_init(&c->mu);
gpr_mu_init(&c->base.mu);
*sc = &c->base;
gpr_free((void *)alpn_protocol_strings);
gpr_free(alpn_protocol_string_lengths);

@ -77,15 +77,22 @@ typedef struct {
grpc_security_status (*check_peer)(grpc_security_connector *sc, tsi_peer peer,
grpc_security_check_cb cb,
void *user_data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_security_connector *sc);
} grpc_security_connector_vtable;
typedef struct grpc_security_connector_handshake_list {
void *handshake;
struct grpc_security_connector_handshake_list *next;
} grpc_security_connector_handshake_list;
struct grpc_security_connector {
const grpc_security_connector_vtable *vtable;
gpr_refcount refcount;
int is_client_side;
const char *url_scheme;
grpc_auth_context *auth_context; /* Populated after the peer is checked. */
/* Used on server side only. */
gpr_mu mu;
grpc_security_connector_handshake_list *handshaking_handshakes;
};
/* Refcounting. */
@ -115,8 +122,6 @@ void grpc_security_connector_do_handshake(grpc_exec_ctx *exec_ctx,
grpc_security_handshake_done_cb cb,
void *user_data);
void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *connector);
/* Check the peer.
Implementations can choose to check the peer either synchronously or
asynchronously. In the first case, a successful call will return
@ -128,6 +133,9 @@ grpc_security_status grpc_security_connector_check_peer(
grpc_security_connector *sc, tsi_peer peer, grpc_security_check_cb cb,
void *user_data);
void grpc_security_connector_shutdown(grpc_exec_ctx *exec_ctx,
grpc_security_connector *connector);
/* Util to encapsulate the connector in a channel arg. */
grpc_arg grpc_security_connector_to_arg(grpc_security_connector *sc);

@ -145,9 +145,9 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
grpc_server_secure_state *state = statep;
grpc_security_connector_shutdown(exec_ctx, state->sc);
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
success);
grpc_security_connector_shutdown(exec_ctx, state->sc);
state_unref(state);
}

Loading…
Cancel
Save