pull/18551/head
Nicolas "Pixel" Noble 6 years ago
parent 12b0db3e57
commit 5847c3a87a
  1. 4
      include/grpcpp/channel_impl.h
  2. 3
      include/grpcpp/impl/codegen/channel_interface.h
  3. 1
      include/grpcpp/impl/codegen/client_context.h
  4. 2
      include/grpcpp/impl/codegen/completion_queue.h
  5. 8
      include/grpcpp/impl/codegen/completion_queue_impl.h
  6. 7
      include/grpcpp/impl/codegen/intercepted_channel.h
  7. 42
      include/grpcpp/impl/codegen/server_interface.h
  8. 1
      include/grpcpp/server_builder.h
  9. 8
      src/cpp/common/completion_queue_cc.cc

@ -84,8 +84,8 @@ class Channel final : public ::grpc::ChannelInterface,
void* RegisterMethod(const char* method) override;
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
CompletionQueue* cq, void* tag) override;
gpr_timespec deadline, CompletionQueue* cq,
void* tag) override;
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) override;

@ -134,7 +134,8 @@ class ChannelInterface {
virtual void* RegisterMethod(const char* method) = 0;
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
::grpc_impl::CompletionQueue* cq, void* tag) = 0;
::grpc_impl::CompletionQueue* cq,
void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;

@ -59,7 +59,6 @@ struct grpc_call;
namespace grpc_impl {
class Channel;
}
namespace grpc {

@ -39,6 +39,6 @@ namespace grpc {
typedef ::grpc_impl::CompletionQueue CompletionQueue;
typedef ::grpc_impl::ServerCompletionQueue ServerCompletionQueue;
}
} // namespace grpc
#endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H

@ -330,7 +330,8 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen {
/// timeout. i.e:
/// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME))
void TryPluck(::grpc::internal::CompletionQueueTag* tag) {
auto deadline = ::grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
auto deadline =
::grpc::g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = ::grpc::g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return;
@ -345,7 +346,8 @@ class CompletionQueue : private ::grpc::GrpcLibraryCodegen {
///
/// This exects tag->FinalizeResult (if called) to return 'false' i.e expects
/// that the tag is internal not something that is returned to the user.
void TryPluck(::grpc::internal::CompletionQueueTag* tag, gpr_timespec deadline) {
void TryPluck(::grpc::internal::CompletionQueueTag* tag,
gpr_timespec deadline) {
auto ev = ::grpc::g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
@ -412,6 +414,6 @@ class ServerCompletionQueue : public CompletionQueue {
friend class ::grpc::Server;
};
} // namespace grpc
} // namespace grpc_impl
#endif // GRPCPP_IMPL_CODEGEN_COMPLETION_QUEUE_H

@ -62,7 +62,8 @@ class InterceptedChannel : public ChannelInterface {
}
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, ::grpc_impl::CompletionQueue* cq,
gpr_timespec deadline,
::grpc_impl::CompletionQueue* cq,
void* tag) override {
return channel_->NotifyOnStateChangeImpl(last_observed, deadline, cq, tag);
}
@ -71,7 +72,9 @@ class InterceptedChannel : public ChannelInterface {
return channel_->WaitForStateChangeImpl(last_observed, deadline);
}
::grpc_impl::CompletionQueue* CallbackCQ() override { return channel_->CallbackCQ(); }
::grpc_impl::CompletionQueue* CallbackCQ() override {
return channel_->CallbackCQ();
}
ChannelInterface* channel_;
size_t interceptor_pos_;

@ -32,7 +32,7 @@ namespace grpc_impl {
class Channel;
class CompletionQueue;
class ServerCompletionQueue;
}
} // namespace grpc_impl
namespace grpc {
@ -139,7 +139,8 @@ class ServerInterface : public internal::CallHook {
/// caller is required to keep all completion queues live until the server is
/// destroyed.
/// \param num_cqs How many completion queues does \a cqs hold.
virtual void Start(::grpc_impl::ServerCompletionQueue** cqs, size_t num_cqs) = 0;
virtual void Start(::grpc_impl::ServerCompletionQueue** cqs,
size_t num_cqs) = 0;
virtual void ShutdownInternal(gpr_timespec deadline) = 0;
@ -155,8 +156,8 @@ class ServerInterface : public internal::CallHook {
BaseAsyncRequest(ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag, bool delete_on_finalize);
virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) override;
@ -184,8 +185,9 @@ class ServerInterface : public internal::CallHook {
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag,
const char* name, internal::RpcMethod::RpcType type);
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag, const char* name,
internal::RpcMethod::RpcType type);
virtual bool FinalizeResult(void** tag, bool* status) override {
/* If we are done intercepting, then there is nothing more for us to do */
@ -212,7 +214,8 @@ class ServerInterface : public internal::CallHook {
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag)
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag)
: RegisteredAsyncRequest(
server, context, stream, call_cq, notification_cq, tag,
registered_method->name(), registered_method->method_type()) {
@ -229,8 +232,8 @@ class ServerInterface : public internal::CallHook {
ServerInterface* server, ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag,
Message* request)
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag, Message* request)
: RegisteredAsyncRequest(
server, context, stream, call_cq, notification_cq, tag,
registered_method->name(), registered_method->method_type()),
@ -298,8 +301,8 @@ class ServerInterface : public internal::CallHook {
GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag, bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) override;
@ -312,8 +315,8 @@ class ServerInterface : public internal::CallHook {
ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag, Message* message) {
GPR_CODEGEN_ASSERT(method);
new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
notification_cq, tag, message);
@ -323,17 +326,18 @@ class ServerInterface : public internal::CallHook {
ServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) {
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag) {
GPR_CODEGEN_ASSERT(method);
new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq,
void* tag) {
void RequestAsyncGenericCall(
GenericServerContext* context,
internal::ServerAsyncStreamingInterface* stream,
::grpc_impl::CompletionQueue* call_cq,
::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}

@ -38,7 +38,6 @@ struct grpc_resource_quota;
namespace grpc_impl {
class ServerCompletionQueue;
}
namespace grpc {

@ -52,7 +52,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
auto core_cq_tag = static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
auto core_cq_tag =
static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = core_cq_tag;
if (core_cq_tag->FinalizeResult(tag, ok)) {
@ -79,7 +80,8 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
flushed_ = true;
if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
&res)) {
auto core_cq_tag = static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
auto core_cq_tag =
static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
*ok = res == 1;
if (core_cq_tag->FinalizeResult(tag, ok)) {
return true;
@ -88,4 +90,4 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
return false;
}
} // namespace grpc
} // namespace grpc_impl

Loading…
Cancel
Save