mirror of https://github.com/grpc/grpc.git
parent
555b84506e
commit
7269667f9e
19 changed files with 491 additions and 156 deletions
@ -0,0 +1,195 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/surface/api_trace.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
|
||||
typedef enum { |
||||
WAITING, |
||||
READY_TO_CALL_BACK, |
||||
CALLING_BACK_AND_FINISHED, |
||||
} callback_phase; |
||||
|
||||
typedef struct { |
||||
gpr_mu mu; |
||||
callback_phase phase; |
||||
grpc_closure on_complete; |
||||
grpc_closure on_timeout; |
||||
grpc_closure watcher_timer_init; |
||||
grpc_timer alarm; |
||||
grpc_connectivity_state state; |
||||
grpc_completion_queue *cq; |
||||
grpc_cq_completion completion_storage; |
||||
grpc_channel_element *client_channel_elem; |
||||
grpc_channel_stack *channel_stack; |
||||
grpc_error *error; |
||||
void *tag; |
||||
} state_watcher; |
||||
|
||||
static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) { |
||||
GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->channel_stack, |
||||
"watch_channel_connectivity"); |
||||
gpr_mu_destroy(&w->mu); |
||||
gpr_free(w); |
||||
} |
||||
|
||||
static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw, |
||||
grpc_cq_completion *ignored) { |
||||
bool should_delete = false; |
||||
state_watcher *w = (state_watcher *)pw; |
||||
gpr_mu_lock(&w->mu); |
||||
switch (w->phase) { |
||||
case WAITING: |
||||
case READY_TO_CALL_BACK: |
||||
GPR_UNREACHABLE_CODE(return ); |
||||
case CALLING_BACK_AND_FINISHED: |
||||
should_delete = true; |
||||
break; |
||||
} |
||||
gpr_mu_unlock(&w->mu); |
||||
|
||||
if (should_delete) { |
||||
delete_state_watcher(exec_ctx, w); |
||||
} |
||||
} |
||||
|
||||
static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w, |
||||
bool due_to_completion, grpc_error *error) { |
||||
if (due_to_completion) { |
||||
grpc_timer_cancel(exec_ctx, &w->alarm); |
||||
} else { |
||||
grpc_channel_element *client_channel_elem = w->client_channel_elem; |
||||
grpc_client_channel_watch_connectivity_state( |
||||
exec_ctx, client_channel_elem, |
||||
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(w->cq)), NULL, |
||||
&w->on_complete, NULL); |
||||
} |
||||
|
||||
gpr_mu_lock(&w->mu); |
||||
|
||||
if (due_to_completion) { |
||||
if (GRPC_TRACER_ON(grpc_trace_operation_failures)) { |
||||
GRPC_LOG_IF_ERROR("watch_completion_error", GRPC_ERROR_REF(error)); |
||||
} |
||||
GRPC_ERROR_UNREF(error); |
||||
error = GRPC_ERROR_NONE; |
||||
} else { |
||||
if (error == GRPC_ERROR_NONE) { |
||||
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||
"Timed out waiting for connection state change"); |
||||
} else if (error == GRPC_ERROR_CANCELLED) { |
||||
error = GRPC_ERROR_NONE; |
||||
} |
||||
} |
||||
switch (w->phase) { |
||||
case WAITING: |
||||
GRPC_ERROR_REF(error); |
||||
w->error = error; |
||||
w->phase = READY_TO_CALL_BACK; |
||||
break; |
||||
case READY_TO_CALL_BACK: |
||||
if (error != GRPC_ERROR_NONE) { |
||||
GPR_ASSERT(!due_to_completion); |
||||
GRPC_ERROR_UNREF(w->error); |
||||
GRPC_ERROR_REF(error); |
||||
w->error = error; |
||||
} |
||||
w->phase = CALLING_BACK_AND_FINISHED; |
||||
grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->error, finished_completion, w, |
||||
&w->completion_storage); |
||||
break; |
||||
case CALLING_BACK_AND_FINISHED: |
||||
GPR_UNREACHABLE_CODE(return ); |
||||
break; |
||||
} |
||||
gpr_mu_unlock(&w->mu); |
||||
|
||||
GRPC_ERROR_UNREF(error); |
||||
} |
||||
|
||||
static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, |
||||
grpc_error *error) { |
||||
partly_done(exec_ctx, (state_watcher *)pw, true, GRPC_ERROR_REF(error)); |
||||
} |
||||
|
||||
static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, |
||||
grpc_error *error) { |
||||
partly_done(exec_ctx, (state_watcher *)pw, false, GRPC_ERROR_REF(error)); |
||||
} |
||||
|
||||
typedef struct watcher_timer_init_arg { |
||||
state_watcher *w; |
||||
gpr_timespec deadline; |
||||
} watcher_timer_init_arg; |
||||
|
||||
static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg, |
||||
grpc_error *error_ignored) { |
||||
watcher_timer_init_arg *wa = (watcher_timer_init_arg *)arg; |
||||
|
||||
grpc_timer_init(exec_ctx, &wa->w->alarm, |
||||
gpr_convert_clock_type(wa->deadline, GPR_CLOCK_MONOTONIC), |
||||
&wa->w->on_timeout, gpr_now(GPR_CLOCK_MONOTONIC)); |
||||
gpr_free(wa); |
||||
} |
||||
|
||||
void grpc_channel_watch_connectivity_state_internal( |
||||
grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem, |
||||
grpc_channel_stack *channel_stack, |
||||
grpc_connectivity_state last_observed_state, gpr_timespec deadline, |
||||
grpc_completion_queue *cq, void *tag) { |
||||
state_watcher *w = (state_watcher *)gpr_malloc(sizeof(*w)); |
||||
|
||||
GPR_ASSERT(grpc_cq_begin_op(cq, tag)); |
||||
|
||||
gpr_mu_init(&w->mu); |
||||
GRPC_CLOSURE_INIT(&w->on_complete, watch_complete, w, |
||||
grpc_schedule_on_exec_ctx); |
||||
GRPC_CLOSURE_INIT(&w->on_timeout, timeout_complete, w, |
||||
grpc_schedule_on_exec_ctx); |
||||
w->phase = WAITING; |
||||
w->state = last_observed_state; |
||||
w->cq = cq; |
||||
w->tag = tag; |
||||
w->client_channel_elem = client_channel_elem; |
||||
w->channel_stack = channel_stack; |
||||
w->error = NULL; |
||||
|
||||
watcher_timer_init_arg *wa = |
||||
(watcher_timer_init_arg *)gpr_malloc(sizeof(watcher_timer_init_arg)); |
||||
wa->w = w; |
||||
wa->deadline = deadline; |
||||
GRPC_CLOSURE_INIT(&w->watcher_timer_init, watcher_timer_init, wa, |
||||
grpc_schedule_on_exec_ctx); |
||||
|
||||
if (client_channel_elem->filter == &grpc_client_channel_filter) { |
||||
GRPC_CHANNEL_STACK_REF(channel_stack, "watch_channel_connectivity"); |
||||
grpc_client_channel_watch_connectivity_state( |
||||
exec_ctx, client_channel_elem, |
||||
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)), &w->state, |
||||
&w->on_complete, &w->watcher_timer_init); |
||||
} else { |
||||
abort(); |
||||
} |
||||
} |
@ -0,0 +1,33 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
|
||||
void grpc_channel_watch_connectivity_state_internal( |
||||
grpc_exec_ctx *exec_ctx, grpc_channel_element *client_channel_elem, |
||||
grpc_channel_stack *channel_stack, |
||||
grpc_connectivity_state last_observed_state, gpr_timespec deadline, |
||||
grpc_completion_queue *cq, void *tag); |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CHANNEL_CONNECTIVITY_INTERNAL_H \ |
||||
*/ |
@ -0,0 +1,179 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include "src/core/ext/filters/client_channel/connectivity_watcher.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/sync.h> |
||||
#include "src/core/ext/filters/client_channel/channel_connectivity_internal.h" |
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/lib/iomgr/timer.h" |
||||
#include "src/core/lib/support/env.h" |
||||
#include "src/core/lib/support/string.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/surface/completion_queue.h" |
||||
|
||||
#define DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS 500 |
||||
|
||||
typedef struct connectivity_watcher { |
||||
grpc_timer watcher_timer; |
||||
grpc_closure check_connectivity_closure; |
||||
grpc_completion_queue* cq; |
||||
gpr_refcount refs; |
||||
size_t channel_count; |
||||
bool shutting_down; |
||||
} connectivity_watcher; |
||||
|
||||
typedef struct channel_state { |
||||
grpc_channel_element* client_channel_elem; |
||||
grpc_channel_stack* channel_stack; |
||||
grpc_connectivity_state state; |
||||
} channel_state; |
||||
|
||||
static gpr_once g_once = GPR_ONCE_INIT; |
||||
static gpr_mu g_watcher_mu; |
||||
static connectivity_watcher* g_watcher = NULL; |
||||
|
||||
static void init_g_watcher_mu() { gpr_mu_init(&g_watcher_mu); } |
||||
|
||||
static void start_watching_locked(grpc_exec_ctx* exec_ctx, |
||||
grpc_channel_element* client_channel_elem, |
||||
grpc_channel_stack* channel_stack) { |
||||
gpr_ref(&g_watcher->refs); |
||||
++g_watcher->channel_count; |
||||
channel_state* s = (channel_state*)gpr_zalloc(sizeof(channel_state)); |
||||
s->client_channel_elem = client_channel_elem; |
||||
s->channel_stack = channel_stack; |
||||
s->state = GRPC_CHANNEL_IDLE; |
||||
grpc_channel_watch_connectivity_state_internal( |
||||
exec_ctx, client_channel_elem, channel_stack, s->state, |
||||
gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, (void*)s); |
||||
} |
||||
|
||||
static bool is_disabled() { |
||||
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER"); |
||||
bool disabled = gpr_is_true(env); |
||||
gpr_free(env); |
||||
return disabled; |
||||
} |
||||
|
||||
static bool connectivity_watcher_unref(grpc_exec_ctx* exec_ctx) { |
||||
if (gpr_unref(&g_watcher->refs)) { |
||||
gpr_mu_lock(&g_watcher_mu); |
||||
grpc_completion_queue_destroy(g_watcher->cq); |
||||
gpr_free(g_watcher); |
||||
g_watcher = NULL; |
||||
gpr_mu_unlock(&g_watcher_mu); |
||||
return true; |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
static void check_connectivity_state(grpc_exec_ctx* exec_ctx, void* ignored, |
||||
grpc_error* error) { |
||||
grpc_event ev; |
||||
while (true) { |
||||
gpr_mu_lock(&g_watcher_mu); |
||||
if (g_watcher->shutting_down) { |
||||
// Drain cq if the watcher is shutting down
|
||||
ev = grpc_completion_queue_next( |
||||
g_watcher->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL); |
||||
} else { |
||||
ev = grpc_completion_queue_next(g_watcher->cq, |
||||
gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); |
||||
// Make sure we've seen 2 TIMEOUTs before going to sleep
|
||||
if (ev.type == GRPC_QUEUE_TIMEOUT) { |
||||
ev = grpc_completion_queue_next( |
||||
g_watcher->cq, gpr_inf_past(GPR_CLOCK_MONOTONIC), NULL); |
||||
if (ev.type == GRPC_QUEUE_TIMEOUT) { |
||||
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); |
||||
grpc_timer_init( |
||||
exec_ctx, &g_watcher->watcher_timer, |
||||
gpr_time_add(now, gpr_time_from_millis( |
||||
DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS, |
||||
GPR_TIMESPAN)), |
||||
&g_watcher->check_connectivity_closure, now); |
||||
gpr_mu_unlock(&g_watcher_mu); |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
gpr_mu_unlock(&g_watcher_mu); |
||||
if (ev.type != GRPC_OP_COMPLETE) { |
||||
break; |
||||
} |
||||
channel_state* s = (channel_state*)(ev.tag); |
||||
s->state = grpc_client_channel_check_connectivity_state( |
||||
exec_ctx, s->client_channel_elem, false /* try_to_connect */); |
||||
if (s->state == GRPC_CHANNEL_SHUTDOWN) { |
||||
GRPC_CHANNEL_STACK_UNREF(exec_ctx, s->channel_stack, |
||||
"connectivity_watcher_stop_watching"); |
||||
gpr_free(s); |
||||
if (connectivity_watcher_unref(exec_ctx)) { |
||||
break; |
||||
} |
||||
} else { |
||||
grpc_channel_watch_connectivity_state_internal( |
||||
exec_ctx, s->client_channel_elem, s->channel_stack, s->state, |
||||
gpr_inf_future(GPR_CLOCK_MONOTONIC), g_watcher->cq, s); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void grpc_client_channel_start_watching_connectivity( |
||||
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, |
||||
grpc_channel_stack* channel_stack) { |
||||
if (is_disabled()) return; |
||||
GRPC_CHANNEL_STACK_REF(channel_stack, "connectivity_watcher_start_watching"); |
||||
gpr_once_init(&g_once, init_g_watcher_mu); |
||||
gpr_mu_lock(&g_watcher_mu); |
||||
if (g_watcher == NULL) { |
||||
g_watcher = (connectivity_watcher*)gpr_zalloc(sizeof(connectivity_watcher)); |
||||
g_watcher->cq = grpc_completion_queue_create_internal( |
||||
GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING); |
||||
gpr_ref_init(&g_watcher->refs, 0); |
||||
GRPC_CLOSURE_INIT(&g_watcher->check_connectivity_closure, |
||||
check_connectivity_state, NULL, |
||||
grpc_schedule_on_exec_ctx); |
||||
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); |
||||
grpc_timer_init( |
||||
exec_ctx, &g_watcher->watcher_timer, |
||||
gpr_time_add( |
||||
now, gpr_time_from_millis(DEFAULT_CONNECTIVITY_CHECK_INTERVAL_MS, |
||||
GPR_TIMESPAN)), |
||||
&g_watcher->check_connectivity_closure, now); |
||||
} |
||||
start_watching_locked(exec_ctx, client_channel_elem, channel_stack); |
||||
gpr_mu_init(&g_watcher_mu); |
||||
} |
||||
|
||||
void grpc_client_channel_stop_watching_connectivity( |
||||
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, |
||||
grpc_channel_stack* channel_stack) { |
||||
if (is_disabled()) return; |
||||
gpr_once_init(&g_once, init_g_watcher_mu); |
||||
gpr_mu_lock(&g_watcher_mu); |
||||
if (--g_watcher->channel_count == 0) { |
||||
g_watcher->shutting_down = true; |
||||
grpc_timer_cancel(exec_ctx, &g_watcher->watcher_timer); |
||||
connectivity_watcher_unref(exec_ctx); |
||||
} |
||||
gpr_mu_unlock(&g_watcher_mu); |
||||
} |
@ -0,0 +1,30 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
|
||||
void grpc_client_channel_start_watching_connectivity( |
||||
grpc_exec_ctx* exec_ctx, grpc_channel_element* client_channel_elem, |
||||
grpc_channel_stack* channel_stack); |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CONNECTIVITY_WATCHER_H */ |
Loading…
Reference in new issue