Use a sync service to handle requests to unknown services

pull/2921/head
yang-g 9 years ago
parent abfa427d65
commit 9b7757dd35
  1. 2
      include/grpc++/completion_queue.h
  2. 15
      include/grpc++/impl/rpc_service_method.h
  3. 2
      include/grpc++/server.h
  4. 2
      include/grpc++/server_context.h
  5. 44
      src/cpp/server/server.cc
  6. 7
      src/cpp/server/server_builder.cc
  7. 22
      test/cpp/end2end/async_end2end_test.cc
  8. 23
      test/cpp/end2end/end2end_test.cc
  9. 4
      test/cpp/util/echo.proto

@ -63,6 +63,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler; class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler; class BidiStreamingHandler;
class UnknownMethodHandler;
class ChannelInterface; class ChannelInterface;
class ClientContext; class ClientContext;
@ -138,6 +139,7 @@ class CompletionQueue : public GrpcLibrary {
friend class ServerStreamingHandler; friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler; friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::Server; friend class ::grpc::Server;
friend class ::grpc::ServerContext; friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage> template <class InputMessage, class OutputMessage>

@ -208,6 +208,21 @@ class BidiStreamingHandler : public MethodHandler {
ServiceType* service_; ServiceType* service_;
}; };
// Handle unknown method by returning UNIMPLEMENTED error.
class UnknownMethodHandler : public MethodHandler {
public:
void RunHandler(const HandlerParameter& param) GRPC_FINAL {
Status status(StatusCode::UNIMPLEMENTED, "");
CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
if (!param.server_context->sent_initial_metadata_) {
ops.SendInitialMetadata(param.server_context->initial_metadata_);
}
ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
param.call->PerformOps(&ops);
param.call->cq()->Pluck(&ops);
}
};
// Server side rpc method class // Server side rpc method class
class RpcServiceMethod : public RpcMethod { class RpcServiceMethod : public RpcMethod {
public: public:

@ -228,6 +228,8 @@ class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
grpc::condition_variable callback_cv_; grpc::condition_variable callback_cv_;
std::list<SyncRequest>* sync_methods_; std::list<SyncRequest>* sync_methods_;
std::unique_ptr<RpcServiceMethod> unknown_method_;
bool has_generic_service_;
// Pointer to the c grpc server. // Pointer to the c grpc server.
grpc_server* const server_; grpc_server* const server_;

@ -73,6 +73,7 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler; class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler; class BidiStreamingHandler;
class UnknownMethodHandler;
class Call; class Call;
class CallOpBuffer; class CallOpBuffer;
@ -159,6 +160,7 @@ class ServerContext {
friend class ServerStreamingHandler; friend class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType> template <class ServiceType, class RequestType, class ResponseType>
friend class BidiStreamingHandler; friend class BidiStreamingHandler;
friend class UnknownMethodHandler;
friend class ::grpc::ClientContext; friend class ::grpc::ClientContext;
// Prevent copying. // Prevent copying.

@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC || has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() == method->method_type() ==
RpcMethod::SERVER_STREAMING), RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) { cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_); grpc_metadata_array_init(&request_metadata_);
} }
~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); } ~SyncRequest() {
if (call_details_) {
delete call_details_;
}
grpc_metadata_array_destroy(&request_metadata_);
}
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) { static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr; void* tag = nullptr;
@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) { void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_); GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true; in_flight_ = true;
GPR_ASSERT(GRPC_CALL_OK == if (tag_) {
grpc_server_request_registered_call( GPR_ASSERT(GRPC_CALL_OK ==
server, tag_, &call_, &deadline_, &request_metadata_, grpc_server_request_registered_call(
has_request_payload_ ? &request_payload_ : nullptr, cq_, server, tag_, &call_, &deadline_, &request_metadata_,
notify_cq, this)); has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this));
} else {
if (!call_details_) {
call_details_ = new grpc_call_details;
grpc_call_details_init(call_details_);
}
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
server, &call_, call_details_,
&request_metadata_, cq_, notify_cq, this));
}
} }
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE { bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
if (!*status) { if (!*status) {
grpc_completion_queue_destroy(cq_); grpc_completion_queue_destroy(cq_);
} }
if (call_details_) {
deadline_ = call_details_->deadline;
grpc_call_details_destroy(call_details_);
grpc_call_details_init(call_details_);
}
return true; return true;
} }
@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
bool in_flight_; bool in_flight_;
const bool has_request_payload_; const bool has_request_payload_;
grpc_call* call_; grpc_call* call_;
grpc_call_details* call_details_;
gpr_timespec deadline_; gpr_timespec deadline_;
grpc_metadata_array request_metadata_; grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_; grpc_byte_buffer* request_payload_;
@ -183,6 +205,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false), shutdown_(false),
num_running_cb_(0), num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>), sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
server_(CreateServer(max_message_size)), server_(CreateServer(max_message_size)),
thread_pool_(thread_pool), thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) { thread_pool_owned_(thread_pool_owned) {
@ -223,7 +246,8 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) {
return true; return true;
} }
bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) { bool Server::RegisterAsyncService(const grpc::string* host,
AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr && GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server."); "Can only register an asynchronous service against one server.");
service->server_ = this; service->server_ = this;
@ -245,6 +269,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr && GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server."); "Can only register an async generic service against one server.");
service->server_ = this; service->server_ = this;
has_generic_service_ = true;
} }
int Server::AddListeningPort(const grpc::string& addr, int Server::AddListeningPort(const grpc::string& addr,
@ -258,6 +283,11 @@ bool Server::Start() {
started_ = true; started_ = true;
grpc_server_start(server_); grpc_server_start(server_);
if (!has_generic_service_) {
unknown_method_.reset(new RpcServiceMethod(
"unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
sync_methods_->emplace_back(SyncRequest(unknown_method_.get(), nullptr));
}
// Start processing rpcs. // Start processing rpcs.
if (!sync_methods_->empty()) { if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) { for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {

@ -38,6 +38,7 @@
#include <grpc++/impl/service_type.h> #include <grpc++/impl/service_type.h>
#include <grpc++/server.h> #include <grpc++/server.h>
#include <grpc++/thread_pool_interface.h> #include <grpc++/thread_pool_interface.h>
#include <grpc++/fixed_size_thread_pool.h>
namespace grpc { namespace grpc {
@ -100,6 +101,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool(); thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true; thread_pool_owned = true;
} }
// Async services only, create a thread pool to handle requests to unknown
// services.
if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
thread_pool_ = new FixedSizeThreadPool(1);
thread_pool_owned = true;
}
std::unique_ptr<Server> server( std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_)); new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {

@ -666,6 +666,28 @@ TEST_F(AsyncEnd2endTest, ServerCheckDone) {
EXPECT_TRUE(recv_status.ok()); EXPECT_TRUE(recv_status.ok());
} }
TEST_F(AsyncEnd2endTest, UnimplementedRpc) {
std::shared_ptr<ChannelInterface> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
stub =
std::move(grpc::cpp::test::util::UnimplementedService::NewStub(channel));
EchoRequest send_request;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

@ -290,13 +290,17 @@ class End2endTest : public ::testing::TestWithParam<bool> {
if (proxy_server_) proxy_server_->Shutdown(); if (proxy_server_) proxy_server_->Shutdown();
} }
void ResetStub(bool use_proxy) { void ResetChannel() {
SslCredentialsOptions ssl_opts = {test_root_cert, "", ""}; SslCredentialsOptions ssl_opts = {test_root_cert, "", ""};
ChannelArguments args; ChannelArguments args;
args.SetSslTargetNameOverride("foo.test.google.fr"); args.SetSslTargetNameOverride("foo.test.google.fr");
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
channel_ = CreateChannel(server_address_.str(), SslCredentials(ssl_opts), channel_ = CreateChannel(server_address_.str(), SslCredentials(ssl_opts),
args); args);
}
void ResetStub(bool use_proxy) {
ResetChannel();
if (use_proxy) { if (use_proxy) {
proxy_service_.reset(new Proxy(channel_)); proxy_service_.reset(new Proxy(channel_));
int port = grpc_pick_unused_port_or_die(); int port = grpc_pick_unused_port_or_die();
@ -930,6 +934,23 @@ TEST_F(End2endTest, ChannelState) {
EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false)); EXPECT_EQ(GRPC_CHANNEL_CONNECTING, channel_->GetState(false));
} }
// Talking to a non-existing service.
TEST_F(End2endTest, NonExistingService) {
ResetChannel();
std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
stub =
std::move(grpc::cpp::test::util::UnimplementedService::NewStub(channel_));
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
ClientContext context;
Status s = stub->Unimplemented(&context, request, &response);
EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
EXPECT_EQ("", s.error_message());
}
INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true)); INSTANTIATE_TEST_CASE_P(End2end, End2endTest, ::testing::Values(false, true));
} // namespace testing } // namespace testing

@ -41,3 +41,7 @@ service TestService {
rpc BidiStream(stream EchoRequest) returns (stream EchoResponse); rpc BidiStream(stream EchoRequest) returns (stream EchoResponse);
rpc Unimplemented(EchoRequest) returns (EchoResponse); rpc Unimplemented(EchoRequest) returns (EchoResponse);
} }
service UnimplementedService {
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}

Loading…
Cancel
Save