Introduced ServerInterface

pull/4716/head
David Garcia Quintas 9 years ago
parent ec0d07f58e
commit 44f3249018
  1. 2
      BUILD
  2. 2
      Makefile
  3. 33
      build.yaml
  4. 3
      include/grpc++/generic/async_generic_service.h
  5. 4
      include/grpc++/generic/generic_stub.h
  6. 13
      include/grpc++/impl/codegen/channel_interface.h
  7. 250
      include/grpc++/impl/codegen/server_interface.h
  8. 1
      include/grpc++/impl/rpc_method.h
  9. 8
      include/grpc++/impl/service_type.h
  10. 163
      include/grpc++/server.h
  11. 2
      include/grpc++/server_context.h
  12. 1
      include/grpc++/support/async_stream.h
  13. 1
      include/grpc++/support/async_unary_call.h
  14. 1
      src/compiler/cpp_generator.cc
  15. 26
      src/cpp/server/server.cc
  16. 1
      tools/doxygen/Doxyfile.c++
  17. 1
      tools/doxygen/Doxyfile.c++.internal
  18. 4
      tools/run_tests/sources_and_headers.json
  19. 1
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  20. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  21. 1
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  22. 3
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters

@ -785,6 +785,7 @@ cc_library(
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
@ -879,6 +880,7 @@ cc_library(
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",

@ -2984,6 +2984,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/channel_interface.h \
include/grpc++/impl/codegen/server_interface.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/proto_utils.h \
include/grpc++/impl/rpc_method.h \
@ -3250,6 +3251,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/channel_interface.h \
include/grpc++/impl/codegen/server_interface.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/proto_utils.h \
include/grpc++/impl/rpc_method.h \

@ -32,7 +32,6 @@ filegroups:
- include/grpc++/grpc++.h
- include/grpc++/impl/call.h
- include/grpc++/impl/client_unary_call.h
- include/grpc++/impl/codegen/channel_interface.h
- include/grpc++/impl/grpc_library.h
- include/grpc++/impl/proto_utils.h
- include/grpc++/impl/rpc_method.h
@ -121,6 +120,7 @@ filegroups:
- src/core/client_config/client_config.h
- src/core/client_config/connector.h
- src/core/client_config/initial_connect_string.h
- src/core/client_config/lb_policies/load_balancer_api.h
- src/core/client_config/lb_policies/pick_first.h
- src/core/client_config/lb_policies/round_robin.h
- src/core/client_config/lb_policy.h
@ -181,6 +181,7 @@ filegroups:
- src/core/json/json_common.h
- src/core/json/json_reader.h
- src/core/json/json_writer.h
- src/core/proto/grpc/lb/v0/load_balancer.pb.h
- src/core/statistics/census_interface.h
- src/core/statistics/census_rpc_stats.h
- src/core/surface/api_trace.h
@ -236,6 +237,7 @@ filegroups:
- src/core/client_config/connector.c
- src/core/client_config/default_initial_connect_string.c
- src/core/client_config/initial_connect_string.c
- src/core/client_config/lb_policies/load_balancer_api.c
- src/core/client_config/lb_policies/pick_first.c
- src/core/client_config/lb_policies/round_robin.c
- src/core/client_config/lb_policy.c
@ -299,6 +301,7 @@ filegroups:
- src/core/json/json_reader.c
- src/core/json/json_string.c
- src/core/json/json_writer.c
- src/core/proto/grpc/lb/v0/load_balancer.pb.c
- src/core/surface/api_trace.c
- src/core/surface/byte_buffer.c
- src/core/surface/byte_buffer_reader.c
@ -365,6 +368,16 @@ filegroups:
- test/core/util/port_posix.c
- test/core/util/port_windows.c
- test/core/util/slice_splitter.c
- name: nanopb
headers:
- third_party/nanopb/pb.h
- third_party/nanopb/pb_common.h
- third_party/nanopb/pb_decode.h
- third_party/nanopb/pb_encode.h
src:
- third_party/nanopb/pb_common.c
- third_party/nanopb/pb_decode.c
- third_party/nanopb/pb_encode.c
libs:
- name: gpr
build: all
@ -513,6 +526,7 @@ libs:
filegroups:
- grpc_base
- census
- nanopb
secure: true
vs_packages:
- grpc.dependencies.openssl
@ -559,6 +573,7 @@ libs:
filegroups:
- grpc_base
- census
- nanopb
secure: false
vs_project_guid: '{46CEDFFF-9692-456A-AA24-38B5D6BCF4C5}'
- name: grpc_zookeeper
@ -799,8 +814,9 @@ libs:
src:
- src/csharp/ext/grpc_csharp_ext.c
deps:
- gpr
- grpc
- gpr
deps_linkage: static
dll: only
vs_config_type: DynamicLibrary
vs_packages:
@ -2045,6 +2061,16 @@ targets:
secure: false
vs_config_type: Application
vs_project_guid: '{069E9D05-B78B-4751-9252-D21EBAE7DE8E}'
- name: grpclb_api_test
build: test
language: c++
src:
- src/proto/grpc/lb/v0/load_balancer.proto
- test/cpp/grpclb/grpclb_api_test.cc
deps:
- grpc++
- grpc
- gpr
- name: interop_client
build: test
run: false
@ -2449,7 +2475,8 @@ vspackages:
props: false
redist: true
version: 1.2.8.10
- name: grpc.dependencies.openssl
- linkage: static
name: grpc.dependencies.openssl
props: true
redist: true
version: 1.0.204.1

@ -51,6 +51,7 @@ class GenericServerContext GRPC_FINAL : public ServerContext {
private:
friend class Server;
friend class ServerInterface;
grpc::string method_;
grpc::string host_;
@ -76,4 +77,4 @@ class AsyncGenericService GRPC_FINAL {
} // namespace grpc
#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H
#endif // GRPCXX_GENERIC_ASYNC_GENERIC_SERVICE_H

@ -47,7 +47,7 @@ typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer>
// by name.
class GenericStub GRPC_FINAL {
public:
explicit GenericStub(std::shared_ptr<Channel> channel) : channel_(channel) {}
explicit GenericStub(std::shared_ptr<ChannelInterface> channel) : channel_(channel) {}
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> Call(
@ -55,7 +55,7 @@ class GenericStub GRPC_FINAL {
void* tag);
private:
std::shared_ptr<Channel> channel_;
std::shared_ptr<ChannelInterface> channel_;
};
} // namespace grpc

@ -34,19 +34,16 @@
#ifndef GRPCXX_CHANNEL_INTERFACE_H
#define GRPCXX_CHANNEL_INTERFACE_H
#include <memory>
#include <grpc/grpc.h>
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
#include <grpc++/support/time.h>
namespace grpc {
class Call;
class ClientContext;
class RpcMethod;
class CallOpSetInterface;
class ChannelArguments;
class CompletionQueue;
class ChannelCredentials;
class SecureChannelCredentials;
template <class R>
class ClientReader;

@ -0,0 +1,250 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_SERVER_INTERFACE_H
#define GRPCXX_SERVER_INTERFACE_H
#include <grpc++/completion_queue.h>
#include <grpc++/impl/call.h>
namespace grpc {
class ServerCredentials;
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
class ServerAsyncStreamingInterface;
class ServerContext;
class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
class ServerInterface : public CallHook {
public:
/// Shutdown the server, blocking until all rpc processing finishes.
/// Forcefully terminate pending calls after \a deadline expires.
///
/// \param deadline How long to wait until pending rpcs are forcefully
/// terminated.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
/// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
/// Block waiting for all work to complete.
///
/// \warning The server must be either shutting down or some other thread must
/// call \a Shutdown for this function to ever return.
virtual void Wait() = 0;
protected:
friend class AsynchronousService;
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
virtual bool RegisterService(const grpc::string* host, RpcService* service) = 0;
/// Register an asynchronous service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
virtual bool RegisterAsyncService(const grpc::string* host,
AsynchronousService* service) = 0;
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
/// Tries to bind \a server to the given \a addr.
///
/// It can be invoked multiple times.
///
/// \param addr The address to try to bind to the server (eg, localhost:1234,
/// 192.168.1.1:31416, [::1]:27182, etc.).
/// \params creds The credentials associated with the server.
///
/// \return bound port number on sucess, 0 on failure.
///
/// \warning It's an error to call this method on an already started server.
virtual int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) = 0;
/// Start the server.
///
/// \param cqs Completion queues for handling asynchronous services. The
/// caller is required to keep all completion queues live until the server is
/// destroyed.
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
virtual bool Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0;
/// Process one or more incoming calls.
virtual void RunRpc() = 0;
/// Schedule \a RunRpc to run in the threadpool.
virtual void ScheduleCallback() = 0;
virtual void ShutdownInternal(gpr_timespec deadline) = 0;
virtual int max_message_size() const = 0;
virtual grpc_server* server() = 0;
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest() {}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
protected:
ServerInterface* const server_;
ServerContext* const context_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
protected:
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
};
class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
// uses RegisteredAsyncRequest::FinalizeResult
};
template <class Message>
class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
PayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool serialization_status =
*status && payload_ &&
SerializationTraits<Message>::Deserialize(
payload_, request_, server_->max_message_size()).ok();
bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status&&* status;
return ret;
}
private:
grpc_byte_buffer* payload_;
Message* const request_;
};
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
private:
grpc_call_details call_details_;
};
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
call_cq, notification_cq, tag, message);
}
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}
};
} // namespace grpc
#endif // GRPCXX_SERVER_INTERFACE_H

@ -36,7 +36,6 @@
#include <memory>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
namespace grpc {

@ -35,7 +35,7 @@
#define GRPCXX_IMPL_SERVICE_TYPE_H
#include <grpc++/impl/serialization_traits.h>
#include <grpc++/server.h>
#include <grpc++/impl/codegen/server_interface.h>
#include <grpc++/support/config.h>
#include <grpc++/support/status.h>
@ -45,6 +45,7 @@ class Call;
class CompletionQueue;
class RpcService;
class Server;
class ServerInterface;
class ServerCompletionQueue;
class ServerContext;
@ -61,7 +62,7 @@ class ServerAsyncStreamingInterface {
virtual void SendInitialMetadata(void* tag) = 0;
private:
friend class Server;
friend class ServerInterface;
virtual void BindCall(Call* call) = 0;
};
@ -112,7 +113,8 @@ class AsynchronousService {
private:
friend class Server;
Server* server_;
friend class ServerInterface;
ServerInterface* server_;
const char** const method_names_;
size_t method_count_;
void** request_args_;

@ -41,6 +41,7 @@
#include <grpc++/impl/call.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/sync.h>
#include <grpc++/impl/codegen/server_interface.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/support/channel_arguments.h>
#include <grpc++/support/config.h>
@ -63,28 +64,16 @@ class ThreadPoolInterface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
class Server GRPC_FINAL : public ServerInterface,
public GrpcLibrary {
public:
~Server();
/// Shutdown the server, blocking until all rpc processing finishes.
/// Forcefully terminate pending calls after \a deadline expires.
///
/// \param deadline How long to wait until pending rpcs are forcefully
/// terminated.
template <class T>
void Shutdown(const T& deadline) {
ShutdownInternal(TimePoint<T>(deadline).raw_time());
}
/// Shutdown the server, waiting for all rpc processing to finish.
void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
/// Block waiting for all work to complete.
///
/// \warning The server must be either shutting down or some other thread must
/// call \a Shutdown for this function to ever return.
void Wait();
void Wait() override;
/// Global Callbacks
///
@ -112,6 +101,10 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
class AsyncRequest;
class ShutdownRequest;
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
/// Server constructors. To be used by \a ServerBuilder only.
///
/// \param thread_pool The threadpool instance to use for call processing.
@ -123,16 +116,16 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
bool RegisterService(const grpc::string* host, RpcService* service);
bool RegisterService(const grpc::string* host, RpcService* service) override;
/// Register an asynchronous service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
bool RegisterAsyncService(const grpc::string* host,
AsynchronousService* service);
AsynchronousService* service) override;
/// Register a generic service. This call does not take ownership of the
/// service. The service must exist for the lifetime of the Server instance.
void RegisterAsyncGenericService(AsyncGenericService* service);
void RegisterAsyncGenericService(AsyncGenericService* service) override;
/// Tries to bind \a server to the given \a addr.
///
@ -145,7 +138,7 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// \return bound port number on sucess, 0 on failure.
///
/// \warning It's an error to call this method on an already started server.
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
int AddListeningPort(const grpc::string& addr, ServerCredentials* creds) override;
/// Start the server.
///
@ -155,141 +148,21 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
/// \param num_cqs How many completion queues does \a cqs hold.
///
/// \return true on a successful shutdown.
bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
void HandleQueueClosed();
bool Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
/// Process one or more incoming calls.
void RunRpc();
void RunRpc() override;
/// Schedule \a RunRpc to run in the threadpool.
void ScheduleCallback();
void ScheduleCallback() override;
void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
void ShutdownInternal(gpr_timespec deadline);
class BaseAsyncRequest : public CompletionQueueTag {
public:
BaseAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
protected:
Server* const server_;
ServerContext* const context_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
grpc_call* call_;
grpc_metadata_array initial_metadata_array_;
};
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(Server* server, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
protected:
void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq);
};
class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
NoPayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
IssueRequest(registered_method, nullptr, notification_cq);
}
// uses RegisteredAsyncRequest::FinalizeResult
};
void ShutdownInternal(gpr_timespec deadline) override;
template <class Message>
class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
public:
PayloadAsyncRequest(void* registered_method, Server* server,
ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag),
request_(request) {
IssueRequest(registered_method, &payload_, notification_cq);
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
bool serialization_status =
*status && payload_ &&
SerializationTraits<Message>::Deserialize(
payload_, request_, server_->max_message_size_).ok();
bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
*status = serialization_status&&* status;
return ret;
}
private:
grpc_byte_buffer* payload_;
Message* const request_;
};
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(Server* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
private:
grpc_call_details call_details_;
};
class UnimplementedAsyncRequestContext;
class UnimplementedAsyncRequest;
class UnimplementedAsyncResponse;
int max_message_size() const override { return max_message_size_; };
template <class Message>
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
call_cq, notification_cq, tag, message);
}
void RequestAsyncCall(void* registered_method, ServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
notification_cq, tag);
}
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true);
}
grpc_server* server() override { return server_; };
const int max_message_size_;

@ -80,6 +80,7 @@ class Call;
class CallOpBuffer;
class CompletionQueue;
class Server;
class ServerInterface;
namespace testing {
class InteropServerContextInspector;
@ -138,6 +139,7 @@ class ServerContext {
private:
friend class ::grpc::testing::InteropServerContextInspector;
friend class ::grpc::ServerInterface;
friend class ::grpc::Server;
template <class W, class R>
friend class ::grpc::ServerAsyncReader;

@ -35,7 +35,6 @@
#define GRPCXX_SUPPORT_ASYNC_STREAM_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>

@ -35,7 +35,6 @@
#define GRPCXX_SUPPORT_ASYNC_UNARY_CALL_H
#include <grpc/support/log.h>
#include <grpc++/channel.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>

@ -123,7 +123,6 @@ grpc::string GetHeaderIncludes(const grpc::protobuf::FileDescriptor *file,
"\n"
"namespace grpc {\n"
"class CompletionQueue;\n"
"class Channel;\n"
"class RpcService;\n"
"class ServerCompletionQueue;\n"
"class ServerContext;\n"

@ -439,8 +439,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
Server* server, ServerContext* context,
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
bool delete_on_finalize)
: server_(server),
@ -453,9 +453,7 @@ Server::BaseAsyncRequest::BaseAsyncRequest(
memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
}
Server::BaseAsyncRequest::~BaseAsyncRequest() {}
bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
if (*status) {
for (size_t i = 0; i < initial_metadata_array_.count; i++) {
context_->client_metadata_.insert(
@ -469,7 +467,7 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
grpc_metadata_array_destroy(&initial_metadata_array_);
context_->set_call(call_);
context_->cq_ = call_cq_;
Call call(call_, server_, call_cq_, server_->max_message_size_);
Call call(call_, server_, call_cq_, server_->max_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@ -482,22 +480,22 @@ bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
return true;
}
Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
Server* server, ServerContext* context,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void Server::RegisteredAsyncRequest::IssueRequest(
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
void* registered_method, grpc_byte_buffer** payload,
ServerCompletionQueue* notification_cq) {
grpc_server_request_registered_call(
server_->server_, registered_method, &call_, &context_->deadline_,
server_->server(), registered_method, &call_, &context_->deadline_,
&initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
this);
}
Server::GenericAsyncRequest::GenericAsyncRequest(
Server* server, GenericServerContext* context,
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
@ -505,12 +503,12 @@ Server::GenericAsyncRequest::GenericAsyncRequest(
grpc_call_details_init(&call_details_);
GPR_ASSERT(notification_cq);
GPR_ASSERT(call_cq);
grpc_server_request_call(server->server_, &call_, &call_details_,
grpc_server_request_call(server->server(), &call_, &call_details_,
&initial_metadata_array_, call_cq->cq(),
notification_cq->cq(), this);
}
bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
// TODO(yangg) remove the copy here.
if (*status) {
static_cast<GenericServerContext*>(context_)->method_ =

@ -770,6 +770,7 @@ include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/channel_interface.h \
include/grpc++/impl/codegen/server_interface.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/proto_utils.h \
include/grpc++/impl/rpc_method.h \

@ -770,6 +770,7 @@ include/grpc++/grpc++.h \
include/grpc++/impl/call.h \
include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/codegen/channel_interface.h \
include/grpc++/impl/codegen/server_interface.h \
include/grpc++/impl/grpc_library.h \
include/grpc++/impl/proto_utils.h \
include/grpc++/impl/rpc_method.h \

@ -3829,6 +3829,7 @@
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
@ -3884,6 +3885,7 @@
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
@ -4017,6 +4019,7 @@
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",
@ -4069,6 +4072,7 @@
"include/grpc++/impl/call.h",
"include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/codegen/channel_interface.h",
"include/grpc++/impl/codegen/server_interface.h",
"include/grpc++/impl/grpc_library.h",
"include/grpc++/impl/proto_utils.h",
"include/grpc++/impl/rpc_method.h",

@ -268,6 +268,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\channel_interface.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\server_interface.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\proto_utils.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\rpc_method.h" />

@ -129,6 +129,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\channel_interface.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\server_interface.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>

@ -268,6 +268,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\client_unary_call.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\channel_interface.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\server_interface.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\proto_utils.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\rpc_method.h" />

@ -114,6 +114,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\channel_interface.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\server_interface.h">
<Filter>include\grpc++\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\grpc_library.h">
<Filter>include\grpc++\impl</Filter>
</ClInclude>

Loading…
Cancel
Save