Client connectivity API

Initial plumbing work; needs tests and more client_channel
implementation.
pull/2477/head
Craig Tiller 10 years ago
parent 3e5d61670e
commit 48cb07c909
  1. 3
      BUILD
  2. 2
      Makefile
  3. 1
      build.json
  4. 1
      gRPC.podspec
  5. 19
      include/grpc/grpc.h
  6. 40
      src/core/channel/client_channel.c
  7. 7
      src/core/channel/client_channel.h
  8. 36
      src/core/client_config/lb_policies/pick_first.c
  9. 4
      src/core/client_config/lb_policy.c
  10. 5
      src/core/client_config/lb_policy.h
  11. 182
      src/core/surface/channel_connectivity.c
  12. 1
      tools/doxygen/Doxyfile.core.internal
  13. 2
      tools/run_tests/sources_and_headers.json
  14. 2
      vsprojects/grpc/grpc.vcxproj
  15. 3
      vsprojects/grpc/grpc.vcxproj.filters
  16. 2
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  17. 3
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -340,6 +340,7 @@ cc_library(
"src/core/surface/call_details.c",
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",
@ -571,6 +572,7 @@ cc_library(
"src/core/surface/call_details.c",
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",
@ -1045,6 +1047,7 @@ objc_library(
"src/core/surface/call_details.c",
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",

@ -3340,6 +3340,7 @@ LIBGRPC_SRC = \
src/core/surface/call_details.c \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_connectivity.c \
src/core/surface/channel_create.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \
@ -3601,6 +3602,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/surface/call_details.c \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_connectivity.c \
src/core/surface/channel_create.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \

@ -277,6 +277,7 @@
"src/core/surface/call_details.c",
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/event_string.c",

@ -349,6 +349,7 @@ Pod::Spec.new do |s|
'src/core/surface/call_details.c',
'src/core/surface/call_log_batch.c',
'src/core/surface/channel.c',
'src/core/surface/channel_connectivity.c',
'src/core/surface/channel_create.c',
'src/core/surface/completion_queue.c',
'src/core/surface/event_string.c',

@ -392,6 +392,25 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cq);
drained and no threads are executing grpc_completion_queue_next */
void grpc_completion_queue_destroy(grpc_completion_queue *cq);
/** Check the connectivity state of a channel. */
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect);
/** Watch for a change in connectivity state.
Once the channel connectivity state is different from last_observed_state,
tag will be enqueued on cq with success=1.
If deadline expires BEFORE the state is changed, tag will be enqueued on cq
with success=0.
If optional_new_state is non-NULL, it will be set to the newly observed
connectivity
state of the channel at the same point as tag is enqueued onto the
completion
queue. */
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
grpc_completion_queue *cq, void *tag);
/* Create a call given a grpc_channel, in order to call 'method'. All
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function. */

@ -77,6 +77,8 @@ typedef struct {
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
int exit_idle_when_lb_policy_arrives;
} channel_data;
typedef enum {
@ -398,6 +400,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
@ -415,8 +418,18 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
wakeup_closures = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = NULL;
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
exit_idle = 1;
chand->exit_idle_when_lb_policy_arrives = 0;
}
gpr_mu_unlock(&chand->mu_config);
if (exit_idle) {
grpc_lb_policy_exit_idle(lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
}
if (old_lb_policy) {
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
@ -609,3 +622,30 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_exit_idle(chand->lb_policy);
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
}
}
gpr_mu_unlock(&chand->mu_config);
return out;
}
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
on_complete);
gpr_mu_unlock(&chand->mu_config);
}

@ -52,4 +52,11 @@ extern const grpc_channel_filter grpc_client_channel_filter;
void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_channel_element *elem, int try_to_connect);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
grpc_iomgr_closure *on_complete);
#endif /* GRPC_INTERNAL_CORE_CHANNEL_CLIENT_CHANNEL_H */

@ -97,6 +97,25 @@ void pf_shutdown(grpc_lb_policy *pol) {
gpr_mu_unlock(&p->mu);
}
static void start_picking(pick_first_lb_policy *p) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
&p->checking_connectivity,
&p->connectivity_changed);
}
void pf_exit_idle(grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
start_picking(p);
}
gpr_mu_unlock(&p->mu);
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete) {
@ -109,13 +128,7 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
on_complete->cb(on_complete->cb_arg, 1);
} else {
if (!p->started_picking) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(pol, "pick_first_connectivity");
grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
&p->connectivity_changed);
start_picking(p);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@ -249,8 +262,13 @@ static void pf_notify_on_state_change(grpc_lb_policy *pol,
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
pf_destroy, pf_shutdown, pf_pick,
pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
pf_destroy,
pf_shutdown,
pf_pick,
pf_exit_idle,
pf_broadcast,
pf_check_connectivity,
pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {

@ -77,3 +77,7 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
policy->vtable->broadcast(policy, op);
}
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
policy->vtable->exit_idle(policy);
}

@ -59,6 +59,9 @@ struct grpc_lb_policy_vtable {
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
/** try to enter a READY connectivity state */
void (*exit_idle)(grpc_lb_policy *policy);
/** broadcast a transport op to all subchannels */
void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
@ -106,4 +109,6 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */

@ -0,0 +1,182 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/surface/channel.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/channel/client_channel.h"
#include "src/core/iomgr/alarm.h"
#include "src/core/surface/completion_queue.h"
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect) {
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
return GRPC_CHANNEL_FATAL_FAILURE;
}
return grpc_client_channel_check_connectivity_state(client_channel_elem,
try_to_connect);
}
typedef enum {
WAITING,
CALLING_BACK,
CALLING_BACK_AND_FINISHED,
CALLED_BACK
} callback_phase;
typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
grpc_iomgr_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
grpc_connectivity_state *optional_new_state;
grpc_completion_queue *cq;
grpc_cq_completion completion_storage;
void *tag;
} state_watcher;
static void delete_state_watcher(state_watcher *w) {
gpr_mu_destroy(&w->mu);
gpr_free(w);
}
static void finished_completion(void *pw, grpc_cq_completion *ignored) {
int delete = 0;
state_watcher *w = pw;
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
case CALLED_BACK:
gpr_log(GPR_ERROR, "should never reach here");
abort();
break;
case CALLING_BACK:
w->phase = CALLED_BACK;
break;
case CALLING_BACK_AND_FINISHED:
delete = 1;
break;
}
gpr_mu_unlock(&w->mu);
if (delete) {
delete_state_watcher(w);
}
}
static void partly_done(state_watcher *w, int due_to_completion) {
int delete = 0;
if (due_to_completion) {
gpr_mu_lock(&w->mu);
w->success = 1;
gpr_mu_unlock(&w->mu);
grpc_alarm_cancel(&w->alarm);
}
gpr_mu_lock(&w->mu);
switch (w->phase) {
case WAITING:
w->phase = CALLING_BACK;
if (w->optional_new_state) {
*w->optional_new_state = w->state;
}
grpc_cq_end_op(w->cq, w->tag, w->success, finished_completion, w,
&w->completion_storage);
break;
case CALLING_BACK:
w->phase = CALLING_BACK_AND_FINISHED;
break;
case CALLING_BACK_AND_FINISHED:
gpr_log(GPR_ERROR, "should never reach here");
abort();
break;
case CALLED_BACK:
delete = 1;
break;
}
gpr_mu_unlock(&w->mu);
if (delete) {
delete_state_watcher(w);
}
}
static void watch_complete(void *pw, int success) { partly_done(pw, 0); }
static void timeout_complete(void *pw, int success) { partly_done(pw, 1); }
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
grpc_connectivity_state *optional_new_state, gpr_timespec deadline,
grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
state_watcher *w = gpr_malloc(sizeof(*w));
grpc_cq_begin_op(cq);
gpr_mu_init(&w->mu);
grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
w->optional_new_state = optional_new_state;
w->cq = cq;
w->tag = tag;
grpc_alarm_init(&w->alarm, deadline, timeout_complete, w,
gpr_now(GPR_CLOCK_REALTIME));
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
grpc_iomgr_add_delayed_callback(&w->on_complete, 1);
} else {
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
&w->on_complete);
}
}

@ -975,6 +975,7 @@ src/core/surface/call.c \
src/core/surface/call_details.c \
src/core/surface/call_log_batch.c \
src/core/surface/channel.c \
src/core/surface/channel_connectivity.c \
src/core/surface/channel_create.c \
src/core/surface/completion_queue.c \
src/core/surface/event_string.c \

@ -9023,6 +9023,7 @@
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel.h",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/completion_queue.h",
@ -9426,6 +9427,7 @@
"src/core/surface/call_log_batch.c",
"src/core/surface/channel.c",
"src/core/surface/channel.h",
"src/core/surface/channel_connectivity.c",
"src/core/surface/channel_create.c",
"src/core/surface/completion_queue.c",
"src/core/surface/completion_queue.h",

@ -461,6 +461,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">

@ -286,6 +286,9 @@
<ClCompile Include="..\..\src\core\surface\channel.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
<Filter>src\core\surface</Filter>
</ClCompile>

@ -396,6 +396,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\completion_queue.c">

@ -217,6 +217,9 @@
<ClCompile Include="..\..\src\core\surface\channel.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_connectivity.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\channel_create.c">
<Filter>src\core\surface</Filter>
</ClCompile>

Loading…
Cancel
Save