add generic test

pull/4714/head
yang-g 9 years ago
parent 0bbc87f599
commit 269b8be406
  1. 3
      include/grpc++/generic/async_generic_service.h
  2. 3
      src/cpp/server/server.cc
  3. 125
      test/cpp/end2end/hybrid_end2end_test.cc

@ -58,9 +58,6 @@ class GenericServerContext GRPC_FINAL : public ServerContext {
class AsyncGenericService GRPC_FINAL {
public:
// TODO(yangg) Once we can add multiple completion queues to the server
// in c core, add a CompletionQueue* argument to the ctor here.
// TODO(yangg) support methods list.
AsyncGenericService() : server_(nullptr) {}
AsyncGenericService(const grpc::string& methods) : server_(nullptr) {}

@ -324,6 +324,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
}
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
if (it->get() == nullptr) { // Handled by generic service if any.
continue;
}
RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(server_, method->name(),
host ? host->c_str() : nullptr);

@ -37,6 +37,7 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/generic/async_generic_service.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
@ -47,7 +48,6 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/test_service_impl.h"
// #include "test/cpp/util/string_ref_helper.h"
namespace grpc {
namespace testing {
@ -68,6 +68,26 @@ void Verify(CompletionQueue* cq, int i, bool expect_ok) {
EXPECT_EQ(expect_ok, VerifyReturnSuccess(cq, i));
}
bool ParseFromByteBuffer(ByteBuffer* buffer, grpc::protobuf::Message* message) {
std::vector<Slice> slices;
buffer->Dump(&slices);
grpc::string buf;
buf.reserve(buffer->Length());
for (auto s = slices.begin(); s != slices.end(); s++) {
buf.append(reinterpret_cast<const char*>(s->begin()), s->size());
}
return message->ParseFromString(buf);
}
std::unique_ptr<ByteBuffer> SerializeToByteBuffer(
grpc::protobuf::Message* message) {
grpc::string buf;
message->SerializeToString(&buf);
gpr_slice s = gpr_slice_from_copied_string(buf.c_str());
Slice slice(s, Slice::STEAL_REF);
return std::unique_ptr<ByteBuffer>(new ByteBuffer(&slice, 1));
}
// Handlers to handle async request at a server. To be run in a separate thread.
template <class Service>
void HandleEcho(Service* service, ServerCompletionQueue* cq) {
@ -75,7 +95,8 @@ void HandleEcho(Service* service, ServerCompletionQueue* cq) {
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
EchoRequest recv_request;
EchoResponse send_response;
service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq, tag(1));
service->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq, cq,
tag(1));
Verify(cq, 1, true);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(2));
@ -122,11 +143,43 @@ void HandleServerStreaming(Service* service, ServerCompletionQueue* cq) {
Verify(cq, 5, true);
}
void HandleGenericEcho(GenericServerAsyncReaderWriter* stream,
CompletionQueue* cq) {
ByteBuffer recv_buffer;
stream->Read(&recv_buffer, tag(2));
Verify(cq, 2, true);
EchoRequest recv_request;
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
EchoResponse send_response;
send_response.set_message(recv_request.message());
auto send_buffer = SerializeToByteBuffer(&send_response);
stream->Write(*send_buffer, tag(3));
Verify(cq, 3, true);
stream->Finish(Status::OK, tag(4));
Verify(cq, 4, true);
}
// Request and handle one generic call.
void HandleGenericCall(AsyncGenericService* service,
ServerCompletionQueue* cq) {
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter stream(&srv_ctx);
service->RequestCall(&srv_ctx, &stream, cq, cq, tag(1));
Verify(cq, 1, true);
if (srv_ctx.method() == "/grpc.testing.EchoTestService/Echo") {
HandleGenericEcho(&stream, cq);
} else { // other methods not handled yet.
gpr_log(GPR_ERROR, "method: %s", srv_ctx.method().c_str());
GPR_ASSERT(0);
}
}
class HybridEnd2endTest : public ::testing::Test {
protected:
HybridEnd2endTest() {}
void SetUpServer(::grpc::Service* service) {
void SetUpServer(::grpc::Service* service,
AsyncGenericService* generic_service) {
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
@ -135,6 +188,9 @@ class HybridEnd2endTest : public ::testing::Test {
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
builder.RegisterService(service);
if (generic_service) {
builder.RegisterAsyncGenericService(generic_service);
}
// Create a separate cq for each potential handler.
for (int i = 0; i < 5; i++) {
cqs_.push_back(std::move(builder.AddCompletionQueue()));
@ -159,6 +215,7 @@ class HybridEnd2endTest : public ::testing::Test {
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
// Test all rpc methods.
void TestAllMethods() {
SendEcho();
SendSimpleClientStreaming();
@ -251,7 +308,7 @@ class HybridEnd2endTest : public ::testing::Test {
TEST_F(HybridEnd2endTest, AsyncEcho) {
EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> service;
SetUpServer(&service);
SetUpServer(&service, nullptr);
ResetStub();
std::thread echo_handler_thread(
[this, &service] { HandleEcho(&service, cqs_[0].get()); });
@ -260,8 +317,10 @@ TEST_F(HybridEnd2endTest, AsyncEcho) {
}
TEST_F(HybridEnd2endTest, AsyncEchoRequestStream) {
EchoTestService::WithAsyncMethod_RequestStream<EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> > service;
SetUpServer(&service);
EchoTestService::WithAsyncMethod_RequestStream<
EchoTestService::WithAsyncMethod_Echo<TestServiceImpl> >
service;
SetUpServer(&service, nullptr);
ResetStub();
std::thread echo_handler_thread(
[this, &service] { HandleEcho(&service, cqs_[0].get()); });
@ -276,15 +335,65 @@ TEST_F(HybridEnd2endTest, AsyncRequestStreamResponseStream) {
EchoTestService::WithAsyncMethod_RequestStream<
EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> >
service;
SetUpServer(&service);
SetUpServer(&service, nullptr);
ResetStub();
std::thread echo_handler_thread(
std::thread response_stream_handler_thread(
[this, &service] { HandleServerStreaming(&service, cqs_[0].get()); });
std::thread request_stream_handler_thread(
[this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
TestAllMethods();
response_stream_handler_thread.join();
request_stream_handler_thread.join();
}
TEST_F(HybridEnd2endTest, GenericEcho) {
EchoTestService::WithGenericMethod_Echo<TestServiceImpl> service;
AsyncGenericService generic_service;
SetUpServer(&service, &generic_service);
ResetStub();
std::thread echo_handler_thread([this, &generic_service] {
HandleGenericCall(&generic_service, cqs_[0].get());
});
TestAllMethods();
echo_handler_thread.join();
}
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStream) {
EchoTestService::WithAsyncMethod_RequestStream<
EchoTestService::WithGenericMethod_Echo<TestServiceImpl> >
service;
AsyncGenericService generic_service;
SetUpServer(&service, &generic_service);
ResetStub();
std::thread echo_handler_thread([this, &generic_service] {
HandleGenericCall(&generic_service, cqs_[0].get());
});
std::thread request_stream_handler_thread(
[this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
TestAllMethods();
echo_handler_thread.join();
request_stream_handler_thread.join();
}
TEST_F(HybridEnd2endTest, GenericEchoAsyncRequestStreamResponseStream) {
EchoTestService::WithAsyncMethod_RequestStream<
EchoTestService::WithGenericMethod_Echo<
EchoTestService::WithAsyncMethod_ResponseStream<TestServiceImpl> > >
service;
AsyncGenericService generic_service;
SetUpServer(&service, &generic_service);
ResetStub();
std::thread echo_handler_thread([this, &generic_service] {
HandleGenericCall(&generic_service, cqs_[0].get());
});
std::thread request_stream_handler_thread(
[this, &service] { HandleClientStreaming(&service, cqs_[1].get()); });
std::thread response_stream_handler_thread(
[this, &service] { HandleServerStreaming(&service, cqs_[2].get()); });
TestAllMethods();
echo_handler_thread.join();
request_stream_handler_thread.join();
response_stream_handler_thread.join();
}
} // namespace

Loading…
Cancel
Save