Add on_initiate callback for the send_ping transport op

pull/13647/head
Yuchen Zeng 7 years ago
parent 96311af518
commit c272dd73aa
  1. 14
      src/core/ext/filters/client_channel/client_channel.cc
  2. 5
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 5
      src/core/ext/filters/client_channel/lb_policy.h
  4. 32
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 9
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  6. 10
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  7. 6
      src/core/ext/filters/client_channel/subchannel.cc
  8. 3
      src/core/ext/filters/client_channel/subchannel.h
  9. 5
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  10. 2
      src/core/lib/surface/channel_ping.cc
  11. 9
      src/core/lib/surface/lame_client.cc
  12. 10
      src/core/lib/transport/transport.h
  13. 2
      src/core/lib/transport/transport_op_string.cc

@@ -643,16 +643,22 @@ static void start_transport_op_locked(grpc_exec_ctx* exec_ctx, void* arg,
op->connectivity_state = nullptr; op->connectivity_state = nullptr;
} }
if (op->send_ping != nullptr) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
if (chand->lb_policy == nullptr) { if (chand->lb_policy == nullptr) {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping, exec_ctx, op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing"));
} else { } else {
grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy,
op->send_ping.on_initiate,
op->send_ping.on_ack);
op->bind_pollset = nullptr; op->bind_pollset = nullptr;
} }
op->send_ping = nullptr; op->send_ping.on_initiate = nullptr;
op->send_ping.on_ack = nullptr;
} }
if (op->disconnect_with_error != GRPC_ERROR_NONE) { if (op->disconnect_with_error != GRPC_ERROR_NONE) {

@@ -138,8 +138,9 @@ void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx* exec_ctx,
void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy, grpc_lb_policy* policy,
grpc_closure* closure) { grpc_closure* on_initiate,
policy->vtable->ping_one_locked(exec_ctx, policy, closure); grpc_closure* on_ack) {
policy->vtable->ping_one_locked(exec_ctx, policy, on_initiate, on_ack);
} }
void grpc_lb_policy_notify_on_state_change_locked( void grpc_lb_policy_notify_on_state_change_locked(

@@ -78,7 +78,7 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_ping_one */ /** \see grpc_lb_policy_ping_one */
void (*ping_one_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy, void (*ping_one_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
grpc_closure* closure); grpc_closure* on_initiate, grpc_closure* on_ack);
/** Try to enter a READY connectivity state */ /** Try to enter a READY connectivity state */
void (*exit_idle_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy); void (*exit_idle_locked)(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy);
@@ -171,7 +171,8 @@ int grpc_lb_policy_pick_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* policy,
against one of the connected subchannels managed by \a policy. */ against one of the connected subchannels managed by \a policy. */
void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx, void grpc_lb_policy_ping_one_locked(grpc_exec_ctx* exec_ctx,
grpc_lb_policy* policy, grpc_lb_policy* policy,
grpc_closure* closure); grpc_closure* on_initiate,
grpc_closure* on_ack);
/** Cancel picks for \a target. /** Cancel picks for \a target.
The \a on_complete callback of the pending picks will be invoked with \a The \a on_complete callback of the pending picks will be invoked with \a

@@ -275,18 +275,17 @@ static void add_pending_pick(pending_pick** root,
typedef struct pending_ping { typedef struct pending_ping {
struct pending_ping* next; struct pending_ping* next;
/* args for wrapped_notify */ /* args for sending the ping */
wrapped_rr_closure_arg wrapped_notify_arg; grpc_closure* on_initiate;
grpc_closure* on_ack;
} pending_ping; } pending_ping;
static void add_pending_ping(pending_ping** root, grpc_closure* notify) { static void add_pending_ping(pending_ping** root, grpc_closure* on_initiate,
grpc_closure* on_ack) {
pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping)); pending_ping* pping = (pending_ping*)gpr_zalloc(sizeof(*pping));
pping->wrapped_notify_arg.wrapped_closure = notify; pping->on_initiate = on_initiate;
pping->wrapped_notify_arg.free_when_done = pping; pping->on_ack = on_ack;
pping->next = *root; pping->next = *root;
GRPC_CLOSURE_INIT(&pping->wrapped_notify_arg.wrapper_closure,
wrapped_rr_closure, &pping->wrapped_notify_arg,
grpc_schedule_on_exec_ctx);
*root = pping; *root = pping;
} }
@@ -822,14 +821,13 @@ static void create_rr_locked(grpc_exec_ctx* exec_ctx, glb_lb_policy* glb_policy,
pending_ping* pping; pending_ping* pping;
while ((pping = glb_policy->pending_pings)) { while ((pping = glb_policy->pending_pings)) {
glb_policy->pending_pings = pping->next; glb_policy->pending_pings = pping->next;
GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_ping");
pping->wrapped_notify_arg.rr_policy = glb_policy->rr_policy;
if (grpc_lb_glb_trace.enabled()) { if (grpc_lb_glb_trace.enabled()) {
gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p", gpr_log(GPR_INFO, "[grpclb %p] Pending ping about to PING from RR %p",
glb_policy, glb_policy->rr_policy); glb_policy, glb_policy->rr_policy);
} }
grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy,
&pping->wrapped_notify_arg.wrapper_closure); pping->on_initiate, pping->on_ack);
gpr_free(pping);
} }
} }
@@ -1052,8 +1050,8 @@ static void glb_shutdown_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol) {
while (pping != nullptr) { while (pping != nullptr) {
pending_ping* next = pping->next; pending_ping* next = pping->next;
GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, GRPC_CLOSURE_SCHED(exec_ctx, pping->on_initiate, GRPC_ERROR_REF(error));
GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(exec_ctx, pping->on_ack, GRPC_ERROR_REF(error));
gpr_free(pping); gpr_free(pping);
pping = next; pping = next;
} }
@@ -1251,12 +1249,14 @@ static grpc_connectivity_state glb_check_connectivity_locked(
} }
static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, static void glb_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_closure* closure) { grpc_closure* on_initiate,
grpc_closure* on_ack) {
glb_lb_policy* glb_policy = (glb_lb_policy*)pol; glb_lb_policy* glb_policy = (glb_lb_policy*)pol;
if (glb_policy->rr_policy) { if (glb_policy->rr_policy) {
grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, on_initiate,
on_ack);
} else { } else {
add_pending_ping(&glb_policy->pending_pings, closure); add_pending_ping(&glb_policy->pending_pings, on_initiate, on_ack);
if (!glb_policy->started_picking) { if (!glb_policy->started_picking) {
start_picking_locked(exec_ctx, glb_policy); start_picking_locked(exec_ctx, glb_policy);
} }

@@ -226,13 +226,16 @@ static void pf_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
} }
static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, static void pf_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_closure* closure) { grpc_closure* on_initiate,
grpc_closure* on_ack) {
pick_first_lb_policy* p = (pick_first_lb_policy*)pol; pick_first_lb_policy* p = (pick_first_lb_policy*)pol;
if (p->selected) { if (p->selected) {
grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel, grpc_connected_subchannel_ping(exec_ctx, p->selected->connected_subchannel,
closure); on_initiate, on_ack);
} else { } else {
GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_CLOSURE_SCHED(exec_ctx, on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
GRPC_CLOSURE_SCHED(exec_ctx, on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Not connected"));
} }
} }

@@ -548,7 +548,8 @@ static void rr_notify_on_state_change_locked(grpc_exec_ctx* exec_ctx,
} }
static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol, static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
grpc_closure* closure) { grpc_closure* on_initiate,
grpc_closure* on_ack) {
round_robin_lb_policy* p = (round_robin_lb_policy*)pol; round_robin_lb_policy* p = (round_robin_lb_policy*)pol;
const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); const size_t next_ready_index = get_next_ready_subchannel_index_locked(p);
if (next_ready_index < p->subchannel_list->num_subchannels) { if (next_ready_index < p->subchannel_list->num_subchannels) {
@@ -556,11 +557,14 @@ static void rr_ping_one_locked(grpc_exec_ctx* exec_ctx, grpc_lb_policy* pol,
&p->subchannel_list->subchannels[next_ready_index]; &p->subchannel_list->subchannels[next_ready_index];
grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_connected_subchannel* target = GRPC_CONNECTED_SUBCHANNEL_REF(
selected->connected_subchannel, "rr_ping"); selected->connected_subchannel, "rr_ping");
grpc_connected_subchannel_ping(exec_ctx, target, closure); grpc_connected_subchannel_ping(exec_ctx, target, on_initiate, on_ack);
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping"); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_ping");
} else { } else {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
exec_ctx, closure, exec_ctx, on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
GRPC_CLOSURE_SCHED(
exec_ctx, on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Round Robin not connected"));
} }
} }

@@ -584,10 +584,12 @@ void grpc_connected_subchannel_notify_on_state_change(
void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx,
grpc_connected_subchannel* con, grpc_connected_subchannel* con,
grpc_closure* closure) { grpc_closure* on_initiate,
grpc_closure* on_ack) {
grpc_transport_op* op = grpc_make_transport_op(nullptr); grpc_transport_op* op = grpc_make_transport_op(nullptr);
grpc_channel_element* elem; grpc_channel_element* elem;
op->send_ping = closure; op->send_ping.on_initiate = on_initiate;
op->send_ping.on_ack = on_ack;
elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(con), 0);
elem->filter->start_transport_op(exec_ctx, elem, op); elem->filter->start_transport_op(exec_ctx, elem, op);
} }

@@ -135,7 +135,8 @@ void grpc_connected_subchannel_notify_on_state_change(
grpc_closure* notify); grpc_closure* notify);
void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx, void grpc_connected_subchannel_ping(grpc_exec_ctx* exec_ctx,
grpc_connected_subchannel* channel, grpc_connected_subchannel* channel,
grpc_closure* notify); grpc_closure* on_initiate,
grpc_closure* on_ack);
/** retrieve the grpc_connected_subchannel - or NULL if called before /** retrieve the grpc_connected_subchannel - or NULL if called before
the subchannel becomes connected */ the subchannel becomes connected */

@@ -1815,8 +1815,9 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set); grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, op->bind_pollset_set);
} }
if (op->send_ping) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
send_ping_locked(exec_ctx, t, nullptr, op->send_ping); send_ping_locked(exec_ctx, t, op->send_ping.on_initiate,
op->send_ping.on_ack);
grpc_chttp2_initiate_write(exec_ctx, t, grpc_chttp2_initiate_write(exec_ctx, t,
GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING); GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
} }

@@ -57,7 +57,7 @@ void grpc_channel_ping(grpc_channel* channel, grpc_completion_queue* cq,
pr->tag = tag; pr->tag = tag;
pr->cq = cq; pr->cq = cq;
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure; op->send_ping.on_ack = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq); op->bind_pollset = grpc_cq_pollset(cq);
GPR_ASSERT(grpc_cq_begin_op(cq, tag)); GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op); top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);

@@ -104,9 +104,14 @@ static void lame_start_transport_op(grpc_exec_ctx* exec_ctx,
GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change, GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
if (op->send_ping != nullptr) { if (op->send_ping.on_initiate != nullptr) {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping, exec_ctx, op->send_ping.on_initiate,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
}
if (op->send_ping.on_ack != nullptr) {
GRPC_CLOSURE_SCHED(
exec_ctx, op->send_ping.on_ack,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
} }
GRPC_ERROR_UNREF(op->disconnect_with_error); GRPC_ERROR_UNREF(op->disconnect_with_error);

@@ -245,8 +245,14 @@ typedef struct grpc_transport_op {
grpc_pollset* bind_pollset; grpc_pollset* bind_pollset;
/** add this transport to a pollset_set */ /** add this transport to a pollset_set */
grpc_pollset_set* bind_pollset_set; grpc_pollset_set* bind_pollset_set;
/** send a ping, call this back if not NULL */ /** send a ping, if either on_initiate or on_ack is not NULL */
grpc_closure* send_ping; struct {
/** Ping may be delayed by the transport, on_initiate callback will be
called when the ping is actually being sent. */
grpc_closure* on_initiate;
/** Called when the ping ack is received */
grpc_closure* on_ack;
} send_ping;
/*************************************************************************** /***************************************************************************
* remaining fields are initialized and used at the discretion of the * remaining fields are initialized and used at the discretion of the

@@ -187,7 +187,7 @@ char* grpc_transport_op_string(grpc_transport_op* op) {
gpr_strvec_add(&b, gpr_strdup("BIND_POLLSET_SET")); gpr_strvec_add(&b, gpr_strdup("BIND_POLLSET_SET"));
} }
if (op->send_ping != nullptr) { if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
if (!first) gpr_strvec_add(&b, gpr_strdup(" ")); if (!first) gpr_strvec_add(&b, gpr_strdup(" "));
// first = false; // first = false;
gpr_strvec_add(&b, gpr_strdup("SEND_PING")); gpr_strvec_add(&b, gpr_strdup("SEND_PING"));

Loading…
Cancel
Save