From 28bf8912fd4d5aa3e30710f6871abd52abbb9420 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 7 Dec 2015 16:07:04 -0800 Subject: [PATCH] Ping API --- src/core/channel/client_channel.c | 6 ++++-- .../client_config/lb_policies/pick_first.c | 11 ++++++++++ .../client_config/lb_policies/round_robin.c | 18 ++++++++++++++++- src/core/client_config/lb_policy.c | 4 ++++ src/core/client_config/lb_policy.h | 4 ++-- src/core/client_config/subchannel.c | 11 ++++++++++ src/core/client_config/subchannel.h | 3 +++ src/core/transport/chttp2/frame_ping.c | 11 +--------- src/core/transport/chttp2/internal.h | 7 ++++--- src/core/transport/chttp2_transport.c | 19 ++++++++++++++++++ test/core/end2end/tests/channel_ping.c | 20 +++++++++++++++++++ 11 files changed, 96 insertions(+), 18 deletions(-) diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 0212f4f2c8e..c2a6648f18e 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -252,7 +252,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1); GPR_ASSERT(op->set_accept_stream == NULL); - GPR_ASSERT(op->bind_pollset == NULL || op->send_ping != NULL); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, &chand->interested_parties, op->bind_pollset); + } gpr_mu_lock(&chand->mu_config); if (op->on_connectivity_state_change != NULL) { @@ -267,7 +269,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (chand->lb_policy == NULL) { grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0); } else { - grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->bind_pollset, op->send_ping); + grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; } op->send_ping = NULL; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 8a8b60ba39b..7dad7f7cd4f 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -348,6 +348,17 @@ void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_unlock(&p->mu); } +void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *closure) { + pick_first_lb_policy *p = (pick_first_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); + } else { + grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + } + gpr_mu_unlock(&p->mu); +} + static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_ping_one, pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change}; diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c index b86dba20ee1..d4874563639 100644 --- a/src/core/client_config/lb_policies/round_robin.c +++ b/src/core/client_config/lb_policies/round_robin.c @@ -467,8 +467,24 @@ static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&p->mu); } +static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + ready_list *selected; + grpc_connected_subchannel *target; + gpr_mu_lock(&p->mu); + if ((selected = peek_next_connected_locked(p))) { + gpr_mu_unlock(&p->mu); + target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + grpc_connected_subchannel_ping(exec_ctx, target, closure); + } else { + gpr_mu_unlock(&p->mu); + grpc_exec_ctx_enqueue(exec_ctx, closure, 0); + } +} + static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle, + rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_ping_one, rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index d2541615467..3230434966b 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -116,6 +116,10 @@ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { policy->vtable->exit_idle(exec_ctx, policy); } +void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure) { + policy->vtable->ping_one(exec_ctx, policy, closure); +} + void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 01c57a24ff9..c194bf11fe7 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -63,7 +63,7 @@ struct grpc_lb_policy_vtable { void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); - void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_closure *closure); + void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); /** try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); @@ -123,7 +123,7 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target, grpc_closure *on_complete); -void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_pollset *pollset, grpc_closure *closure); +void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connected_subchannel **target); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6631e9bae21..ce4919a8cbf 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -461,6 +461,17 @@ void grpc_connected_subchannel_notify_on_state_change( closure); } +void grpc_connected_subchannel_ping( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, + grpc_closure *closure) { + grpc_transport_op op; + grpc_channel_element *elem; + memset(&op, 0, sizeof(op)); + op.send_ping = closure; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); + elem->filter->start_transport_op(exec_ctx, elem, &op); +} + static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { size_t channel_stack_size; grpc_connected_subchannel *con; diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 74ebcecfba4..c28c3a86c96 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -124,6 +124,9 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, grpc_pollset_set *interested_parties, grpc_connectivity_state *state, grpc_closure *notify); +void grpc_connected_subchannel_ping( + grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel, + grpc_closure *notify); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/src/core/transport/chttp2/frame_ping.c b/src/core/transport/chttp2/frame_ping.c index 4d2c54269d5..8e763278ffd 100644 --- a/src/core/transport/chttp2/frame_ping.c +++ b/src/core/transport/chttp2/frame_ping.c @@ -76,7 +76,6 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); gpr_uint8 *cur = beg; grpc_chttp2_ping_parser *p = parser; - grpc_chttp2_outstanding_ping *ping; while (p->byte != 8 && cur != end) { p->opaque_8bytes[p->byte] = *cur; @@ -87,15 +86,7 @@ grpc_chttp2_parse_error grpc_chttp2_ping_parser_parse( if (p->byte == 8) { GPR_ASSERT(is_last); if (p->is_ack) { - for (ping = transport_parsing->pings.next; - ping != &transport_parsing->pings; ping = ping->next) { - if (0 == memcmp(p->opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); - } - ping->next->prev = ping->prev; - ping->prev->next = ping->next; - gpr_free(ping); - } + grpc_chttp2_ack_ping(exec_ctx, transport_parsing, p->opaque_8bytes); } else { gpr_slice_buffer_add(&transport_parsing->qbuf, grpc_chttp2_ping_create(1, p->opaque_8bytes)); diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 45d2599cdc7..fc35ea6f930 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -283,9 +283,6 @@ struct grpc_chttp2_transport_parsing { gpr_slice goaway_text; gpr_int64 outgoing_window; - - /** pings awaiting responses */ - grpc_chttp2_outstanding_ping pings; }; struct grpc_chttp2_transport { @@ -747,4 +744,8 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, void grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs); +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *parsing, + const gpr_uint8 *opaque_8bytes); + #endif diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 6ba9db83486..661e80d3a49 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -901,6 +901,25 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); } +void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_parsing *transport_parsing, + const gpr_uint8 *opaque_8bytes) { + grpc_chttp2_outstanding_ping *ping; + grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing); + grpc_chttp2_transport_global *transport_global = &t->global; + lock(t); + for (ping = transport_global->pings.next; + ping != &transport_global->pings; ping = ping->next) { + if (0 == memcmp(opaque_8bytes, ping->id, 8)) { + grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1); + } + ping->next->prev = ping->prev; + ping->prev->next = ping->next; + gpr_free(ping); + } + unlock(exec_ctx, t); +} + static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) { grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; diff --git a/test/core/end2end/tests/channel_ping.c b/test/core/end2end/tests/channel_ping.c index 9598b16beca..441e49ee7a5 100644 --- a/test/core/end2end/tests/channel_ping.c +++ b/test/core/end2end/tests/channel_ping.c @@ -45,11 +45,31 @@ static void *tag(gpr_intptr t) { return (void *)t; } static void test_ping(grpc_end2end_test_config config) { grpc_end2end_test_fixture f = config.create_fixture(NULL, NULL); cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_connectivity_state state = GRPC_CHANNEL_IDLE; int i; config.init_client(&f, NULL); config.init_server(&f, NULL); + grpc_channel_ping(f.client, f.cq, tag(0), NULL); + cq_expect_completion(cqv, tag(0), 0); + + /* check that we're still in idle, and start connecting */ + GPR_ASSERT(grpc_channel_check_connectivity_state(f.client, 1) == + GRPC_CHANNEL_IDLE); + /* we'll go through some set of transitions (some might be missed), until + READY is reached */ + while (state != GRPC_CHANNEL_READY) { + grpc_channel_watch_connectivity_state( + f.client, state, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), f.cq, tag(99)); + cq_expect_completion(cqv, tag(99), 1); + cq_verify(cqv); + state = grpc_channel_check_connectivity_state(f.client, 0); + GPR_ASSERT(state == GRPC_CHANNEL_READY || + state == GRPC_CHANNEL_CONNECTING || + state == GRPC_CHANNEL_TRANSIENT_FAILURE); + } + for (i = 1; i <= 5; i++) { grpc_channel_ping(f.client, f.cq, tag(i), NULL); cq_expect_completion(cqv, tag(i), 1);