Add ServerContext. It is currently empty and deadline and metadata will come to

it in subsequent cl's.
	Change on 2014/12/17 by yangg <yangg@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82372934
pull/1/merge
yangg 10 years ago committed by Michael Lumish
parent 081e2b0e63
commit a4b6f5df94
  1. 47
      include/grpc++/server_context.h
  2. 20
      src/compiler/cpp_generator.cc
  3. 59
      src/cpp/server/rpc_service_method.h
  4. 40
      src/cpp/server/server_context_impl.cc
  5. 49
      src/cpp/server/server_context_impl.h
  6. 28
      src/cpp/server/server_rpc_handler.cc
  7. 6
      src/cpp/server/server_rpc_handler.h
  8. 12
      test/cpp/end2end/end2end_test.cc
  9. 5
      test/cpp/qps/server.cc

@ -0,0 +1,47 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_SERVER_CONTEXT_H_
#define __GRPCPP_SERVER_CONTEXT_H_
namespace grpc {
// Interface of server side rpc context.
class ServerContext {
public:
virtual ~ServerContext() {}
};
} // namespace grpc
#endif // __GRPCPP_SERVER_CONTEXT_H_

@ -103,7 +103,8 @@ string GetHeaderIncludes(const google::protobuf::FileDescriptor* file) {
"\n"
"namespace grpc {\n"
"class ChannelInterface;\n"
"class RpcService;\n";
"class RpcService;\n"
"class ServerContext;\n";
if (HasClientOnlyStreaming(file)) {
temp.append("template <class OutMessage> class ClientWriter;\n");
temp.append("template <class InMessage> class ServerReader;\n");
@ -170,20 +171,24 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer* printer,
grpc_cpp_generator::ClassName(method->output_type(), true);
if (NoStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$(const $Request$* request, "
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
"$Response$* response);\n");
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReader<$Request$>* reader, "
"$Response$* response);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$(const $Request$* request, "
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, const $Request$* request, "
"::grpc::ServerWriter<$Response$>* writer);\n");
} else if (BidiStreaming(method)) {
printer->Print(*vars,
"virtual ::grpc::Status $Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter<$Response$, $Request$>* stream);"
"\n");
}
@ -313,6 +318,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
if (NoStreaming(method)) {
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, $Response$* response) {\n");
printer->Print(
" return ::grpc::Status("
@ -321,6 +327,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReader<$Request$>* reader, "
"$Response$* response) {\n");
printer->Print(
@ -330,6 +337,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
} else if (ServerOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"const $Request$* request, "
"::grpc::ServerWriter<$Response$>* writer) {\n");
printer->Print(
@ -339,6 +347,7 @@ void PrintSourceServerMethod(google::protobuf::io::Printer* printer,
} else if (BidiStreaming(method)) {
printer->Print(*vars,
"::grpc::Status $Service$::Service::$Method$("
"::grpc::ServerContext* context, "
"::grpc::ServerReaderWriter<$Response$, $Request$>* "
"stream) {\n");
printer->Print(
@ -392,7 +401,7 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
" new ::grpc::RpcMethodHandler<$Service$::Service, $Request$, "
"$Response$>(\n"
" std::function<::grpc::Status($Service$::Service*, "
"const $Request$*, $Response$*)>("
"::grpc::ServerContext*, const $Request$*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
} else if (ClientOnlyStreaming(method)) {
@ -403,6 +412,7 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
" new ::grpc::ClientStreamingHandler<"
"$Service$::Service, $Request$, $Response$>(\n"
" std::function<::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
"::grpc::ServerReader<$Request$>*, $Response$*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
@ -414,6 +424,7 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
" new ::grpc::ServerStreamingHandler<"
"$Service$::Service, $Request$, $Response$>(\n"
" std::function<::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
"const $Request$*, ::grpc::ServerWriter<$Response$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");
@ -425,6 +436,7 @@ void PrintSourceService(google::protobuf::io::Printer* printer,
" new ::grpc::BidiStreamingHandler<"
"$Service$::Service, $Request$, $Response$>(\n"
" std::function<::grpc::Status($Service$::Service*, "
"::grpc::ServerContext*, "
"::grpc::ServerReaderWriter<$Response$, $Request$>*)>("
"&$Service$::Service::$Method$), this),\n"
" new $Request$, new $Response$));\n");

@ -45,6 +45,7 @@
#include <grpc++/stream.h>
namespace grpc {
class ServerContext;
class StreamContextInterface;
// TODO(rocking): we might need to split this file into multiple ones.
@ -54,11 +55,19 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp)
: request(req), response(resp), stream_context(nullptr) {}
HandlerParameter(const google::protobuf::Message* req, google::protobuf::Message* resp,
StreamContextInterface* context)
: request(req), response(resp), stream_context(context) {}
HandlerParameter(ServerContext* context, const google::protobuf::Message* req,
google::protobuf::Message* resp)
: server_context(context),
request(req),
response(resp),
stream_context(nullptr) {}
HandlerParameter(ServerContext* context, const google::protobuf::Message* req,
google::protobuf::Message* resp, StreamContextInterface* stream)
: server_context(context),
request(req),
response(resp),
stream_context(stream) {}
ServerContext* server_context;
const google::protobuf::Message* request;
google::protobuf::Message* response;
StreamContextInterface* stream_context;
@ -70,20 +79,23 @@ class MethodHandler {
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
public:
RpcMethodHandler(std::function<Status(ServiceType*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
RpcMethodHandler(
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
// Invoke application function, cast proto messages to their actual types.
return func_(service_, dynamic_cast<const RequestType*>(param.request),
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request),
dynamic_cast<ResponseType*>(param.response));
}
private:
// Application provided rpc handler function.
std::function<Status(ServiceType*, const RequestType*, ResponseType*)> func_;
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ResponseType*)> func_;
// The class the above handler function lives in.
ServiceType* service_;
};
@ -93,20 +105,20 @@ template <class ServiceType, class RequestType, class ResponseType>
class ClientStreamingHandler : public MethodHandler {
public:
ClientStreamingHandler(
std::function<Status(ServiceType*, ServerReader<RequestType>*,
ResponseType*)> func,
std::function<Status(ServiceType*, ServerContext*,
ServerReader<RequestType>*, ResponseType*)> func,
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
ServerReader<RequestType> reader(param.stream_context);
return func_(service_, &reader,
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
}
private:
std::function<Status(ServiceType*, ServerReader<RequestType>*, ResponseType*)>
func_;
std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
ResponseType*)> func_;
ServiceType* service_;
};
@ -115,19 +127,19 @@ template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler : public MethodHandler {
public:
ServerStreamingHandler(
std::function<Status(ServiceType*, const RequestType*,
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func,
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
ServerWriter<ResponseType> writer(param.stream_context);
return func_(service_, dynamic_cast<const RequestType*>(param.request),
&writer);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
}
private:
std::function<Status(ServiceType*, const RequestType*,
std::function<Status(ServiceType*, ServerContext*, const RequestType*,
ServerWriter<ResponseType>*)> func_;
ServiceType* service_;
};
@ -137,18 +149,19 @@ template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler : public MethodHandler {
public:
BidiStreamingHandler(
std::function<Status(
ServiceType*, ServerReaderWriter<ResponseType, RequestType>*)> func,
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)>
func,
ServiceType* service)
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context);
return func_(service_, &stream);
return func_(service_, param.server_context, &stream);
}
private:
std::function<Status(ServiceType*,
std::function<Status(ServiceType*, ServerContext*,
ServerReaderWriter<ResponseType, RequestType>*)> func_;
ServiceType* service_;
};

@ -0,0 +1,40 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/cpp/server/server_context_impl.h"
namespace grpc {
ServerContextImpl::ServerContextImpl() {}
} // namespace grpc

@ -0,0 +1,49 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_
#define __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_
#include <grpc++/server_context.h>
namespace grpc {
class ServerContextImpl : public ServerContext {
public:
ServerContextImpl();
~ServerContextImpl() {}
};
} // namespace grpc
#endif // __GRPCPP_INTERNAL_SERVER_SERVER_CONTEXT_IMPL_H_

@ -35,14 +35,15 @@
#include <grpc/support/log.h>
#include "src/cpp/server/rpc_service_method.h"
#include "src/cpp/server/server_context_impl.h"
#include "src/cpp/stream/stream_context.h"
#include <grpc++/async_server_context.h>
namespace grpc {
ServerRpcHandler::ServerRpcHandler(AsyncServerContext* server_context,
ServerRpcHandler::ServerRpcHandler(AsyncServerContext* async_server_context,
RpcServiceMethod* method)
: server_context_(server_context), method_(method) {}
: async_server_context_(async_server_context), method_(method) {}
void ServerRpcHandler::StartRpc() {
if (method_ == nullptr) {
@ -52,27 +53,29 @@ void ServerRpcHandler::StartRpc() {
return;
}
ServerContextImpl user_context;
if (method_->method_type() == RpcMethod::NORMAL_RPC) {
// Start the rpc on this dedicated completion queue.
server_context_->Accept(cq_.cq());
async_server_context_->Accept(cq_.cq());
// Allocate request and response.
std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto());
std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto());
// Read request
server_context_->StartRead(request.get());
async_server_context_->StartRead(request.get());
auto type = WaitForNextEvent();
GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK);
// Run the application's rpc handler
MethodHandler* handler = method_->handler();
Status status = handler->RunHandler(
MethodHandler::HandlerParameter(request.get(), response.get()));
Status status = handler->RunHandler(MethodHandler::HandlerParameter(
&user_context, request.get(), response.get()));
if (status.IsOk()) {
// Send the response if we get an ok status.
server_context_->StartWrite(*response, 0);
async_server_context_->StartWrite(*response, 0);
type = WaitForNextEvent();
if (type != CompletionQueue::SERVER_WRITE_OK) {
status = Status(StatusCode::INTERNAL, "Error writing response.");
@ -86,13 +89,13 @@ void ServerRpcHandler::StartRpc() {
std::unique_ptr<google::protobuf::Message> request(method_->AllocateRequestProto());
std::unique_ptr<google::protobuf::Message> response(method_->AllocateResponseProto());
StreamContext stream_context(*method_, server_context_->call(), cq_.cq(),
request.get(), response.get());
StreamContext stream_context(*method_, async_server_context_->call(),
cq_.cq(), request.get(), response.get());
// Run the application's rpc handler
MethodHandler* handler = method_->handler();
Status status = handler->RunHandler(MethodHandler::HandlerParameter(
request.get(), response.get(), &stream_context));
&user_context, request.get(), response.get(), &stream_context));
if (status.IsOk() &&
method_->method_type() == RpcMethod::CLIENT_STREAMING) {
stream_context.Write(response.get(), false);
@ -107,13 +110,14 @@ CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() {
CompletionQueue::CompletionType type = cq_.Next(&tag);
if (type != CompletionQueue::QUEUE_CLOSED &&
type != CompletionQueue::RPC_END) {
GPR_ASSERT(static_cast<AsyncServerContext*>(tag) == server_context_.get());
GPR_ASSERT(static_cast<AsyncServerContext*>(tag) ==
async_server_context_.get());
}
return type;
}
void ServerRpcHandler::FinishRpc(const Status& status) {
server_context_->StartWriteStatus(status);
async_server_context_->StartWriteStatus(status);
CompletionQueue::CompletionType type;
// HALFCLOSE_OK and RPC_END events come in either order.

@ -46,8 +46,8 @@ class RpcServiceMethod;
class ServerRpcHandler {
public:
// Takes ownership of server_context.
ServerRpcHandler(AsyncServerContext* server_context,
// Takes ownership of async_server_context.
ServerRpcHandler(AsyncServerContext* async_server_context,
RpcServiceMethod* method);
void StartRpc();
@ -56,7 +56,7 @@ class ServerRpcHandler {
CompletionQueue::CompletionType WaitForNextEvent();
void FinishRpc(const Status& status);
std::unique_ptr<AsyncServerContext> server_context_;
std::unique_ptr<AsyncServerContext> async_server_context_;
RpcServiceMethod* method_;
CompletionQueue cq_;
};

@ -40,6 +40,7 @@
#include <grpc++/create_channel.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include <gtest/gtest.h>
@ -55,14 +56,16 @@ namespace grpc {
class TestServiceImpl : public TestService::Service {
public:
Status Echo(const EchoRequest* request, EchoResponse* response) {
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) {
response->set_message(request->message());
return Status::OK;
}
// Unimplemented is left unimplemented to test the returned error.
Status RequestStream(ServerReader<EchoRequest>* reader,
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) {
EchoRequest request;
response->set_message("");
@ -74,7 +77,7 @@ class TestServiceImpl : public TestService::Service {
// Return 3 messages.
// TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(const EchoRequest* request,
Status ResponseStream(ServerContext* context, const EchoRequest* request,
ServerWriter<EchoResponse>* writer) {
EchoResponse response;
response.set_message(request->message() + "0");
@ -87,7 +90,8 @@ class TestServiceImpl : public TestService::Service {
return Status::OK;
}
Status BidiStream(ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
Status BidiStream(ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
EchoRequest request;
EchoResponse response;
while (stream->Read(&request)) {

@ -39,6 +39,7 @@
#include <grpc++/config.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/status.h>
#include "test/cpp/interop/test.pb.h"
@ -50,6 +51,7 @@ DEFINE_int32(port, 0, "Server port.");
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::testing::Payload;
using grpc::testing::PayloadType;
using grpc::testing::SimpleRequest;
@ -71,7 +73,8 @@ bool SetPayload(PayloadType type, int size, Payload* payload) {
class TestServiceImpl : public TestService::Service {
public:
Status UnaryCall(const SimpleRequest* request, SimpleResponse* response) {
Status UnaryCall(ServerContext* context, const SimpleRequest* request,
SimpleResponse* response) {
if (request->has_response_size() && request->response_size() > 0) {
if (!SetPayload(request->response_type(), request->response_size(),
response->mutable_payload())) {

Loading…
Cancel
Save