Starting to scratch out the API

pull/431/head
Craig Tiller 10 years ago
parent 2ca4708921
commit c4965751a0
  1. 1
      build.json
  2. 80
      include/grpc++/call.h
  3. 32
      include/grpc++/channel_interface.h
  4. 4
      include/grpc++/completion_queue.h
  5. 4
      include/grpc++/server.h
  6. 149
      include/grpc++/stream.h
  7. 14
      src/compiler/cpp_generator.cc
  8. 104
      src/cpp/client/channel.cc
  9. 13
      src/cpp/client/channel.h
  10. 42
      src/cpp/common/call.cc
  11. 7
      src/cpp/common/completion_queue.cc
  12. 1
      src/cpp/server/server_rpc_handler.h

@ -413,6 +413,7 @@
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/internal_stub.cc",
"src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
"src/cpp/proto/proto_utils.cc",

@ -0,0 +1,80 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPCPP_CALL_H__
#define __GRPCPP_CALL_H__
#include <grpc++/status.h>
#include <grpc/grpc.h>
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
struct grpc_call;
namespace grpc {
class ChannelInterface;
class CallOpBuffer final {
public:
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(Status *status);
void FinalizeResult();
private:
static const size_t MAX_OPS = 6;
grpc_op ops_[MAX_OPS];
int num_ops_ = 0;
};
// Straightforward wrapping of the C call object
class Call final {
public:
Call(grpc_call *call, ChannelInterface *channel);
void PerformOps(const CallOpBuffer &buffer, void *tag);
private:
ChannelInterface *const channel_;
};
} // namespace grpc
#endif // __GRPCPP_CALL_INTERFACE_H__

@ -39,30 +39,40 @@
namespace google {
namespace protobuf {
class Message;
}
}
} // namespace protobuf
} // namespace google
struct grpc_call;
namespace grpc {
class ClientContext;
class CompletionQueue;
class RpcMethod;
class StreamContextInterface;
class CallInterface;
class ChannelInterface {
public:
virtual ~ChannelInterface() {}
virtual Status StartBlockingRpc(const RpcMethod& method,
ClientContext* context,
const google::protobuf::Message& request,
google::protobuf::Message* result) = 0;
virtual StreamContextInterface* CreateStream(
const RpcMethod& method, ClientContext* context,
const google::protobuf::Message* request,
google::protobuf::Message* result) = 0;
virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq);
};
// Wrapper that begins an asynchronous unary call
void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);
// Wrapper that performs a blocking unary call
Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result);
} // namespace grpc
#endif // __GRPCPP_CHANNEL_INTERFACE_H__

@ -49,7 +49,9 @@ class CompletionQueue {
// Blocking read from queue.
// Returns true if an event was received, false if the queue is ready
// for destruction.
bool Next(void** tag);
bool Next(void **tag, bool *ok);
bool Pluck(void *tag);
// Prepare a tag for the C api
// Given a tag we'd like to receive from Next, what tag should we pass

@ -48,8 +48,8 @@ struct grpc_server;
namespace google {
namespace protobuf {
class Message;
}
}
} // namespace protobuf
} // namespace google
namespace grpc {
class AsyncServerContext;

@ -34,6 +34,9 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
#include <grpc++/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/completion_queue.h>
#include <grpc++/stream_context_interface.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@ -45,16 +48,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
// Try to cancel the stream. Wait() still needs to be called to get the final
// status. Cancelling after the stream has finished has no effects.
virtual void Cancel() = 0;
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
virtual const Status& Wait() = 0;
virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@ -82,95 +81,127 @@ class WriterInterface {
};
template <class R>
class ClientReader : public ClientStreamingInterface,
public ReaderInterface<R> {
class ClientReader final : public ClientStreamingInterface,
public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
explicit ClientReader(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_);
context_->Start(true);
context_->Write(context_->request(), true);
explicit ClientReader(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request)
: call_(channel->CreateCall(method, context, &cq_), channel) {
CallOpBuffer buf;
buf.AddSendMessage(request);
buf.AddClientSendClose();
call_.PerformOps(buf, (void *)1);
cq_.Pluck((void *)1);
}
~ClientReader() { delete context_; }
virtual bool Read(R* msg) { return context_->Read(msg); }
virtual void Cancel() { context_->Cancel(); }
virtual bool Read(R *msg) {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_.PerformOps(buf, (void *)2);
return cq_.Pluck((void *)2);
}
virtual const Status& Wait() { return context_->Wait(); }
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(buf, (void *)3);
GPR_ASSERT(cq_.Pluck((void *)3));
return status;
}
private:
StreamContextInterface* const context_;
CompletionQueue cq_;
Call call_;
};
template <class W>
class ClientWriter : public ClientStreamingInterface,
public WriterInterface<W> {
class ClientWriter final : public ClientStreamingInterface,
public WriterInterface<W> {
public:
// Blocking create a stream.
explicit ClientWriter(StreamContextInterface* context) : context_(context) {
GPR_ASSERT(context_);
context_->Start(false);
}
~ClientWriter() { delete context_; }
explicit ClientWriter(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
google::protobuf::Message *response)
: response_(response),
call_(channel->CreateCall(method, context, &cq_), channel) {}
virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false);
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(buf, (void *)2);
return cq_.Pluck((void *)2);
}
virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->Cancel(); }
virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(buf, (void *)3);
return cq_.Pluck((void *)3);
}
// Read the final response and wait for the final status.
virtual const Status& Wait() {
bool success = context_->Read(context_->response());
if (!success) {
Cancel();
} else {
success = context_->Read(nullptr);
if (success) {
Cancel();
}
}
return context_->Wait();
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(buf, (void *)4);
GPR_ASSERT(cq_.Pluck((void *)4));
return status;
}
private:
StreamContextInterface* const context_;
google::protobuf::Message *const response_;
CompletionQueue cq_;
Call call_;
};
// Client-side interface for bi-directional streaming.
template <class W, class R>
class ClientReaderWriter : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
class ClientReaderWriter final : public ClientStreamingInterface,
public WriterInterface<W>,
public ReaderInterface<R> {
public:
// Blocking create a stream.
explicit ClientReaderWriter(StreamContextInterface* context)
: context_(context) {
GPR_ASSERT(context_);
context_->Start(false);
explicit ClientReaderWriter(ChannelInterface *channel,
const RpcMethod &method, ClientContext *context)
: call_(channel->CreateCall(method, context, &cq_), channel) {}
virtual bool Read(R *msg) {
CallOpBuffer buf;
buf.AddRecvMessage(msg);
call_.PerformOps(buf, (void *)2);
return cq_.Pluck((void *)2);
}
~ClientReaderWriter() { delete context_; }
virtual bool Read(R* msg) { return context_->Read(msg); }
virtual bool Write(const W& msg) {
return context_->Write(const_cast<W*>(&msg), false);
CallOpBuffer buf;
buf.AddSendMessage(msg);
call_.PerformOps(buf, (void *)3);
return cq_.Pluck((void *)3);
}
virtual void WritesDone() { context_->Write(nullptr, true); }
virtual void Cancel() { context_->Cancel(); }
virtual bool WritesDone() {
CallOpBuffer buf;
buf.AddClientSendClose();
call_.PerformOps(buf, (void *)4);
return cq_.Pluck((void *)4);
}
virtual const Status& Wait() { return context_->Wait(); }
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&status);
call_.PerformOps(buf, (void *)5);
GPR_ASSERT(cq_.Pluck((void *)5));
return status;
}
private:
StreamContextInterface* const context_;
CompletionQueue cq_;
Call call_;
};
template <class R>

@ -268,7 +268,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
" return channel()->StartBlockingRpc("
"return ::grpc::BlockingUnaryCall(channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), "
"context, request, response);\n"
"}\n\n");
@ -279,10 +279,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return new ::grpc::ClientWriter< $Request$>("
"channel()->CreateStream("
"channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
"context, nullptr, response));\n"
"context, response);\n"
"}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
@ -291,10 +291,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$* request) {\n");
printer->Print(*vars,
" return new ::grpc::ClientReader< $Response$>("
"channel()->CreateStream("
"channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
"context, request, nullptr));\n"
"context, *request);\n"
"}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
@ -304,10 +304,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
printer->Print(
*vars,
" return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
"channel()->CreateStream("
"channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
"context, nullptr, nullptr));\n"
"context);\n"
"}\n\n");
}
}

@ -77,103 +77,13 @@ Channel::Channel(const grpc::string &target,
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
namespace {
// Pluck the finished event and set to status when it is not nullptr.
void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag,
Status *status) {
grpc_event *ev =
grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future);
if (status) {
StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
grpc::string details(ev->data.finished.details ? ev->data.finished.details
: "");
*status = Status(error_code, details);
}
grpc_event_finish(ev);
}
} // namespace
// TODO(yangg) more error handling
Status Channel::StartBlockingRpc(const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result) {
Status status;
grpc_call *call = grpc_channel_create_call_old(
c_channel_, method.name(), target_.c_str(), context->RawDeadline());
context->set_call(call);
grpc_event *ev;
void *finished_tag = reinterpret_cast<char *>(call);
void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
void *write_tag = reinterpret_cast<char *>(call) + 3;
void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
void *read_tag = reinterpret_cast<char *>(call) + 5;
grpc_completion_queue *cq = grpc_completion_queue_create();
context->set_cq(cq);
// add_metadata from context
//
// invoke
GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
// write request
grpc_byte_buffer *write_buffer = nullptr;
bool success = SerializeProto(request, &write_buffer);
if (!success) {
grpc_call_cancel(call);
status =
Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
GetFinalStatus(cq, finished_tag, nullptr);
return status;
}
GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag,
GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
grpc_byte_buffer_destroy(write_buffer);
ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
success = ev->data.write_accepted == GRPC_OP_OK;
grpc_event_finish(ev);
if (!success) {
GetFinalStatus(cq, finished_tag, &status);
return status;
}
// writes done
GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK);
ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
grpc_event_finish(ev);
// start read metadata
//
ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
grpc_event_finish(ev);
// start read
GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK);
ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
if (ev->data.read) {
if (!DeserializeProto(ev->data.read, result)) {
grpc_event_finish(ev);
status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
GetFinalStatus(cq, finished_tag, nullptr);
return status;
}
}
grpc_event_finish(ev);
// wait status
GetFinalStatus(cq, finished_tag, &status);
return status;
}
StreamContextInterface *Channel::CreateStream(
const RpcMethod &method, ClientContext *context,
const google::protobuf::Message *request,
google::protobuf::Message *result) {
grpc_call *call = grpc_channel_create_call_old(
c_channel_, method.name(), target_.c_str(), context->RawDeadline());
context->set_call(call);
grpc_completion_queue *cq = grpc_completion_queue_create();
context->set_cq(cq);
return new StreamContext(method, context, request, result);
grpc_call *Channel::CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq) {
auto c_call =
grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
target_.c_str(), context->RawDeadline());
context->set_call(c_call);
return c_call;
}
} // namespace grpc

@ -43,10 +43,11 @@ struct grpc_channel;
namespace grpc {
class ChannelArguments;
class CompletionQueue;
class Credentials;
class StreamContextInterface;
class Channel : public ChannelInterface {
class Channel final : public ChannelInterface {
public:
Channel(const grpc::string &target, const ChannelArguments &args);
Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds,
@ -54,14 +55,8 @@ class Channel : public ChannelInterface {
~Channel() override;
Status StartBlockingRpc(const RpcMethod &method, ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result) override;
StreamContextInterface *CreateStream(
const RpcMethod &method, ClientContext *context,
const google::protobuf::Message *request,
google::protobuf::Message *result) override;
virtual grpc_call *CreateCall(const RpcMethod &method, ClientContext *context,
CompletionQueue *cq);
private:
const grpc::string target_;

@ -0,0 +1,42 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <include/grpc++/call.h>
namespace grpc {
void Call::PerformOps(const CallOpBuffer& buffer, void* tag) {
channel_->PerformOpsOnCall(buffer, tag, call_);
}
} // namespace grpc

@ -55,19 +55,16 @@ class EventDeleter {
void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); }
};
bool CompletionQueue::Next(void **tag) {
bool CompletionQueue::Next(void **tag, bool *ok) {
std::unique_ptr<grpc_event, EventDeleter> ev;
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
if (!ev) {
gpr_log(GPR_ERROR, "no next event in queue");
abort();
}
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
}
std::unique_ptr<FinishFunc> func(static_cast<FinishFunc*>(ev->tag));
*tag = (*func)();
*ok = (ev->data.op_complete == GRPC_OP_OK);
return true;
}

@ -53,7 +53,6 @@ class ServerRpcHandler {
void StartRpc();
private:
CompletionQueue::CompletionType WaitForNextEvent();
void FinishRpc(const Status &status);
std::unique_ptr<AsyncServerContext> async_server_context_;

Loading…
Cancel
Save