Add NextMessageSize method to all readable streams as an upper-bound

on the actual message size.
Rename Size of FCUnary to NextMessageSize for consistency
pull/7018/head
vjpai 9 years ago
parent a12276932d
commit fcb98a578c
  1. 10
      include/grpc++/impl/codegen/fc_unary.h
  2. 3
      include/grpc++/impl/codegen/method_handler_impl.h
  3. 11
      include/grpc++/impl/codegen/sync_stream.h
  4. 3
      test/cpp/end2end/mock_test.cc

@ -45,16 +45,17 @@ namespace grpc {
/// of a hybrid between conventional unary and streaming. This is invoked /// of a hybrid between conventional unary and streaming. This is invoked
/// through a unary call on the client side, but the server responds to it /// through a unary call on the client side, but the server responds to it
/// as though it were a single-ping-pong streaming call. The server can use /// as though it were a single-ping-pong streaming call. The server can use
/// the \a Size method to determine an upper-bound on the size of the message /// the \a NextMessageSize method to determine an upper-bound on the size of
/// the message.
/// A key difference relative to streaming: an FCUnary must have exactly 1 Read /// A key difference relative to streaming: an FCUnary must have exactly 1 Read
/// and exactly 1 Write, in that order, to function correctly. /// and exactly 1 Write, in that order, to function correctly.
/// Otherwise, the RPC is in error /// Otherwise, the RPC is in error.
template <class RequestType, class ResponseType> template <class RequestType, class ResponseType>
class FCUnary GRPC_FINAL { class FCUnary GRPC_FINAL {
public: public:
FCUnary(Call* call, ServerContext* ctx, int max_message_size): call_(call), ctx_(ctx), max_msg_size_(max_message_size), read_done_(false), write_done_(false) {} FCUnary(Call* call, ServerContext* ctx): call_(call), ctx_(ctx), read_done_(false), write_done_(false) {}
~FCUnary() {} ~FCUnary() {}
uint32_t Size() {return max_msg_size_;} uint32_t NextMessageSize() {return call_->max_message_size();}
bool Read(RequestType *request) { bool Read(RequestType *request) {
if (read_done_) { if (read_done_) {
return false; return false;
@ -88,7 +89,6 @@ template <class RequestType, class ResponseType>
private: private:
Call* const call_; Call* const call_;
ServerContext* const ctx_; ServerContext* const ctx_;
const int max_msg_size_;
bool read_done_; bool read_done_;
bool write_done_; bool write_done_;
}; };

@ -204,8 +204,7 @@ class FCUnaryMethodHandler : public MethodHandler {
void RunHandler(const HandlerParameter& param) GRPC_FINAL { void RunHandler(const HandlerParameter& param) GRPC_FINAL {
FCUnary<RequestType, ResponseType> fc_unary(param.call, FCUnary<RequestType, ResponseType> fc_unary(param.call,
param.server_context, param.server_context);
param.max_message_size);
Status status = func_(service_, param.server_context, &fc_unary); Status status = func_(service_, param.server_context, &fc_unary);
if (!param.server_context->sent_initial_metadata_) { if (!param.server_context->sent_initial_metadata_) {
// means that the write never happened, which is bad // means that the write never happened, which is bad

@ -70,6 +70,9 @@ class ReaderInterface {
public: public:
virtual ~ReaderInterface() {} virtual ~ReaderInterface() {}
/// Upper bound on the next message size available for reading on this stream
virtual uint32_t NextMessageSize() = 0;
/// Blocking read a message and parse to \a msg. Returns \a true on success. /// Blocking read a message and parse to \a msg. Returns \a true on success.
/// ///
/// \param[out] msg The read message. /// \param[out] msg The read message.
@ -143,6 +146,8 @@ class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
cq_.Pluck(&ops); /// status ignored cq_.Pluck(&ops); /// status ignored
} }
uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();}
bool Read(R* msg) GRPC_OVERRIDE { bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
@ -286,6 +291,8 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
cq_.Pluck(&ops); // status ignored cq_.Pluck(&ops); // status ignored
} }
uint32_t NextMessageSize() GRPC_OVERRIDE {return call_.max_message_size();}
bool Read(R* msg) GRPC_OVERRIDE { bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops; CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
if (!context_->initial_metadata_received_) { if (!context_->initial_metadata_received_) {
@ -345,6 +352,8 @@ class ServerReader GRPC_FINAL : public ReaderInterface<R> {
call_->cq()->Pluck(&ops); call_->cq()->Pluck(&ops);
} }
uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();}
bool Read(R* msg) GRPC_OVERRIDE { bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops; CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg); ops.RecvMessage(msg);
@ -411,6 +420,8 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
call_->cq()->Pluck(&ops); call_->cq()->Pluck(&ops);
} }
uint32_t NextMessageSize() GRPC_OVERRIDE {return call_->max_message_size();}
bool Read(R* msg) GRPC_OVERRIDE { bool Read(R* msg) GRPC_OVERRIDE {
CallOpSet<CallOpRecvMessage<R>> ops; CallOpSet<CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg); ops.RecvMessage(msg);

@ -31,6 +31,7 @@
* *
*/ */
#include <climits>
#include <thread> #include <thread>
#include <grpc++/channel.h> #include <grpc++/channel.h>
@ -63,6 +64,7 @@ class MockClientReaderWriter GRPC_FINAL
: public ClientReaderWriterInterface<W, R> { : public ClientReaderWriterInterface<W, R> {
public: public:
void WaitForInitialMetadata() GRPC_OVERRIDE {} void WaitForInitialMetadata() GRPC_OVERRIDE {}
uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;}
bool Read(R* msg) GRPC_OVERRIDE { return true; } bool Read(R* msg) GRPC_OVERRIDE { return true; }
bool Write(const W& msg) GRPC_OVERRIDE { return true; } bool Write(const W& msg) GRPC_OVERRIDE { return true; }
bool WritesDone() GRPC_OVERRIDE { return true; } bool WritesDone() GRPC_OVERRIDE { return true; }
@ -74,6 +76,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> GRPC_FINAL
public: public:
MockClientReaderWriter() : writes_done_(false) {} MockClientReaderWriter() : writes_done_(false) {}
void WaitForInitialMetadata() GRPC_OVERRIDE {} void WaitForInitialMetadata() GRPC_OVERRIDE {}
uint32_t NextMessageSize() GRPC_OVERRIDE {return UINT_MAX;}
bool Read(EchoResponse* msg) GRPC_OVERRIDE { bool Read(EchoResponse* msg) GRPC_OVERRIDE {
if (writes_done_) return false; if (writes_done_) return false;
msg->set_message(last_message_); msg->set_message(last_message_);

Loading…
Cancel
Save