Merge branch 'write_completion' into write_completion_stats

pull/12431/head
Craig Tiller 7 years ago
commit 99bdb388a7
  1. 8
      doc/environment_variables.md
  2. 1
      grpc.def
  3. 3
      include/grpc/grpc.h
  4. 6
      src/core/ext/filters/client_channel/channel_connectivity.c
  5. 8
      src/core/lib/iomgr/iomgr.c
  6. 13
      src/core/lib/support/string.c
  7. 3
      src/core/lib/support/string.h
  8. 202
      src/cpp/client/channel_cc.cc
  9. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  10. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  11. 16
      test/core/support/string_test.c
  12. 134
      test/cpp/end2end/async_end2end_test.cc
  13. 26
      test/cpp/end2end/end2end_test.cc

@ -114,3 +114,11 @@ some configuration as environment variables that can be set.
- native (default)- a DNS resolver based around getaddrinfo(), creates a new thread to
perform name resolution
- ares - a DNS resolver based around the c-ares library
* GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER
The channel connectivity watcher uses one extra thread to check the channel
state every 500 ms on the client side. It can help reconnect disconnected
client channels (mostly due to idleness), so that the next RPC on this channel
won't fail. Set to 1 to turn off this watcher and save a thread. Please note
this is a temporary work-around, it will be removed in the future once we have
support for automatically reestablishing failed connections.

@ -70,6 +70,7 @@ EXPORTS
grpc_channel_check_connectivity_state
grpc_channel_num_external_connectivity_watchers
grpc_channel_watch_connectivity_state
grpc_channel_support_connectivity_watcher
grpc_channel_create_call
grpc_channel_ping
grpc_channel_register_call

@ -178,6 +178,9 @@ GRPCAPI void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
/** Check whether a grpc channel supports connectivity watcher */
GRPCAPI int grpc_channel_support_connectivity_watcher(grpc_channel *channel);
/** Create a call given a grpc_channel, in order to call 'method'. All
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function.

@ -191,6 +191,12 @@ static void watcher_timer_init(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(wa);
}
int grpc_channel_support_connectivity_watcher(grpc_channel *channel) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
return client_channel_elem->filter != &grpc_client_channel_filter ? 0 : 1;
}
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {

@ -164,13 +164,7 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
if (env == NULL) return false;
static const char *truthy[] = {"yes", "Yes", "YES", "true",
"True", "TRUE", "1"};
bool should_we = false;
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == strcmp(env, truthy[i])) should_we = true;
}
bool should_we = gpr_is_true(env);
gpr_free(env);
return should_we;
}

@ -298,3 +298,16 @@ void *gpr_memrchr(const void *s, int c, size_t n) {
}
return NULL;
}
bool gpr_is_true(const char *s) {
if (s == NULL) {
return false;
}
static const char *truthy[] = {"yes", "true", "1"};
for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
if (0 == gpr_stricmp(s, truthy[i])) {
return true;
}
}
return false;
}

@ -19,6 +19,7 @@
#ifndef GRPC_CORE_LIB_SUPPORT_STRING_H
#define GRPC_CORE_LIB_SUPPORT_STRING_H
#include <stdbool.h>
#include <stddef.h>
#include <grpc/support/port_platform.h>
@ -106,6 +107,8 @@ int gpr_stricmp(const char *a, const char *b);
void *gpr_memrchr(const void *s, int c, size_t n);
/** Return true if lower(s) equals "true", "yes" or "1", otherwise false. */
bool gpr_is_true(const char *s);
#ifdef __cplusplus
}
#endif

@ -18,7 +18,10 @@
#include <grpc++/channel.h>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
@ -35,17 +38,197 @@
#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
namespace grpc {
namespace {
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
class TagSaver final : public CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
// Constantly watches channel connectivity status to reconnect a transiently
// disconnected channel. This is a temporary work-around before we have retry
// support.
class ChannelConnectivityWatcher : private GrpcLibraryCodegen {
public:
static void StartWatching(grpc_channel* channel) {
if (!IsDisabled()) {
std::unique_lock<std::mutex> lock(g_watcher_mu_);
if (g_watcher_ == nullptr) {
g_watcher_ = new ChannelConnectivityWatcher();
}
g_watcher_->StartWatchingLocked(channel);
}
}
static void StopWatching() {
if (!IsDisabled()) {
std::unique_lock<std::mutex> lock(g_watcher_mu_);
if (g_watcher_->StopWatchingLocked()) {
delete g_watcher_;
g_watcher_ = nullptr;
}
}
}
private:
ChannelConnectivityWatcher() : channel_count_(0), shutdown_(false) {
gpr_ref_init(&ref_, 0);
gpr_thd_options options = gpr_thd_options_default();
gpr_thd_options_set_joinable(&options);
gpr_thd_new(&thd_id_, &WatchStateChange, this, &options);
}
static bool IsDisabled() {
char* env = gpr_getenv("GRPC_DISABLE_CHANNEL_CONNECTIVITY_WATCHER");
bool disabled = gpr_is_true(env);
gpr_free(env);
return disabled;
}
void WatchStateChangeImpl() {
bool ok = false;
void* tag = NULL;
CompletionQueue::NextStatus status = CompletionQueue::GOT_EVENT;
while (true) {
{
std::unique_lock<std::mutex> lock(shutdown_mu_);
if (shutdown_) {
// Drain cq_ if the watcher is shutting down
status = cq_.AsyncNext(&tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME));
} else {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
// Make sure we've seen 2 TIMEOUTs before going to sleep
if (status == CompletionQueue::TIMEOUT) {
status = cq_.AsyncNext(&tag, &ok, gpr_inf_past(GPR_CLOCK_REALTIME));
if (status == CompletionQueue::TIMEOUT) {
shutdown_cv_.wait_for(lock, std::chrono::milliseconds(
kConnectivityCheckIntervalMsec));
continue;
}
}
}
}
ChannelState* channel_state = static_cast<ChannelState*>(tag);
channel_state->state =
grpc_channel_check_connectivity_state(channel_state->channel, false);
if (channel_state->state == GRPC_CHANNEL_SHUTDOWN) {
void* shutdown_tag = NULL;
channel_state->shutdown_cq.Next(&shutdown_tag, &ok);
delete channel_state;
if (gpr_unref(&ref_)) {
break;
}
} else {
TagSaver* tag_saver = new TagSaver(channel_state);
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(), tag_saver);
}
}
}
void StartWatchingLocked(grpc_channel* channel) {
if (thd_id_ != 0) {
gpr_ref(&ref_);
++channel_count_;
ChannelState* channel_state = new ChannelState(channel);
// The first grpc_channel_watch_connectivity_state() is not used to
// monitor the channel state change, but to hold a reference of the
// c channel. So that WatchStateChangeImpl() can observe state ==
// GRPC_CHANNEL_SHUTDOWN before the channel gets destroyed.
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), channel_state->shutdown_cq.cq(),
new TagSaver(nullptr));
grpc_channel_watch_connectivity_state(
channel_state->channel, channel_state->state,
gpr_inf_future(GPR_CLOCK_REALTIME), cq_.cq(),
new TagSaver(channel_state));
}
}
bool StopWatchingLocked() {
if (--channel_count_ == 0) {
{
std::unique_lock<std::mutex> lock(shutdown_mu_);
shutdown_ = true;
shutdown_cv_.notify_one();
}
gpr_thd_join(thd_id_);
return true;
}
return false;
}
friend void WatchStateChange(void* arg);
struct ChannelState {
explicit ChannelState(grpc_channel* channel)
: channel(channel), state(GRPC_CHANNEL_IDLE){};
grpc_channel* channel;
grpc_connectivity_state state;
CompletionQueue shutdown_cq;
};
gpr_thd_id thd_id_;
CompletionQueue cq_;
gpr_refcount ref_;
int channel_count_;
std::mutex shutdown_mu_;
std::condition_variable shutdown_cv_; // protected by shutdown_mu_
bool shutdown_; // protected by shutdown_mu_
static std::mutex g_watcher_mu_;
static ChannelConnectivityWatcher* g_watcher_; // protected by g_watcher_mu_
};
std::mutex ChannelConnectivityWatcher::g_watcher_mu_;
ChannelConnectivityWatcher* ChannelConnectivityWatcher::g_watcher_ = nullptr;
void WatchStateChange(void* arg) {
ChannelConnectivityWatcher* watcher =
static_cast<ChannelConnectivityWatcher*>(arg);
watcher->WatchStateChangeImpl();
}
} // namespace
static internal::GrpcLibraryInitializer g_gli_initializer;
Channel::Channel(const grpc::string& host, grpc_channel* channel)
: host_(host), c_channel_(channel) {
g_gli_initializer.summon();
if (grpc_channel_support_connectivity_watcher(channel)) {
ChannelConnectivityWatcher::StartWatching(channel);
}
}
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Channel::~Channel() {
const bool stop_watching =
grpc_channel_support_connectivity_watcher(c_channel_);
grpc_channel_destroy(c_channel_);
if (stop_watching) {
ChannelConnectivityWatcher::StopWatching();
}
}
namespace {
@ -130,23 +313,6 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
namespace {
class TagSaver final : public CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
bool FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
} // namespace
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {

@ -93,6 +93,7 @@ grpc_alarm_destroy_type grpc_alarm_destroy_import;
grpc_channel_check_connectivity_state_type grpc_channel_check_connectivity_state_import;
grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_external_connectivity_watchers_import;
grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import;
grpc_channel_create_call_type grpc_channel_create_call_import;
grpc_channel_ping_type grpc_channel_ping_import;
grpc_channel_register_call_type grpc_channel_register_call_import;
@ -399,6 +400,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_channel_check_connectivity_state_import = (grpc_channel_check_connectivity_state_type) GetProcAddress(library, "grpc_channel_check_connectivity_state");
grpc_channel_num_external_connectivity_watchers_import = (grpc_channel_num_external_connectivity_watchers_type) GetProcAddress(library, "grpc_channel_num_external_connectivity_watchers");
grpc_channel_watch_connectivity_state_import = (grpc_channel_watch_connectivity_state_type) GetProcAddress(library, "grpc_channel_watch_connectivity_state");
grpc_channel_support_connectivity_watcher_import = (grpc_channel_support_connectivity_watcher_type) GetProcAddress(library, "grpc_channel_support_connectivity_watcher");
grpc_channel_create_call_import = (grpc_channel_create_call_type) GetProcAddress(library, "grpc_channel_create_call");
grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping");
grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call");

@ -260,6 +260,9 @@ extern grpc_channel_num_external_connectivity_watchers_type grpc_channel_num_ext
typedef void(*grpc_channel_watch_connectivity_state_type)(grpc_channel *channel, grpc_connectivity_state last_observed_state, gpr_timespec deadline, grpc_completion_queue *cq, void *tag);
extern grpc_channel_watch_connectivity_state_type grpc_channel_watch_connectivity_state_import;
#define grpc_channel_watch_connectivity_state grpc_channel_watch_connectivity_state_import
typedef int(*grpc_channel_support_connectivity_watcher_type)(grpc_channel *channel);
extern grpc_channel_support_connectivity_watcher_type grpc_channel_support_connectivity_watcher_import;
#define grpc_channel_support_connectivity_watcher grpc_channel_support_connectivity_watcher_import
typedef grpc_call *(*grpc_channel_create_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, grpc_slice method, const grpc_slice *host, gpr_timespec deadline, void *reserved);
extern grpc_channel_create_call_type grpc_channel_create_call_import;
#define grpc_channel_create_call grpc_channel_create_call_import

@ -279,6 +279,21 @@ static void test_memrchr(void) {
GPR_ASSERT(0 == strcmp((const char *)gpr_memrchr("hello", 'l', 5), "lo"));
}
static void test_is_true(void) {
LOG_TEST_NAME("test_is_true");
GPR_ASSERT(true == gpr_is_true("True"));
GPR_ASSERT(true == gpr_is_true("true"));
GPR_ASSERT(true == gpr_is_true("TRUE"));
GPR_ASSERT(true == gpr_is_true("Yes"));
GPR_ASSERT(true == gpr_is_true("yes"));
GPR_ASSERT(true == gpr_is_true("YES"));
GPR_ASSERT(true == gpr_is_true("1"));
GPR_ASSERT(false == gpr_is_true(NULL));
GPR_ASSERT(false == gpr_is_true(""));
GPR_ASSERT(false == gpr_is_true("0"));
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_strdup();
@ -292,5 +307,6 @@ int main(int argc, char **argv) {
test_leftpad();
test_stricmp();
test_memrchr();
test_is_true();
return 0;
}

@ -260,11 +260,31 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
server_address_ << "localhost:" << port_;
// Setup server
BuildAndStartServer();
gpr_tls_set(&g_is_async_end2end_test, 1);
}
void TearDown() override {
gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
cq_->Shutdown();
while (cq_->Next(&ignored_tag, &ignored_ok))
;
stub_.reset();
poll_overrider_.reset();
grpc_recycle_unused_port(port_);
}
void BuildAndStartServer() {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
service_.reset(new grpc::testing::EchoTestService::AsyncService());
builder.RegisterService(service_.get());
if (GetParam().health_check_service) {
builder.RegisterService(&health_check_);
}
@ -276,20 +296,6 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
new ServerBuilderSyncPluginDisabler());
builder.SetOption(move(sync_plugin_disabler));
server_ = builder.BuildAndStart();
gpr_tls_set(&g_is_async_end2end_test, 1);
}
void TearDown() override {
gpr_tls_set(&g_is_async_end2end_test, 0);
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
cq_->Shutdown();
while (cq_->Next(&ignored_tag, &ignored_ok))
;
poll_overrider_.reset();
grpc_recycle_unused_port(port_);
}
void ResetStub() {
@ -319,8 +325,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@ -341,7 +347,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@ -359,6 +365,26 @@ TEST_P(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
TEST_P(AsyncEnd2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
ResetStub();
SendRpc(1);
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
cq_->Shutdown();
while (cq_->Next(&ignored_tag, &ignored_ok))
;
BuildAndStartServer();
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
SendRpc(1);
}
// We do not need to protect notify because the use is synchronized.
void ServerWait(Server* server, int* notify) {
server->Wait();
@ -409,8 +435,8 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@ -446,8 +472,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam().disable_blocking)
.Expect(2, true)
@ -508,8 +534,8 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
cli_stream->Write(send_request, tag(3));
@ -581,8 +607,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@ -637,8 +663,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@ -689,8 +715,8 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@ -743,8 +769,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam().disable_blocking)
.Expect(1, true)
@ -803,8 +829,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@ -871,8 +897,8 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
@ -948,8 +974,8 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@ -993,8 +1019,8 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
@ -1043,8 +1069,8 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
@ -1106,8 +1132,8 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
@ -1170,8 +1196,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@ -1205,8 +1231,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
srv_ctx.AsyncNotifyWhenDone(tag(5));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@ -1297,8 +1323,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
@ -1428,8 +1454,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
@ -1564,8 +1590,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message

@ -238,6 +238,18 @@ class End2endTest : public ::testing::TestWithParam<TestScenario> {
int port = grpc_pick_unused_port_or_die();
server_address_ << "127.0.0.1:" << port;
// Setup server
BuildAndStartServer(processor);
}
void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
if (is_server_started_) {
server_->Shutdown();
BuildAndStartServer(processor);
}
}
void BuildAndStartServer(
const std::shared_ptr<AuthMetadataProcessor>& processor) {
ServerBuilder builder;
ConfigureServerBuilder(&builder);
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
@ -685,6 +697,20 @@ TEST_P(End2endTest, MultipleRpcs) {
}
}
TEST_P(End2endTest, ReconnectChannel) {
if (GetParam().inproc) {
return;
}
ResetStub();
SendRpc(stub_.get(), 1, false);
RestartServer(std::shared_ptr<AuthMetadataProcessor>());
// It needs more than kConnectivityCheckIntervalMsec time to reconnect the
// channel.
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(1600, GPR_TIMESPAN)));
SendRpc(stub_.get(), 1, false);
}
TEST_P(End2endTest, RequestStreamOneRequest) {
ResetStub();
EchoRequest request;

Loading…
Cancel
Save