clean up and add a test

pull/2811/head
yang-g 10 years ago
parent a73dc1c708
commit 36f5965247
  1. 61
      src/cpp/client/channel.cc
  2. 24
      test/cpp/end2end/end2end_test.cc

@ -98,44 +98,69 @@ grpc_connectivity_state Channel::GetState(bool try_to_connect) {
return grpc_channel_check_connectivity_state(c_channel_, try_to_connect);
}
void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
cq->cq(), tag);
namespace {
class TagSaver GRPC_FINAL : public CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() GRPC_OVERRIDE {}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
*tag = tag_;
delete this;
return true;
}
private:
void* tag_;
};
template <typename T>
void NotifyOnStateChangeShared(grpc_channel* channel,
grpc_connectivity_state last_observed,
const T& deadline,
CompletionQueue* cq, void* tag) {
TimePoint<T> deadline_tp(deadline);
TagSaver* tag_saver = new TagSaver(tag);
grpc_channel_watch_connectivity_state(
channel, last_observed, deadline_tp.raw_time(), cq->cq(), tag_saver);
}
bool Channel::WaitForStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline) {
template <typename T>
bool WaitForStateChangeShared(grpc_channel* channel,
grpc_connectivity_state last_observed,
const T& deadline) {
CompletionQueue cq;
bool ok = false;
void* tag = NULL;
NotifyOnStateChange(last_observed, deadline, &cq, NULL);
NotifyOnStateChangeShared(channel, last_observed, deadline, &cq, NULL);
cq.Next(&tag, &ok);
GPR_ASSERT(tag == NULL);
return ok;
}
} // namespace
void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
}
bool Channel::WaitForStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline) {
return WaitForStateChangeShared(c_channel_, last_observed, deadline);
}
#ifndef GRPC_CXX0X_NO_CHRONO
void Channel::NotifyOnStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline,
CompletionQueue* cq, void* tag) {
TimePoint<std::chrono::system_clock::time_point> deadline_tp(deadline);
grpc_channel_watch_connectivity_state(c_channel_, last_observed,
deadline_tp.raw_time(), cq->cq(), tag);
NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
}
bool Channel::WaitForStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline) {
CompletionQueue cq;
bool ok = false;
void* tag = NULL;
NotifyOnStateChange(last_observed, deadline, &cq, NULL);
cq.Next(&tag, &ok);
GPR_ASSERT(tag == NULL);
return ok;
return WaitForStateChangeShared(c_channel_, last_observed, deadline);
}
#endif // !GRPC_CXX0X_NO_CHRONO
} // namespace grpc

@ -831,7 +831,8 @@ TEST_F(End2endTest, HugeResponse) {
}
namespace {
void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, gpr_event *ev) {
void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
gpr_event *ev) {
EchoResponse resp;
gpr_event_set(ev, (void*)1);
while (stream->Read(&resp)) {
@ -870,6 +871,27 @@ TEST_F(End2endTest, Peer) {
EXPECT_TRUE(CheckIsLocalhost(context.peer()));
}
TEST_F(End2endTest, ChannelState) {
ResetStub();
// Start IDLE
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
CompletionQueue cq;
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(10);
// No state change.
channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, NULL);
void* tag;
bool ok = true;
cq.Next(&tag, &ok);
EXPECT_FALSE(ok);
EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
EXPECT_TRUE(channel_->WaitForStateChange(
GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME)));
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
}
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save