From c272dd73aad1273b70fbd038efb02d6f0e169b60 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Tue, 5 Dec 2017 12:18:34 -0800 Subject: [PATCH] Add on_initiate callback for the send_ping tranport op --- .../filters/client_channel/client_channel.cc | 14 +++++--- .../ext/filters/client_channel/lb_policy.cc | 5 +-- .../ext/filters/client_channel/lb_policy.h | 5 +-- .../client_channel/lb_policy/grpclb/grpclb.cc | 32 +++++++++---------- .../lb_policy/pick_first/pick_first.cc | 9 ++++-- .../lb_policy/round_robin/round_robin.cc | 10 ++++-- .../ext/filters/client_channel/subchannel.cc | 6 ++-- .../ext/filters/client_channel/subchannel.h | 3 +- .../chttp2/transport/chttp2_transport.cc | 5 +-- src/core/lib/surface/channel_ping.cc | 2 +- src/core/lib/surface/lame_client.cc | 9 ++++-- src/core/lib/transport/transport.h | 10 ++++-- src/core/lib/transport/transport_op_string.cc | 2 +- 13 files changed, 71 insertions(+), 41 deletions(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index aced9adf9f1..fc8210569e8 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -643,16 +643,22 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg, op->connectivity_state = nullptr; } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (chand->lb_policy == nullptr) { GRPC_CLOSURE_SCHED( - exec_ctx, op->send_ping, + exec_ctx, op->send_ping.on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); + GRPC_CLOSURE_SCHED( + exec_ctx, op->send_ping.on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); } else { - grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); + grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, + op->send_ping.on_initiate, + op->send_ping.on_ack); op->bind_pollset = nullptr; } - op->send_ping = nullptr; + op->send_ping.on_initiate = nullptr; + op->send_ping.on_ack = nullptr; } if (op->disconnect_with_error != GRPC_ERROR_NONE) { diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index db566f1b562..6b6022c2476 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -138,8 +138,9 @@ void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx* exec_ctx, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure) { - policy->vtable->ping_one_locked(exec_ctx, policy, closure); + grpc_closure* on_initiate, + grpc_closure* on_ack) { + policy->vtable->ping_one_locked(exec_ctx, policy, on_initiate, on_ack); } void grpc_lb_policy_notify_on_state_change_locked( diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index d3159eebf3b..38cc26422fc 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -78,7 +78,7 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_ping_one */ void (*ping_one_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure); + grpc_closure* on_initiate, grpc_closure* on_ack); /** Try to enter a READY connectivity state */ void (*exit_idle_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy); @@ -171,7 +171,8 @@ int grpc_lb_policy_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, against one of the connected subchannels managed by \a policy. */ void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, - grpc_closure* closure); + grpc_closure* on_initiate, + grpc_closure* on_ack); /** Cancel picks for \a target. The \a on_complete callback of the pending picks will be invoked with \a diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index db06fc20b60..a6972b850f0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -275,18 +275,17 @@ static void add_pending_pick(pending_pick** root, typedef struct pending_ping { struct pending_ping* next; - /* args for wrapped_notify */ - wrapped_rr_closure_arg wrapped_notify_arg; + /* args for sending the ping */ + grpc_closure* on_initiate; + grpc_closure* on_ack; } pending_ping; -static void add_pending_ping(pending_ping** root, grpc_closure* notify) { +static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate, + grpc_closure* on_ack) { pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); - pping->wrapped_notify_arg.wrapped_closure = notify; - pping->wrapped_notify_arg.free_when_done = pping; + pping->on_initiate = on_initiate; + pping->on_ack = on_ack; pping->next = *root; - GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure, - wrapped_rr_closure, &pping->wrapped_notify_arg, - grpc_schedule_on_exec_ctx); *root = pping; } @@ -822,14 +821,13 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy, pending_ping* pping; while ((pping = glb_policy->pending_pings)) { glb_policy->pending_pings = pping->next; - GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping"); - pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy; if (grpc_lb_glb_trace.enabled()) { gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", glb_policy, glb_policy->rr_policy); } grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify_arg.wrapper_closure); + pping->on_initiate, pping->on_ack); + gpr_free(pping); } } @@ -1052,8 +1050,8 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) { while (pping != nullptr) { pending_ping* next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, pping->on_initiate, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, pping->on_ack, GRPC_ERROR_REF(error)); gpr_free(pping); pping = next; } @@ -1251,12 +1249,14 @@ static grpc_connectivity_state glb_check_connectivity_locked( } static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { glb_lb_policy* glb_policy = (glb_lb_policy*)pol; if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate, + on_ack); } else { - add_pending_ping(&glb_policy->pending_pings, closure); + add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 228a77d9db7..b2007ca301a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -226,13 +226,16 @@ static void pf_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, } static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { pick_first_lb_policy* p = (pick_first_lb_policy*)pol; if (p->selected) { grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, - closure); + on_initiate, on_ack); } else { - GRPC_CLOSURE_SCHED(exec_ctx, closure, + GRPC_CLOSURE_SCHED(exec_ctx, on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); + GRPC_CLOSURE_SCHED(exec_ctx, on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index f68daba474c..bf6a72a62a8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -548,7 +548,8 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx, } static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { round_robin_lb_policy* p = (round_robin_lb_policy*)pol; const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); if (next_ready_index < p->subchannel_list->num_subchannels) { @@ -556,11 +557,14 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, &p->subchannel_list->subchannels[next_ready_index]; grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( selected->connected_subchannel, "rr_ping"); - grpc_connected_subchannel_ping(exec_ctx, target, closure); + grpc_connected_subchannel_ping(exec_ctx, target, on_initiate, on_ack); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); } else { GRPC_CLOSURE_SCHED( - exec_ctx, closure, + exec_ctx, on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); + GRPC_CLOSURE_SCHED( + exec_ctx, on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); } } diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 58e294d597f..bff2ae487dd 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -584,10 +584,12 @@ void grpc_connected_subchannel_notify_on_state_change( void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* con, - grpc_closure* closure) { + grpc_closure* on_initiate, + grpc_closure* on_ack) { grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_channel_element* elem; - op->send_ping = closure; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem->filter->start_transport_op(exec_ctx, elem, op); } diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 1f326fc1d20..3916ea00caf 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -135,7 +135,8 @@ void grpc_connected_subchannel_notify_on_state_change( grpc_closure* notify); void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, grpc_connected_subchannel* channel, - grpc_closure* notify); + grpc_closure* on_initiate, + grpc_closure* on_ack); /** retrieve the grpc_connected_subchannel - or NULL if called before the subchannel becomes connected */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 63ac65ac788..ea637e6bece 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1815,8 +1815,9 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); } - if (op->send_ping) { - send_ping_locked(exec_ctx, t, nullptr, op->send_ping); + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { + send_ping_locked(exec_ctx, t, op->send_ping.on_initiate, + op->send_ping.on_ack); grpc_chttp2_initiate_write(exec_ctx, t, GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); } diff --git a/src/core/lib/surface/channel_ping.cc b/src/core/lib/surface/channel_ping.cc index e8f47f01cf4..7b1964fd55e 100644 --- a/src/core/lib/surface/channel_ping.cc +++ b/src/core/lib/surface/channel_ping.cc @@ -57,7 +57,7 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq, pr->tag = tag; pr->cq = cq; GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); - op->send_ping = &pr->closure; + op->send_ping.on_ack = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); GPR_ASSERT(grpc_cq_begin_op(cq, tag)); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index c32c9af50e5..559d7af43e6 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -104,9 +104,14 @@ static void lame_start_transport_op(grpc_exec_ctx* exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE); } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr) { GRPC_CLOSURE_SCHED( - exec_ctx, op->send_ping, + exec_ctx, op->send_ping.on_initiate, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); + } + if (op->send_ping.on_ack != nullptr) { + GRPC_CLOSURE_SCHED( + exec_ctx, op->send_ping.on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index b3cf04c22da..73264142d96 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -245,8 +245,14 @@ typedef struct grpc_transport_op { grpc_pollset* bind_pollset; /** add this transport to a pollset_set */ grpc_pollset_set* bind_pollset_set; - /** send a ping, call this back if not NULL */ - grpc_closure* send_ping; + /** send a ping, if either on_initiate or on_ack is not NULL */ + struct { + /** Ping may be delayed by the transport, on_initiate callback will be + called when the ping is actually being sent. */ + grpc_closure* on_initiate; + /** Called when the ping ack is received */ + grpc_closure* on_ack; + } send_ping; /*************************************************************************** * remaining fields are initialized and used at the discretion of the diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index e69ab025707..c0f82fea0dc 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -187,7 +187,7 @@ char* grpc_transport_op_string(grpc_transport_op* op) { gpr_strvec_add(&b, gpr_strdup("BIND_POLLSET_SET")); } - if (op->send_ping != nullptr) { + if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) { if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); // first = false; gpr_strvec_add(&b, gpr_strdup("SEND_PING"));