Merge pull request #1003 from yang-g/untypedAPI

Anonymous API implementation
pull/1068/head
Vijay Pai 10 years ago
commit d3d55c76f1
  1. 68
      Makefile
  2. 22
      build.json
  3. 79
      include/grpc++/async_generic_service.h
  4. 84
      include/grpc++/byte_buffer.h
  5. 62
      include/grpc++/generic_stub.h
  6. 11
      include/grpc++/impl/call.h
  7. 8
      include/grpc++/server.h
  8. 5
      include/grpc++/server_builder.h
  9. 2
      include/grpc++/server_context.h
  10. 74
      include/grpc++/slice.h
  11. 78
      src/cpp/common/call.cc
  12. 51
      src/cpp/server/async_generic_service.cc
  13. 59
      src/cpp/server/server.cc
  14. 16
      src/cpp/server/server_builder.cc
  15. 76
      src/cpp/util/byte_buffer.cc
  16. 50
      src/cpp/util/slice.cc
  17. 271
      test/cpp/end2end/generic_end2end_test.cc
  18. 5
      tools/run_tests/tests.json

File diff suppressed because one or more lines are too long

@ -12,7 +12,9 @@
{
"name": "grpc++_base",
"public_headers": [
"include/grpc++/async_generic_service.h",
"include/grpc++/async_unary_call.h",
"include/grpc++/byte_buffer.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/client_context.h",
@ -30,6 +32,7 @@
"include/grpc++/server_builder.h",
"include/grpc++/server_context.h",
"include/grpc++/server_credentials.h",
"include/grpc++/slice.h",
"include/grpc++/status.h",
"include/grpc++/status_code_enum.h",
"include/grpc++/stream.h",
@ -54,12 +57,15 @@
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
"src/cpp/proto/proto_utils.cc",
"src/cpp/server/async_generic_service.cc",
"src/cpp/server/insecure_server_credentials.cc",
"src/cpp/server/server.cc",
"src/cpp/server/server_builder.cc",
"src/cpp/server/server_context.cc",
"src/cpp/server/server_credentials.cc",
"src/cpp/server/thread_pool.cc",
"src/cpp/util/byte_buffer.cc",
"src/cpp/util/slice.cc",
"src/cpp/util/status.cc",
"src/cpp/util/time.cc"
]
@ -1698,6 +1704,22 @@
"gpr"
]
},
{
"name": "generic_end2end_test",
"build": "test",
"language": "c++",
"src": [
"test/cpp/end2end/generic_end2end_test.cc"
],
"deps": [
"grpc++_test_util",
"grpc_test_util",
"grpc++",
"grpc",
"gpr_test_util",
"gpr"
]
},
{
"name": "grpc_cpp_plugin",
"build": "protoc",

@ -0,0 +1,79 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_ASYNC_GENERIC_SERVICE_H
#define GRPCXX_ASYNC_GENERIC_SERVICE_H
#include <grpc++/byte_buffer.h>
#include <grpc++/stream.h>
struct grpc_server;
namespace grpc {
typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> GenericServerAsyncReaderWriter;
class GenericServerContext GRPC_FINAL : public ServerContext {
public:
const grpc::string& method() const { return method_; }
const grpc::string& host() const { return host_; }
private:
friend class Server;
grpc::string method_;
grpc::string host_;
};
class AsyncGenericService GRPC_FINAL {
public:
// TODO(yangg) Once we can add multiple completion queues to the server
// in c core, add a CompletionQueue* argument to the ctor here.
// TODO(yangg) support methods list.
AsyncGenericService(const grpc::string& methods) : server_(nullptr) {}
void RequestCall(GenericServerContext* ctx,
GenericServerAsyncReaderWriter* reader_writer,
CompletionQueue* cq, void* tag);
// The new rpc event should be obtained from this completion queue.
CompletionQueue* completion_queue();
private:
friend class Server;
Server* server_;
};
} // namespace grpc
#endif // GRPCXX_ASYNC_GENERIC_SERVICE_H

@ -0,0 +1,84 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_BYTE_BUFFER_H
#define GRPCXX_BYTE_BUFFER_H
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>
#include <vector>
namespace grpc {
class ByteBuffer GRPC_FINAL {
public:
ByteBuffer() : buffer_(nullptr) {}
ByteBuffer(Slice* slices, size_t nslices);
~ByteBuffer() {
if (buffer_) {
grpc_byte_buffer_destroy(buffer_);
}
}
void Dump(std::vector<Slice>* slices);
void Clear();
size_t Length();
private:
friend class CallOpBuffer;
// takes ownership
void set_buffer(grpc_byte_buffer* buf) {
if (buffer_) {
gpr_log(GPR_ERROR, "Overriding existing buffer");
Clear();
}
buffer_ = buf;
}
grpc_byte_buffer* buffer() const {
return buffer_;
}
grpc_byte_buffer* buffer_;
};
} // namespace grpc
#endif // GRPCXX_BYTE_BUFFER_H

@ -0,0 +1,62 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_GENERIC_STUB_H
#define GRPCXX_GENERIC_STUB_H
#include <grpc++/byte_buffer.h>
#include <grpc++/stream.h>
namespace grpc {
typedef ClientAsyncReaderWriter<ByteBuffer, ByteBuffer>
GenericClientAsyncReaderWriter;
// Generic stubs provide a type-unsafe interface to call gRPC methods
// by name.
class GenericStub GRPC_FINAL {
public:
explicit GenericStub(std::shared_ptr<ChannelInterface> channel)
: channel_(channel) {}
// begin a call to a named method
std::unique_ptr<GenericClientAsyncReaderWriter> Call(
ClientContext* context, const grpc::string& method);
private:
std::shared_ptr<ChannelInterface> channel_;
};
} // namespace grpc
#endif // GRPCXX_GENERIC_STUB_H

@ -35,9 +35,9 @@
#define GRPCXX_IMPL_CALL_H
#include <grpc/grpc.h>
#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/status.h>
#include <grpc++/completion_queue.h>
#include <memory>
#include <map>
@ -47,6 +47,7 @@ struct grpc_op;
namespace grpc {
class ByteBuffer;
class Call;
class CallOpBuffer : public CompletionQueueTag {
@ -62,7 +63,9 @@ class CallOpBuffer : public CompletionQueueTag {
void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(ClientContext *ctx);
void AddSendMessage(const grpc::protobuf::Message &message);
void AddSendMessage(const ByteBuffer& message);
void AddRecvMessage(grpc::protobuf::Message *message);
void AddRecvMessage(ByteBuffer *message);
void AddClientSendClose();
void AddClientRecvStatus(ClientContext *ctx, Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
@ -90,10 +93,12 @@ class CallOpBuffer : public CompletionQueueTag {
grpc_metadata_array recv_initial_metadata_arr_;
// Send message
const grpc::protobuf::Message *send_message_;
grpc_byte_buffer *send_message_buf_;
const ByteBuffer *send_message_buffer_;
grpc_byte_buffer *send_buf_;
// Recv message
grpc::protobuf::Message *recv_message_;
grpc_byte_buffer *recv_message_buf_;
ByteBuffer *recv_message_buffer_;
grpc_byte_buffer *recv_buf_;
// Client send close
bool client_send_close_;
// Client recv status

@ -49,6 +49,8 @@ struct grpc_server;
namespace grpc {
class AsynchronousService;
class GenericServerContext;
class AsyncGenericService;
class RpcService;
class RpcServiceMethod;
class ServerCredentials;
@ -69,6 +71,7 @@ class Server GRPC_FINAL : private CallHook,
void Wait();
private:
friend class AsyncGenericService;
friend class ServerBuilder;
class SyncRequest;
@ -81,6 +84,7 @@ class Server GRPC_FINAL : private CallHook,
// The service must exist for the lifetime of the Server instance.
bool RegisterService(RpcService* service);
bool RegisterAsyncService(AsynchronousService* service);
void RegisterAsyncGenericService(AsyncGenericService* service);
// Add a listening port. Can be called multiple times.
int AddPort(const grpc::string& addr, ServerCredentials* creds);
// Start the server.
@ -98,6 +102,10 @@ class Server GRPC_FINAL : private CallHook,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) GRPC_OVERRIDE;
void RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag);
// Completion queue.
CompletionQueue cq_;

@ -41,6 +41,7 @@
namespace grpc {
class AsyncGenericService;
class AsynchronousService;
class CompletionQueue;
class RpcService;
@ -64,6 +65,9 @@ class ServerBuilder {
// instance returned by BuildAndStart().
void RegisterAsyncService(AsynchronousService* service);
// Register a generic service.
void RegisterAsyncGenericService(AsyncGenericService* service);
// Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
@ -87,6 +91,7 @@ class ServerBuilder {
std::vector<AsynchronousService*> async_services_;
std::vector<Port> ports_;
std::shared_ptr<ServerCredentials> creds_;
AsyncGenericService* generic_service_;
ThreadPoolInterface* thread_pool_;
};

@ -66,7 +66,7 @@ class CompletionQueue;
class Server;
// Interface of server side rpc context.
class ServerContext GRPC_FINAL {
class ServerContext {
public:
ServerContext(); // for async calls
~ServerContext();

@ -0,0 +1,74 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPCXX_SLICE_H
#define GRPCXX_SLICE_H
#include <grpc/support/slice.h>
#include <grpc++/config.h>
namespace grpc {
class Slice GRPC_FINAL {
public:
// construct empty slice
Slice();
// destructor - drops one ref
~Slice();
// construct slice from grpc slice, adding a ref
enum AddRef { ADD_REF };
Slice(gpr_slice slice, AddRef);
// construct slice from grpc slice, stealing a ref
enum StealRef { STEAL_REF };
Slice(gpr_slice slice, StealRef);
// copy constructor - adds a ref
Slice(const Slice& other);
// assignment - ref count is unchanged
Slice& operator=(Slice other) {
std::swap(slice_, other.slice_);
return *this;
}
size_t size() const { return GPR_SLICE_LENGTH(slice_); }
const gpr_uint8* begin() const { return GPR_SLICE_START_PTR(slice_); }
const gpr_uint8* end() const { return GPR_SLICE_END_PTR(slice_); }
private:
friend class ByteBuffer;
gpr_slice slice_;
};
} // namespace grpc
#endif // GRPCXX_SLICE_H

@ -31,8 +31,10 @@
*
*/
#include <grpc/support/alloc.h>
#include <grpc++/impl/call.h>
#include <grpc/support/alloc.h>
#include <grpc++/byte_buffer.h>
#include <grpc++/client_context.h>
#include <grpc++/channel_interface.h>
@ -48,9 +50,11 @@ CallOpBuffer::CallOpBuffer()
recv_initial_metadata_(nullptr),
recv_initial_metadata_arr_{0, 0, nullptr},
send_message_(nullptr),
send_message_buf_(nullptr),
send_message_buffer_(nullptr),
send_buf_(nullptr),
recv_message_(nullptr),
recv_message_buf_(nullptr),
recv_message_buffer_(nullptr),
recv_buf_(nullptr),
client_send_close_(false),
recv_trailing_metadata_(nullptr),
recv_status_(nullptr),
@ -74,18 +78,20 @@ void CallOpBuffer::Reset(void* next_return_tag) {
recv_initial_metadata_ = nullptr;
recv_initial_metadata_arr_.count = 0;
send_message_ = nullptr;
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
if (send_buf_ && send_message_) {
grpc_byte_buffer_destroy(send_buf_);
}
send_message_ = nullptr;
send_message_buffer_ = nullptr;
send_buf_ = nullptr;
recv_message_ = nullptr;
got_message = false;
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
if (recv_buf_ && recv_message_) {
grpc_byte_buffer_destroy(recv_buf_);
}
recv_message_ = nullptr;
recv_message_buffer_ = nullptr;
recv_buf_ = nullptr;
client_send_close_ = false;
@ -106,11 +112,11 @@ CallOpBuffer::~CallOpBuffer() {
gpr_free(status_details_);
gpr_free(recv_initial_metadata_arr_.metadata);
gpr_free(recv_trailing_metadata_arr_.metadata);
if (recv_message_buf_) {
grpc_byte_buffer_destroy(recv_message_buf_);
if (recv_buf_ && recv_message_) {
grpc_byte_buffer_destroy(recv_buf_);
}
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
if (send_buf_ && send_message_) {
grpc_byte_buffer_destroy(send_buf_);
}
}
@ -166,11 +172,19 @@ void CallOpBuffer::AddSendMessage(const grpc::protobuf::Message& message) {
send_message_ = &message;
}
void CallOpBuffer::AddSendMessage(const ByteBuffer& message) {
send_message_buffer_ = &message;
}
void CallOpBuffer::AddRecvMessage(grpc::protobuf::Message* message) {
recv_message_ = message;
recv_message_->Clear();
}
void CallOpBuffer::AddRecvMessage(ByteBuffer* message) {
recv_message_buffer_ = message;
}
void CallOpBuffer::AddClientSendClose() { client_send_close_ = true; }
void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
@ -206,19 +220,23 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
ops[*nops].data.recv_initial_metadata = &recv_initial_metadata_arr_;
(*nops)++;
}
if (send_message_ || send_message_buffer_) {
if (send_message_) {
bool success = SerializeProto(*send_message_, &send_message_buf_);
bool success = SerializeProto(*send_message_, &send_buf_);
if (!success) {
abort();
// TODO handle parse failure
}
} else {
send_buf_ = send_message_buffer_->buffer();
}
ops[*nops].op = GRPC_OP_SEND_MESSAGE;
ops[*nops].data.send_message = send_message_buf_;
ops[*nops].data.send_message = send_buf_;
(*nops)++;
}
if (recv_message_) {
if (recv_message_ || recv_message_buffer_) {
ops[*nops].op = GRPC_OP_RECV_MESSAGE;
ops[*nops].data.recv_message = &recv_message_buf_;
ops[*nops].data.recv_message = &recv_buf_;
(*nops)++;
}
if (client_send_close_) {
@ -256,9 +274,11 @@ void CallOpBuffer::FillOps(grpc_op* ops, size_t* nops) {
bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
// Release send buffers.
if (send_message_buf_) {
grpc_byte_buffer_destroy(send_message_buf_);
send_message_buf_ = nullptr;
if (send_buf_ && send_message_) {
if (send_message_) {
grpc_byte_buffer_destroy(send_buf_);
}
send_buf_ = nullptr;
}
if (initial_metadata_) {
gpr_free(initial_metadata_);
@ -275,12 +295,16 @@ bool CallOpBuffer::FinalizeResult(void** tag, bool* status) {
FillMetadataMap(&recv_initial_metadata_arr_, recv_initial_metadata_);
}
// Parse received message if any.
if (recv_message_) {
if (recv_message_buf_) {
if (recv_message_ || recv_message_buffer_) {
if (recv_buf_) {
got_message = *status;
*status = *status && DeserializeProto(recv_message_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_message_buf_);
recv_message_buf_ = nullptr;
if (recv_message_) {
*status = *status && DeserializeProto(recv_buf_, recv_message_);
grpc_byte_buffer_destroy(recv_buf_);
} else {
recv_message_buffer_->set_buffer(recv_buf_);
}
recv_buf_ = nullptr;
} else {
// Read failed
got_message = false;

@ -0,0 +1,51 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/async_generic_service.h>
#include <grpc++/server.h>
namespace grpc {
void AsyncGenericService::RequestCall(
GenericServerContext* ctx, GenericServerAsyncReaderWriter* reader_writer,
CompletionQueue* cq, void* tag) {
server_->RequestAsyncGenericCall(ctx, reader_writer, cq, tag);
}
CompletionQueue* AsyncGenericService::completion_queue() {
return &server_->cq_;
}
} // namespace grpc

@ -36,8 +36,10 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc++/completion_queue.h>
#include <grpc++/async_generic_service.h>
#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server_context.h>
@ -226,6 +228,12 @@ bool Server::RegisterAsyncService(AsynchronousService* service) {
return true;
}
void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
}
int Server::AddPort(const grpc::string& addr, ServerCredentials* creds) {
GPR_ASSERT(!started_);
return creds->AddPortToServer(addr, server_);
@ -289,15 +297,36 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
stream_(stream),
cq_(cq),
ctx_(ctx),
generic_ctx_(nullptr),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
grpc_server_request_registered_call(
server->server_, registered_method, &call_, &deadline_, &array_,
request ? &payload_ : nullptr, cq->cq(), this);
server->server_, registered_method, &call_, &call_details_.deadline,
&array_, request ? &payload_ : nullptr, cq->cq(), this);
}
AsyncRequest(Server* server, GenericServerContext* ctx,
ServerAsyncStreamingInterface* stream, CompletionQueue* cq,
void* tag)
: tag_(tag),
request_(nullptr),
stream_(stream),
cq_(cq),
ctx_(nullptr),
generic_ctx_(ctx),
server_(server),
call_(nullptr),
payload_(nullptr) {
memset(&array_, 0, sizeof(array_));
grpc_call_details_init(&call_details_);
grpc_server_request_call(
server->server_, &call_, &call_details_, &array_, cq->cq(), this);
}
~AsyncRequest() {
if (payload_) {
grpc_byte_buffer_destroy(payload_);
@ -315,20 +344,29 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
*status = false;
}
}
ServerContext* ctx = ctx_ ? ctx_ : generic_ctx_;
GPR_ASSERT(ctx);
if (*status) {
ctx_->deadline_ = Timespec2Timepoint(deadline_);
ctx->deadline_ = Timespec2Timepoint(call_details_.deadline);
for (size_t i = 0; i < array_.count; i++) {
ctx_->client_metadata_.insert(std::make_pair(
ctx->client_metadata_.insert(std::make_pair(
grpc::string(array_.metadata[i].key),
grpc::string(
array_.metadata[i].value,
array_.metadata[i].value + array_.metadata[i].value_length)));
}
if (generic_ctx_) {
// TODO(yangg) remove the copy here.
generic_ctx_->method_ = call_details_.method;
generic_ctx_->host_ = call_details_.host;
gpr_free(call_details_.method);
gpr_free(call_details_.host);
}
}
ctx_->call_ = call_;
ctx->call_ = call_;
Call call(call_, server_, cq_);
if (orig_status && call_) {
ctx_->BeginCompletionOp(&call);
ctx->BeginCompletionOp(&call);
}
// just the pointers inside call are copied here
stream_->BindCall(&call);
@ -342,9 +380,10 @@ class Server::AsyncRequest GRPC_FINAL : public CompletionQueueTag {
ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const cq_;
ServerContext* const ctx_;
GenericServerContext* const generic_ctx_;
Server* const server_;
grpc_call* call_;
gpr_timespec deadline_;
grpc_call_details call_details_;
grpc_metadata_array array_;
grpc_byte_buffer* payload_;
};
@ -356,6 +395,12 @@ void Server::RequestAsyncCall(void* registered_method, ServerContext* context,
new AsyncRequest(this, registered_method, context, request, stream, cq, tag);
}
void Server::RequestAsyncGenericCall(GenericServerContext* context,
ServerAsyncStreamingInterface* stream,
CompletionQueue* cq, void* tag) {
new AsyncRequest(this, context, stream, cq, tag);
}
void Server::ScheduleCallback() {
{
std::unique_lock<std::mutex> lock(mu_);

@ -41,7 +41,8 @@
namespace grpc {
ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {}
ServerBuilder::ServerBuilder()
: generic_service_(nullptr), thread_pool_(nullptr) {}
void ServerBuilder::RegisterService(SynchronousService* service) {
services_.push_back(service->service());
@ -51,6 +52,16 @@ void ServerBuilder::RegisterAsyncService(AsynchronousService* service) {
async_services_.push_back(service);
}
void ServerBuilder::RegisterAsyncGenericService(AsyncGenericService* service) {
if (generic_service_) {
gpr_log(GPR_ERROR,
"Adding multiple AsyncGenericService is unsupported for now. "
"Dropping the service %p", service);
return;
}
generic_service_ = service;
}
void ServerBuilder::AddPort(const grpc::string& addr,
std::shared_ptr<ServerCredentials> creds,
int* selected_port) {
@ -84,6 +95,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
}
for (auto& port : ports_) {
int r = server->AddPort(port.addr, port.creds.get());
if (!r) return nullptr;

@ -0,0 +1,76 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/byte_buffer.h>
namespace grpc {
ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
// TODO(yangg) maybe expose some core API to simplify this
std::vector<gpr_slice> c_slices(nslices);
for (size_t i = 0; i < nslices; i++) {
c_slices[i] = slices[i].slice_;
}
buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
}
void ByteBuffer::Clear() {
if (buffer_) {
grpc_byte_buffer_destroy(buffer_);
buffer_ = nullptr;
}
}
void ByteBuffer::Dump(std::vector<Slice>* slices) {
slices->clear();
if (!buffer_) {
return;
}
grpc_byte_buffer_reader* reader = grpc_byte_buffer_reader_create(buffer_);
gpr_slice s;
while (grpc_byte_buffer_reader_next(reader, &s)) {
slices->push_back(Slice(s, Slice::STEAL_REF));
gpr_slice_unref(s);
}
grpc_byte_buffer_reader_destroy(reader);
}
size_t ByteBuffer::Length() {
if (buffer_) {
return grpc_byte_buffer_length(buffer_);
} else {
return 0;
}
}
} // namespace grpc

@ -0,0 +1,50 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc++/slice.h>
namespace grpc {
Slice::Slice() : slice_(gpr_empty_slice()) {}
Slice::~Slice() {
gpr_slice_unref(slice_);
}
Slice::Slice(gpr_slice slice, AddRef) : slice_(gpr_slice_ref(slice)) {}
Slice::Slice(gpr_slice slice, StealRef) : slice_(slice) {}
Slice::Slice(const Slice& other) : slice_(gpr_slice_ref(other.slice_)) {}
} // namespace grpc

@ -0,0 +1,271 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <chrono>
#include <memory>
#include "src/cpp/proto/proto_utils.h"
#include "src/cpp/util/time.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/echo.pb.h"
#include <grpc++/async_generic_service.h>
#include <grpc++/async_unary_call.h>
#include <grpc++/byte_buffer.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
#include <grpc++/slice.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>
#include <gtest/gtest.h>
#include <grpc/grpc.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
using grpc::cpp::test::util::EchoRequest;
using grpc::cpp::test::util::EchoResponse;
using std::chrono::system_clock;
namespace grpc {
namespace testing {
namespace {
void* tag(int i) { return (void*)(gpr_intptr)i; }
void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
bool ok;
void* got_tag;
EXPECT_TRUE(cq->Next(&got_tag, &ok));
EXPECT_EQ(expect_ok, ok);
EXPECT_EQ(tag(i), got_tag);
}
bool ParseFromByteBuffer(ByteBuffer* buffer, grpc::protobuf::Message* message) {
std::vector<Slice> slices;
buffer->Dump(&slices);
grpc::string buf;
buf.reserve(buffer->Length());
for (const Slice& s : slices) {
buf.append(reinterpret_cast<const char*>(s.begin()), s.size());
}
return message->ParseFromString(buf);
}
class GenericEnd2endTest : public ::testing::Test {
protected:
GenericEnd2endTest() : generic_service_("*") {}
void SetUp() GRPC_OVERRIDE {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddPort(server_address_.str(), InsecureServerCredentials());
builder.RegisterAsyncGenericService(&generic_service_);
server_ = builder.BuildAndStart();
}
void TearDown() GRPC_OVERRIDE {
server_->Shutdown();
void* ignored_tag;
bool ignored_ok;
cli_cq_.Shutdown();
srv_cq_.Shutdown();
while (cli_cq_.Next(&ignored_tag, &ignored_ok))
;
while (srv_cq_.Next(&ignored_tag, &ignored_ok))
;
}
void ResetStub() {
std::shared_ptr<ChannelInterface> channel =
CreateChannel(server_address_.str(), InsecureCredentials(), ChannelArguments());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
void server_ok(int i) { verify_ok(&srv_cq_, i, true); }
void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
void server_fail(int i) { verify_ok(&srv_cq_, i, false); }
void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
void SendRpc(int num_rpcs) {
for (int i = 0; i < num_rpcs; i++) {
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter stream(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, &cli_cq_, tag(1)));
client_ok(1);
generic_service_.RequestCall(&srv_ctx, &stream, &srv_cq_, tag(2));
verify_ok(generic_service_.completion_queue(), 2, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ("/grpc.cpp.test.util.TestService/Echo", srv_ctx.method());
ByteBuffer recv_buffer;
stream.Read(&recv_buffer, tag(3));
server_ok(3);
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
grpc::string buf;
send_response.SerializeToString(&buf);
gpr_slice s = gpr_slice_from_copied_string(buf.c_str());
Slice slice(s, Slice::STEAL_REF);
ByteBuffer send_buffer(&slice, 1);
stream.Write(send_buffer, tag(4));
server_ok(4);
stream.Finish(Status::OK, tag(5));
server_ok(5);
response_reader->Finish(&recv_response, &recv_status, tag(4));
client_ok(4);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
}
}
CompletionQueue cli_cq_;
CompletionQueue srv_cq_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server_;
AsyncGenericService generic_service_;
std::ostringstream server_address_;
};
TEST_F(GenericEnd2endTest, SimpleRpc) {
ResetStub();
SendRpc(1);
}
TEST_F(GenericEnd2endTest, SequentialRpcs) {
ResetStub();
SendRpc(10);
}
// One ping, one pong.
TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
cli_stream(stub_->AsyncBidiStream(&cli_ctx, &cli_cq_, tag(1)));
client_ok(1);
generic_service_.RequestCall(&srv_ctx, &srv_stream, &srv_cq_, tag(2));
verify_ok(generic_service_.completion_queue(), 2, true);
EXPECT_EQ(server_address_.str(), srv_ctx.host());
EXPECT_EQ("/grpc.cpp.test.util.TestService/BidiStream", srv_ctx.method());
cli_stream->Write(send_request, tag(3));
client_ok(3);
ByteBuffer recv_buffer;
srv_stream.Read(&recv_buffer, tag(4));
server_ok(4);
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
grpc::string buf;
send_response.SerializeToString(&buf);
gpr_slice s = gpr_slice_from_copied_string(buf.c_str());
Slice slice(s, Slice::STEAL_REF);
ByteBuffer send_buffer(&slice, 1);
srv_stream.Write(send_buffer, tag(5));
server_ok(5);
cli_stream->Read(&recv_response, tag(6));
client_ok(6);
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
client_ok(7);
recv_buffer.Clear();
srv_stream.Read(&recv_buffer, tag(8));
server_fail(8);
srv_stream.Finish(Status::OK, tag(9));
server_ok(9);
cli_stream->Finish(&recv_status, tag(10));
client_ok(10);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();
grpc_shutdown();
google::protobuf::ShutdownProtobufLibrary();
return result;
}

@ -356,6 +356,11 @@
"language": "c++",
"name": "end2end_test"
},
{
"flaky": false,
"language": "c++",
"name": "generic_end2end_test"
},
{
"flaky": false,
"language": "c++",

Loading…
Cancel
Save