Async server dispatch

pull/501/head
Craig Tiller 10 years ago
parent 984b09087f
commit 3d6ceb6461
  1. 5
      include/grpc++/impl/service_type.h
  2. 54
      include/grpc++/server_context.h
  3. 47
      src/cpp/server/server.cc
  4. 6
      src/cpp/server/server_context.cc

@ -42,6 +42,7 @@ class Message;
namespace grpc {
class Call;
class RpcService;
class Server;
class ServerContext;
@ -59,6 +60,10 @@ class ServerAsyncStreamingInterface {
virtual void SendInitialMetadata(void* tag) = 0;
virtual void Finish(const Status& status, void* tag) = 0;
private:
friend class Server;
virtual void BindCall(Call* call) = 0;
};
class AsynchronousService {

@ -39,43 +39,61 @@
#include "config.h"
struct grpc_metadata;
struct gpr_timespec;
struct grpc_metadata;
struct grpc_call;
namespace grpc {
template <class R> class ServerAsyncReader;
template <class W> class ServerAsyncWriter;
template <class R, class W> class ServerAsyncReaderWriter;
template <class R> class ServerReader;
template <class W> class ServerWriter;
template <class R, class W> class ServerReaderWriter;
template <class R>
class ServerAsyncReader;
template <class W>
class ServerAsyncWriter;
template <class R, class W>
class ServerAsyncReaderWriter;
template <class R>
class ServerReader;
template <class W>
class ServerWriter;
template <class R, class W>
class ServerReaderWriter;
class CallOpBuffer;
class Server;
// Interface of server side rpc context.
class ServerContext {
class ServerContext final {
public:
virtual ~ServerContext() {}
ServerContext(); // for async calls
~ServerContext();
std::chrono::system_clock::time_point absolute_deadline() { return deadline_; }
std::chrono::system_clock::time_point absolute_deadline() {
return deadline_;
}
void AddInitialMetadata(const grpc::string& key, const grpc::string& value);
void AddTrailingMetadata(const grpc::string& key, const grpc::string& value);
private:
friend class ::grpc::Server;
template <class R> friend class ::grpc::ServerAsyncReader;
template <class W> friend class ::grpc::ServerAsyncWriter;
template <class R, class W> friend class ::grpc::ServerAsyncReaderWriter;
template <class R> friend class ::grpc::ServerReader;
template <class W> friend class ::grpc::ServerWriter;
template <class R, class W> friend class ::grpc::ServerReaderWriter;
template <class R>
friend class ::grpc::ServerAsyncReader;
template <class W>
friend class ::grpc::ServerAsyncWriter;
template <class R, class W>
friend class ::grpc::ServerAsyncReaderWriter;
template <class R>
friend class ::grpc::ServerReader;
template <class W>
friend class ::grpc::ServerWriter;
template <class R, class W>
friend class ::grpc::ServerReaderWriter;
ServerContext(gpr_timespec deadline, grpc_metadata *metadata, size_t metadata_count);
ServerContext(gpr_timespec deadline, grpc_metadata* metadata,
size_t metadata_count);
const std::chrono::system_clock::time_point deadline_;
std::chrono::system_clock::time_point deadline_;
grpc_call* call_ = nullptr;
bool sent_initial_metadata_ = false;
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;

@ -45,6 +45,7 @@
#include <grpc++/thread_pool_interface.h>
#include "src/cpp/proto/proto_utils.h"
#include "src/cpp/util/time.h"
namespace grpc {
@ -175,15 +176,12 @@ class Server::SyncRequest final : public CompletionQueueTag {
has_response_payload_(mrd->has_response_payload_),
request_payload_(mrd->request_payload_),
method_(mrd->method_) {
ctx_.call_ = mrd->call_;
GPR_ASSERT(mrd->in_flight_);
mrd->in_flight_ = false;
mrd->request_metadata_.count = 0;
}
~CallData() {
if (call_.call()) grpc_call_destroy(call_.call());
}
void Run() {
std::unique_ptr<google::protobuf::Message> req;
std::unique_ptr<google::protobuf::Message> res;
@ -283,20 +281,57 @@ class Server::AsyncRequest final : public CompletionQueueTag {
::google::protobuf::Message* request,
ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
void* tag)
: tag_(tag), request_(request), stream_(stream), ctx_(ctx) {
: tag_(tag),
request_(request),
stream_(stream),
cq_(cq),
ctx_(ctx),
server_(server) {
memset(&array_, 0, sizeof(array_));
grpc_server_request_registered_call(
server->server_, registered_method, &call_, &deadline_, &array_,
request ? &payload_ : nullptr, cq->cq(), this);
}
void FinalizeResult(void** tag, bool* status) override {}
~AsyncRequest() {
if (payload_) {
grpc_byte_buffer_destroy(payload_);
}
grpc_metadata_array_destroy(&array_);
}
void FinalizeResult(void** tag, bool* status) override {
*tag = tag_;
if (*status && request_) {
if (payload_) {
*status = DeserializeProto(payload_, request_);
} else {
*status = false;
}
}
if (*status) {
ctx_->deadline_ = Timespec2Timepoint(deadline_);
for (size_t i = 0; i < array_.count; i++) {
ctx_->client_metadata_.insert(std::make_pair(
grpc::string(array_.metadata[i].key),
grpc::string(
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
}
ctx_->call_ = call_;
Call call(call_, server_, cq_);
stream_->BindCall(&call);
delete this;
}
private:
void* const tag_;
::google::protobuf::Message* const request_;
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_;
ServerContext* const ctx_;
Server* const server_;
grpc_call* call_ = nullptr;
gpr_timespec deadline_;
grpc_metadata_array array_;

@ -49,4 +49,10 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata *metadata,
}
}
ServerContext::~ServerContext() {
if (call_) {
grpc_call_destroy(call_);
}
}
} // namespace grpc

Loading…
Cancel
Save