Resolve comments

pull/2811/head
yang-g 10 years ago
parent 8708dd76c1
commit c8abca8f53
  1. 40
      include/grpc++/channel_interface.h
  2. 86
      src/cpp/client/channel.cc
  3. 30
      src/cpp/client/channel.h
  4. 2
      test/cpp/end2end/end2end_test.cc

@ -48,7 +48,6 @@ class CallOpBuffer;
class ClientContext;
class CompletionQueue;
class RpcMethod;
class CallInterface;
class ChannelInterface : public CallHook,
public std::enable_shared_from_this<ChannelInterface> {
@ -65,32 +64,27 @@ class ChannelInterface : public CallHook,
// Return the tag on cq when the channel state is changed or deadline expires.
// GetState needs to called to get the current state.
virtual void NotifyOnStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) = 0;
template <typename T>
void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline,
CompletionQueue* cq, void* tag) {
TimePoint<T> deadline_tp(deadline);
NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag);
}
// Blocking wait for channel state change or deadline expiration.
// GetState needs to called to get the current state.
virtual bool WaitForStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
// Blocking wait for target state or deadline expriration.
virtual bool WaitForState(grpc_connectivity_state target_state,
gpr_timespec deadline) = 0;
#ifndef GRPC_CXX0X_NO_CHRONO
virtual void NotifyOnStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline,
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline) = 0;
virtual bool WaitForState(
grpc_connectivity_state target_state,
const std::chrono::system_clock::time_point& deadline) = 0;
#endif // !GRPC_CXX0X_NO_CHRONO
template <typename T>
bool WaitForStateChange(grpc_connectivity_state last_observed, T deadline) {
TimePoint<T> deadline_tp(deadline);
return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time());
}
private:
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
};
} // namespace grpc

@ -112,91 +112,25 @@ class TagSaver GRPC_FINAL : public CompletionQueueTag {
void* tag_;
};
template <typename T>
void NotifyOnStateChangeShared(grpc_channel* channel,
grpc_connectivity_state last_observed,
const T& deadline,
CompletionQueue* cq, void* tag) {
TimePoint<T> deadline_tp(deadline);
} // namespace
void Channel::NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
TagSaver* tag_saver = new TagSaver(tag);
grpc_channel_watch_connectivity_state(
channel, last_observed, deadline_tp.raw_time(), cq->cq(), tag_saver);
grpc_channel_watch_connectivity_state(c_channel_, last_observed, deadline,
cq->cq(), tag_saver);
}
template <typename T>
bool WaitForStateChangeShared(grpc_channel* channel,
grpc_connectivity_state last_observed,
const T& deadline) {
bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) {
CompletionQueue cq;
bool ok = false;
void* tag = NULL;
NotifyOnStateChangeShared(channel, last_observed, deadline, &cq, NULL);
NotifyOnStateChangeImpl(last_observed, deadline, &cq, NULL);
cq.Next(&tag, &ok);
GPR_ASSERT(tag == NULL);
return ok;
}
template <typename T>
bool WaitForStateShared(grpc_channel* channel,
grpc_connectivity_state target_state,
const T& deadline) {
grpc_connectivity_state current_state =
grpc_channel_check_connectivity_state(channel, 0);
if (current_state == target_state) {
return true;
}
TimePoint<T> deadline_tp(deadline);
CompletionQueue cq;
bool ok = false;
void* tag = NULL;
while (current_state != target_state) {
NotifyOnStateChangeShared(channel, current_state, deadline_tp.raw_time(),
&cq, NULL);
cq.Next(&tag, &ok);
if (!ok) {
return false;
}
current_state = grpc_channel_check_connectivity_state(channel, 0);
}
return true;
}
} // namespace
void Channel::NotifyOnStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) {
NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
}
bool Channel::WaitForStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline) {
return WaitForStateChangeShared(c_channel_, last_observed, deadline);
}
bool Channel::WaitForState(grpc_connectivity_state target_state,
gpr_timespec deadline) {
return WaitForStateShared(c_channel_, target_state, deadline);
}
#ifndef GRPC_CXX0X_NO_CHRONO
void Channel::NotifyOnStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline,
CompletionQueue* cq, void* tag) {
NotifyOnStateChangeShared(c_channel_, last_observed, deadline, cq, tag);
}
bool Channel::WaitForStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline) {
return WaitForStateChangeShared(c_channel_, last_observed, deadline);
}
bool Channel::WaitForState(
grpc_connectivity_state target_state,
const std::chrono::system_clock::time_point& deadline) {
return WaitForStateShared(c_channel_, target_state, deadline);
}
#endif // !GRPC_CXX0X_NO_CHRONO
} // namespace grpc

@ -64,32 +64,14 @@ class Channel GRPC_FINAL : public GrpcLibrary, public ChannelInterface {
grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
void NotifyOnStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
bool WaitForStateChange(grpc_connectivity_state last_observed,
gpr_timespec deadline) GRPC_OVERRIDE;
bool WaitForState(grpc_connectivity_state target_state,
gpr_timespec deadline) GRPC_OVERRIDE;
#ifndef GRPC_CXX0X_NO_CHRONO
void NotifyOnStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline,
CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
bool WaitForStateChange(
grpc_connectivity_state last_observed,
const std::chrono::system_clock::time_point& deadline) GRPC_OVERRIDE;
private:
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, CompletionQueue* cq,
void* tag) GRPC_OVERRIDE;
bool WaitForState(grpc_connectivity_state target_state,
const std::chrono::system_clock::time_point& deadline)
GRPC_OVERRIDE;
#endif // !GRPC_CXX0X_NO_CHRONO
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) GRPC_OVERRIDE;
private:
const grpc::string host_;
grpc_channel* const c_channel_; // owned
};

@ -890,8 +890,6 @@ TEST_F(End2endTest, ChannelState) {
EXPECT_TRUE(channel_->WaitForStateChange(
GRPC_CHANNEL_IDLE, gpr_inf_future(GPR_CLOCK_REALTIME)));
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
EXPECT_TRUE(channel_->WaitForState(GRPC_CHANNEL_READY,
gpr_inf_future(GPR_CLOCK_REALTIME)));
}
} // namespace testing

Loading…
Cancel
Save