Merge branch 'proto_interfaces' into yang-g-sync_async_mix

pull/4775/head
David Garcia Quintas 9 years ago
commit a701ed7759
  1. 2
      build.yaml
  2. 72
      include/grpc++/channel.h
  3. 3
      include/grpc++/client_context.h
  4. 3
      include/grpc++/completion_queue.h
  5. 1
      include/grpc++/generic/async_generic_service.h
  6. 4
      include/grpc++/generic/generic_stub.h
  7. 3
      include/grpc++/impl/client_unary_call.h
  8. 122
      include/grpc++/impl/codegen/channel_interface.h
  9. 253
      include/grpc++/impl/codegen/server_interface.h
  10. 4
      include/grpc++/impl/rpc_method.h
  11. 9
      include/grpc++/impl/service_type.h
  12. 166
      include/grpc++/server.h
  13. 2
      include/grpc++/server_context.h
  14. 8
      include/grpc++/support/async_stream.h
  15. 4
      include/grpc++/support/async_unary_call.h
  16. 7
      include/grpc++/support/sync_stream.h
  17. 13
      src/compiler/cpp_generator.cc
  18. 26
      src/cpp/server/server.cc
  19. 1
      test/cpp/util/metrics_server.cc

@ -50,6 +50,8 @@ filegroups:
- include/grpc++/security/auth_metadata_processor.h
- include/grpc++/security/credentials.h
- include/grpc++/security/server_credentials.h
- include/grpc++/impl/codegen/server_interface.h
include/grpc++/impl/codegen/channel_interface.h
- include/grpc++/server.h
- include/grpc++/server_builder.h
- include/grpc++/server_context.h

@ -38,35 +38,16 @@
#include <grpc/grpc.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/config.h>
struct grpc_channel;
namespace grpc {
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class ChannelCredentials;
class SecureChannelCredentials;
template <class R>
class ClientReader;
template <class W>
class ClientWriter;
template <class W, class R>
class ClientReaderWriter;
template <class R>
class ClientAsyncReader;
template <class W>
class ClientAsyncWriter;
template <class W, class R>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel GRPC_FINAL : public GrpcLibrary,
class Channel GRPC_FINAL : public ChannelInterface,
public GrpcLibrary,
public CallHook,
public std::enable_shared_from_this<Channel> {
public:
@ -74,61 +55,28 @@ class Channel GRPC_FINAL : public GrpcLibrary,
/// Get the current channel state. If the channel is in IDLE and
/// \a try_to_connect is set to true, try to connect.
grpc_connectivity_state GetState(bool try_to_connect);
/// Return the \a tag on \a cq when the channel state is changed or \a
/// deadline expires. \a GetState needs to called to get the current state.
template <typename T>
void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline,
CompletionQueue* cq, void* tag) {
TimePoint<T> deadline_tp(deadline);
NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag);
}
/// Blocking wait for channel state change or \a deadline expiration.
/// \a GetState needs to called to get the current state.
template <typename T>
bool WaitForStateChange(grpc_connectivity_state last_observed, T deadline) {
TimePoint<T> deadline_tp(deadline);
return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time());
}
grpc_connectivity_state GetState(bool try_to_connect) GRPC_OVERRIDE;
private:
template <class R>
friend class ::grpc::ClientReader;
template <class W>
friend class ::grpc::ClientWriter;
template <class W, class R>
friend class ::grpc::ClientReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncReader;
template <class W>
friend class ::grpc::ClientAsyncWriter;
template <class W, class R>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
friend class ::grpc::RpcMethod;
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);
Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq);
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call);
void* RegisterMethod(const char* method);
CompletionQueue* cq) GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void* RegisterMethod(const char* method) GRPC_OVERRIDE;
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, CompletionQueue* cq,
void* tag);
void* tag) GRPC_OVERRIDE;
bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline);
gpr_timespec deadline) GRPC_OVERRIDE;
const grpc::string host_;
grpc_channel* const c_channel_; // owned

@ -69,6 +69,7 @@ struct census_context;
namespace grpc {
class Channel;
class ChannelInterface;
class CompletionQueue;
class CallCredentials;
class RpcMethod;
@ -315,7 +316,7 @@ class ClientContext {
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);

@ -68,6 +68,7 @@ class BidiStreamingHandler;
class UnknownMethodHandler;
class Channel;
class ChannelInterface;
class ClientContext;
class CompletionQueueTag;
class CompletionQueue;
@ -171,7 +172,7 @@ class CompletionQueue : public GrpcLibrary {
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);

@ -51,6 +51,7 @@ class GenericServerContext GRPC_FINAL : public ServerContext {
private:
friend class Server;
friend class ServerInterface;
grpc::string method_;
grpc::string host_;

@ -47,7 +47,7 @@ typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer>
// by name.
class GenericStub GRPC_FINAL {
public:
explicit GenericStub(std::shared_ptr<Channel> channel) : channel_(channel) {}
explicit GenericStub(std::shared_ptr<ChannelInterface> channel) : channel_(channel) {}
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> Call(
@ -55,7 +55,7 @@ class GenericStub GRPC_FINAL {
void* tag);
private:
std::shared_ptr<Channel> channel_;
std::shared_ptr<ChannelInterface> channel_;
};
} // namespace grpc

@ -35,6 +35,7 @@
#define GRPCXX_IMPL_CLIENT_UNARY_CALL_H
#include <grpc++/impl/call.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
@ -47,7 +48,7 @@ class RpcMethod;
// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(Channel* channel, const RpcMethod& method,
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
CompletionQueue cq;

@ -0,0 +1,122 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_CHANNEL_INTERFACE_H
#define GRPCXX_CHANNEL_INTERFACE_H
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
namespace grpc {
class Call;
class ClientContext;
class RpcMethod;
class CallOpSetInterface;
class CompletionQueue;
template <class R>
class ClientReader;
template <class W>
class ClientWriter;
template <class W, class R>
class ClientReaderWriter;
template <class R>
class ClientAsyncReader;
template <class W>
class ClientAsyncWriter;
template <class W, class R>
class ClientAsyncReaderWriter;
template <class R>
class ClientAsyncResponseReader;
/// Codegen interface for \a grpc::Channel.
class ChannelInterface {
public:
virtual ~ChannelInterface() {}
/// Get the current channel state. If the channel is in IDLE and
/// \a try_to_connect is set to true, try to connect.
virtual grpc_connectivity_state GetState(bool try_to_connect) = 0;
/// Return the \a tag on \a cq when the channel state is changed or \a
/// deadline expires. \a GetState needs to called to get the current state.
template <typename T>
void NotifyOnStateChange(grpc_connectivity_state last_observed, T deadline,
CompletionQueue* cq, void* tag) {
TimePoint<T> deadline_tp(deadline);
NotifyOnStateChangeImpl(last_observed, deadline_tp.raw_time(), cq, tag);
}
/// Blocking wait for channel state change or \a deadline expiration.
/// \a GetState needs to called to get the current state.
template <typename T>
bool WaitForStateChange(grpc_connectivity_state last_observed, T deadline) {
TimePoint<T> deadline_tp(deadline);
return WaitForStateChangeImpl(last_observed, deadline_tp.raw_time());
}
private:
template <class R>
friend class ::grpc::ClientReader;
template <class W>
friend class ::grpc::ClientWriter;
template <class W, class R>
friend class ::grpc::ClientReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncReader;
template <class W>
friend class ::grpc::ClientAsyncWriter;
template <class W, class R>
friend class ::grpc::ClientAsyncReaderWriter;
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
friend Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context,
const InputMessage& request,
OutputMessage* result);
friend class ::grpc::RpcMethod;
virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) = 0;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
virtual void* RegisterMethod(const char* method) = 0;
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline, CompletionQueue* cq,
void* tag) = 0;
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
};
} // namespace grpc
#endif // GRPCXX_CHANNEL_INTERFACE_H

@ -0,0 +1,253 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_SERVER_INTERFACE_H
#define GRPCXX_SERVER_INTERFACE_H
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/rpc_service_method.h>
namespace grpc {
class AsyncGenericService;
class AsynchronousService;
class GenericServerContext;
class RpcService;
class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerContext;
class ServerCredentials;
class Service;
class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
class ServerInterface : public CallHook {
public:
virtual ~ServerInterface() {}
/// Shutdown the server, blocking until all rpc processing finishes.
/// Forcefully terminate pending calls after \a deadline expires.
///
/// \param deadline How long to wait until pending rpcs are forcefully
/// terminated.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
/// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
/// Block waiting for all work to complete.
///
/// \warning The server must be either shutting down or some other thread must
/// call \a Shutdown for this function to ever return.
virtual void Wait() = 0;
protected:
friend class AsynchronousService;
friend class Service;
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
virtual bool RegisterService(const grpc::string* host, Service* service) = 0;
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
///
/// \param addr The address to try to bind to the server (eg, localhost:1234,
/// 192.168.1.1:31416, [::1]:27182, etc.).
/// \params creds The credentials associated with the server.
///
/// \return bound port number on sucess, 0 on failure.
///
/// \warning It's an error to call this method on an already started server.
virtual int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) = 0;
/// Start the server.
///
/// \param cqs Completion queues for handling asynchronous services. The
/// caller is required to keep all completion queues live until the server is
/// destroyed.
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
/// Process one or more incoming calls.
virtual void RunRpc() = 0;
/// Schedule \a RunRpc to run in the threadpool.
virtual void ScheduleCallback() = 0;
virtual void ShutdownInternal(gpr_timespec deadline) = 0;
virtual int max_message_size() const = 0;
virtual grpc_server* server() = 0;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest() {}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
protected:
ServerInterface* const server_;
ServerContext* const context_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
protected:
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
};
class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
// uses RegisteredAsyncRequest::FinalizeResult
};
template <class Message>
class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
PayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool serialization_status =
*status && payload_ &&
SerializationTraits<Message>::Deserialize(
payload_, request_, server_->max_message_size()).ok();
bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status&&* status;
return ret;
}
private:
grpc_byte_buffer* payload_;
Message* const request_;
};
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
private:
grpc_call_details call_details_;
};
template <class Message>
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
GPR_ASSERT(method);
new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
stream, call_cq, notification_cq, tag,
message);
}
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_ASSERT(method);
new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
call_cq, notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}
};
} // namespace grpc
#endif // GRPCXX_SERVER_INTERFACE_H

@ -36,7 +36,7 @@
#include <memory>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
namespace grpc {
@ -53,7 +53,7 @@ class RpcMethod {
: name_(name), method_type_(type), channel_tag_(NULL) {}
RpcMethod(const char* name, RpcType type,
const std::shared_ptr<Channel>& channel)
const std::shared_ptr<ChannelInterface>& channel)
: name_(name),
method_type_(type),
channel_tag_(channel->RegisterMethod(name)) {}

@ -36,7 +36,7 @@
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/impl/codegen/server_interface.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
@ -45,6 +45,7 @@ namespace grpc {
class Call;
class CompletionQueue;
class Server;
class ServerInterface;
class ServerCompletionQueue;
class ServerContext;
@ -55,7 +56,7 @@ class ServerAsyncStreamingInterface {
virtual void SendInitialMetadata(void* tag) = 0;
private:
friend class Server;
friend class ServerInterface;
virtual void BindCall(Call* call) = 0;
};
@ -146,8 +147,8 @@ class Service {
private:
friend class Server;
Server* server_;
friend class ServerInterface;
ServerInterface* server_;
std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
};

@ -42,6 +42,7 @@
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/sync.h>
#include <grpc++/impl/codegen/server_interface.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
@ -56,34 +57,21 @@ class GenericServerContext;
class AsyncGenericService;
class ServerAsyncStreamingInterface;
class ServerContext;
class Service;
class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
class Server GRPC_FINAL : public ServerInterface,
public GrpcLibrary {
public:
~Server();
/// Shutdown the server, blocking until all rpc processing finishes.
/// Forcefully terminate pending calls after \a deadline expires.
///
/// \param deadline How long to wait until pending rpcs are forcefully
/// terminated.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
/// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
/// Block waiting for all work to complete.
///
/// \warning The server must be either shutting down or some other thread must
/// call \a Shutdown for this function to ever return.
void Wait();
void Wait() GRPC_OVERRIDE;
/// Global Callbacks
///
@ -104,13 +92,16 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
private:
friend class AsyncGenericService;
friend class Service;
friend class ServerBuilder;
class SyncRequest;
class AsyncRequest;
class ShutdownRequest;
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
/// \param thread_pool The threadpool instance to use for call processing.
@ -122,11 +113,11 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
bool RegisterService(const grpc::string* host, Service* service);
bool RegisterService(const grpc::string* host, Service* service) GRPC_OVERRIDE;
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
void RegisterAsyncGenericService(AsyncGenericService* service);
void RegisterAsyncGenericService(AsyncGenericService* service) GRPC_OVERRIDE;
/// Tries to bind \a server to the given \a addr.
///
@ -139,7 +130,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// \return bound port number on sucess, 0 on failure.
///
/// \warning It's an error to call this method on an already started server.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) GRPC_OVERRIDE;
/// Start the server.
///
@ -149,144 +140,21 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
void HandleQueueClosed();
bool Start(ServerCompletionQueue** cqs, size_t num_cqs) GRPC_OVERRIDE;
/// Process one or more incoming calls.
void RunRpc();
void RunRpc() GRPC_OVERRIDE;
/// Schedule \a RunRpc to run in the threadpool.
void ScheduleCallback();
void ScheduleCallback() GRPC_OVERRIDE;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline);
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
protected:
Server* const server_;
ServerContext* const context_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
protected:
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
};
class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
// uses RegisteredAsyncRequest::FinalizeResult
};
template <class Message>
class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
PayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool serialization_status =
*status && payload_ &&
SerializationTraits<Message>::Deserialize(
payload_, request_, server_->max_message_size_).ok();
bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status&&* status;
return ret;
}
private:
grpc_byte_buffer* payload_;
Message* const request_;
};
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
private:
grpc_call_details call_details_;
};
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
template <class Message>
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
GPR_ASSERT(method);
new PayloadAsyncRequest<Message>(method->server_tag(), this, context,
stream, call_cq, notification_cq, tag,
message);
}
void ShutdownInternal(gpr_timespec deadline) GRPC_OVERRIDE;
void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_ASSERT(method);
new NoPayloadAsyncRequest(method->server_tag(), this, context, stream,
call_cq, notification_cq, tag);
}
int max_message_size() const GRPC_OVERRIDE { return max_message_size_; };
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}
grpc_server* server() GRPC_OVERRIDE { return server_; };
const int max_message_size_;

@ -80,6 +80,7 @@ class Call;
class CallOpBuffer;
class CompletionQueue;
class Server;
class ServerInterface;
namespace testing {
class InteropServerContextInspector;
@ -138,6 +139,7 @@ class ServerContext {
private:
friend class ::grpc::testing::InteropServerContextInspector;
friend class ::grpc::ServerInterface;
friend class ::grpc::Server;
template <class W, class R>
friend class ::grpc::ServerAsyncReader;

@ -35,7 +35,7 @@
#define GRPCXX_SUPPORT_ASYNC_STREAM_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
@ -103,7 +103,7 @@ class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
public:
/// Create a stream and write the first request out.
template <class W>
ClientAsyncReader(Channel* channel, CompletionQueue* cq,
ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -166,7 +166,7 @@ template <class W>
class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
public:
template <class R>
ClientAsyncWriter(Channel* channel, CompletionQueue* cq,
ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
R* response, void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {
@ -234,7 +234,7 @@ template <class W, class R>
class ClientAsyncReaderWriter GRPC_FINAL
: public ClientAsyncReaderWriterInterface<W, R> {
public:
ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq,
ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
void* tag)
: context_(context), call_(channel->CreateCall(method, context, cq)) {

@ -35,7 +35,7 @@
#define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/server_context.h>
@ -58,7 +58,7 @@ class ClientAsyncResponseReader GRPC_FINAL
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
ClientAsyncResponseReader(Channel* channel, CompletionQueue* cq,
ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
const RpcMethod& method, ClientContext* context,
const W& request)
: context_(context), call_(channel->CreateCall(method, context, cq)) {

@ -36,6 +36,7 @@
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
@ -118,7 +119,7 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
public:
/// Blocking create a stream and write the first request out.
template <class W>
ClientReader(Channel* channel, const RpcMethod& method,
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
@ -182,7 +183,7 @@ class ClientWriter : public ClientWriterInterface<W> {
public:
/// Blocking create a stream.
template <class R>
ClientWriter(Channel* channel, const RpcMethod& method,
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
@ -248,7 +249,7 @@ template <class W, class R>
class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
public:
/// Blocking create a stream.
ClientReaderWriter(Channel* channel, const RpcMethod& method,
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context), call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;

@ -123,7 +123,6 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
"class Channel;\n"
"class RpcService;\n"
"class ServerCompletionQueue;\n"
"class ServerContext;\n"
@ -704,14 +703,14 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
"class Stub GRPC_FINAL : public StubInterface"
" {\n public:\n");
printer->Indent();
printer->Print("Stub(const std::shared_ptr< ::grpc::Channel>& channel);\n");
printer->Print("Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, true);
}
printer->Outdent();
printer->Print("\n private:\n");
printer->Indent();
printer->Print("std::shared_ptr< ::grpc::Channel> channel_;\n");
printer->Print("std::shared_ptr< ::grpc::ChannelInterface> channel_;\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i), vars, false);
}
@ -722,7 +721,7 @@ void PrintHeaderService(grpc::protobuf::io::Printer *printer,
printer->Print("};\n");
printer->Print(
"static std::unique_ptr<Stub> NewStub(const std::shared_ptr< "
"::grpc::Channel>& channel, "
"::grpc::ChannelInterface>& channel, "
"const ::grpc::StubOptions& options = ::grpc::StubOptions());\n");
printer->Print("\n");
@ -861,11 +860,11 @@ grpc::string GetSourceIncludes(const grpc::protobuf::FileDescriptor *file,
grpc::protobuf::io::Printer printer(&output_stream, '$');
std::map<grpc::string, grpc::string> vars;
printer.Print(vars, "#include <grpc++/channel.h>\n");
printer.Print(vars, "#include <grpc++/impl/client_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/impl/method_handler_impl.h>\n");
printer.Print(vars, "#include <grpc++/impl/rpc_service_method.h>\n");
printer.Print(vars, "#include <grpc++/impl/service_type.h>\n");
printer.Print(vars, "#include <grpc++/impl/codegen/channel_interface.h>\n");
printer.Print(vars, "#include <grpc++/support/async_unary_call.h>\n");
printer.Print(vars, "#include <grpc++/support/async_stream.h>\n");
printer.Print(vars, "#include <grpc++/support/sync_stream.h>\n");
@ -1064,7 +1063,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
printer->Print(*vars,
"std::unique_ptr< $ns$$Service$::Stub> $ns$$Service$::NewStub("
"const std::shared_ptr< ::grpc::Channel>& channel, "
"const std::shared_ptr< ::grpc::ChannelInterface>& channel, "
"const ::grpc::StubOptions& options) {\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
@ -1072,7 +1071,7 @@ void PrintSourceService(grpc::protobuf::io::Printer *printer,
"}\n\n");
printer->Print(*vars,
"$ns$$Service$::Stub::Stub(const std::shared_ptr< "
"::grpc::Channel>& channel)\n");
"::grpc::ChannelInterface>& channel)\n");
printer->Indent();
printer->Print(": channel_(channel)");
for (int i = 0; i < service->method_count(); ++i) {

@ -437,8 +437,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize)
: server_(server),
@ -451,9 +451,7 @@ Server::BaseAsyncRequest::BaseAsyncRequest(
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
Server::BaseAsyncRequest::~BaseAsyncRequest() {}
bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
if (*status) {
for (size_t i = 0; i < initial_metadata_array_.count; i++) {
context_->client_metadata_.insert(
@ -467,7 +465,7 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
grpc_metadata_array_destroy(&initial_metadata_array_);
context_->set_call(call_);
context_->cq_ = call_cq_;
Call call(call_, server_, call_cq_, server_->max_message_size_);
Call call(call_, server_, call_cq_, server_->max_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@ -480,22 +478,22 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
return true;
}
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
server_->server_, registered_method, &call_, &context_->deadline_,
server_->server(), registered_method, &call_, &context_->deadline_,
&initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
this);
}
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
@ -503,12 +501,12 @@ Server::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_,
grpc_server_request_call(server->server(), &call_, &call_details_,
&initial_metadata_array_, call_cq->cq(),
notification_cq->cq(), this);
}
bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
// TODO(yangg) remove the copy here.
if (*status) {
static_cast<GenericServerContext*>(context_)->method_ =

@ -33,6 +33,7 @@
#include "test/cpp/util/metrics_server.h"
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include "src/proto/grpc/testing/metrics.grpc.pb.h"

Loading…
Cancel
Save