mirror of https://github.com/grpc/grpc.git
Merge pull request #15771 from ncteisen/serialization
Support Per-Method Codegen Generic on Server Sidepull/15938/head
commit
024c663312
32 changed files with 1220 additions and 249 deletions
@ -0,0 +1,112 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPC_IMPL_CODEGEN_LOG_H |
||||||
|
#define GRPC_IMPL_CODEGEN_LOG_H |
||||||
|
|
||||||
|
#include <grpc/impl/codegen/port_platform.h> |
||||||
|
|
||||||
|
#include <stdarg.h> |
||||||
|
#include <stdlib.h> /* for abort() */ |
||||||
|
|
||||||
|
#ifdef __cplusplus |
||||||
|
extern "C" { |
||||||
|
#endif |
||||||
|
|
||||||
|
/** GPR log API.
|
||||||
|
|
||||||
|
Usage (within grpc): |
||||||
|
|
||||||
|
int argument1 = 3; |
||||||
|
char* argument2 = "hello"; |
||||||
|
gpr_log(GPR_DEBUG, "format string %d", argument1); |
||||||
|
gpr_log(GPR_INFO, "hello world"); |
||||||
|
gpr_log(GPR_ERROR, "%d %s!!", argument1, argument2); */ |
||||||
|
|
||||||
|
/** The severity of a log message - use the #defines below when calling into
|
||||||
|
gpr_log to additionally supply file and line data */ |
||||||
|
typedef enum gpr_log_severity { |
||||||
|
GPR_LOG_SEVERITY_DEBUG, |
||||||
|
GPR_LOG_SEVERITY_INFO, |
||||||
|
GPR_LOG_SEVERITY_ERROR |
||||||
|
} gpr_log_severity; |
||||||
|
|
||||||
|
#define GPR_LOG_VERBOSITY_UNSET -1 |
||||||
|
|
||||||
|
/** Returns a string representation of the log severity */ |
||||||
|
GPRAPI const char* gpr_log_severity_string(gpr_log_severity severity); |
||||||
|
|
||||||
|
/** Macros to build log contexts at various severity levels */ |
||||||
|
#define GPR_DEBUG __FILE__, __LINE__, GPR_LOG_SEVERITY_DEBUG |
||||||
|
#define GPR_INFO __FILE__, __LINE__, GPR_LOG_SEVERITY_INFO |
||||||
|
#define GPR_ERROR __FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR |
||||||
|
|
||||||
|
/** Log a message. It's advised to use GPR_xxx above to generate the context
|
||||||
|
* for each message */ |
||||||
|
GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity, |
||||||
|
const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5); |
||||||
|
|
||||||
|
GPRAPI int gpr_should_log(gpr_log_severity severity); |
||||||
|
|
||||||
|
GPRAPI void gpr_log_message(const char* file, int line, |
||||||
|
gpr_log_severity severity, const char* message); |
||||||
|
|
||||||
|
/** Set global log verbosity */ |
||||||
|
GPRAPI void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print); |
||||||
|
|
||||||
|
GPRAPI void gpr_log_verbosity_init(void); |
||||||
|
|
||||||
|
/** Log overrides: applications can use this API to intercept logging calls
|
||||||
|
and use their own implementations */ |
||||||
|
|
||||||
|
struct gpr_log_func_args { |
||||||
|
const char* file; |
||||||
|
int line; |
||||||
|
gpr_log_severity severity; |
||||||
|
const char* message; |
||||||
|
}; |
||||||
|
|
||||||
|
typedef struct gpr_log_func_args gpr_log_func_args; |
||||||
|
|
||||||
|
typedef void (*gpr_log_func)(gpr_log_func_args* args); |
||||||
|
GPRAPI void gpr_set_log_function(gpr_log_func func); |
||||||
|
|
||||||
|
/** abort() the process if x is zero, having written a line to the log.
|
||||||
|
|
||||||
|
Intended for internal invariants. If the error can be recovered from, |
||||||
|
without the possibility of corruption, or might best be reflected via |
||||||
|
an exception in a higher-level language, consider returning error code. */ |
||||||
|
#define GPR_ASSERT(x) \ |
||||||
|
do { \
|
||||||
|
if (GPR_UNLIKELY(!(x))) { \
|
||||||
|
gpr_log(GPR_ERROR, "assertion failed: %s", #x); \
|
||||||
|
abort(); \
|
||||||
|
} \
|
||||||
|
} while (0) |
||||||
|
|
||||||
|
#ifndef NDEBUG |
||||||
|
#define GPR_DEBUG_ASSERT(x) GPR_ASSERT(x) |
||||||
|
#else |
||||||
|
#define GPR_DEBUG_ASSERT(x) |
||||||
|
#endif |
||||||
|
|
||||||
|
#ifdef __cplusplus |
||||||
|
} |
||||||
|
#endif |
||||||
|
|
||||||
|
#endif /* GRPC_IMPL_CODEGEN_LOG_H */ |
@ -0,0 +1,81 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#ifndef GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H |
||||||
|
#define GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H |
||||||
|
|
||||||
|
#include <grpcpp/impl/codegen/async_stream.h> |
||||||
|
#include <grpcpp/impl/codegen/byte_buffer.h> |
||||||
|
|
||||||
|
struct grpc_server; |
||||||
|
|
||||||
|
namespace grpc { |
||||||
|
|
||||||
|
typedef ServerAsyncReaderWriter<ByteBuffer, ByteBuffer> |
||||||
|
GenericServerAsyncReaderWriter; |
||||||
|
typedef ServerAsyncResponseWriter<ByteBuffer> GenericServerAsyncResponseWriter; |
||||||
|
typedef ServerAsyncReader<ByteBuffer, ByteBuffer> GenericServerAsyncReader; |
||||||
|
typedef ServerAsyncWriter<ByteBuffer> GenericServerAsyncWriter; |
||||||
|
|
||||||
|
class GenericServerContext final : public ServerContext { |
||||||
|
public: |
||||||
|
const grpc::string& method() const { return method_; } |
||||||
|
const grpc::string& host() const { return host_; } |
||||||
|
|
||||||
|
private: |
||||||
|
friend class Server; |
||||||
|
friend class ServerInterface; |
||||||
|
|
||||||
|
grpc::string method_; |
||||||
|
grpc::string host_; |
||||||
|
}; |
||||||
|
|
||||||
|
// A generic service at the server side accepts all RPC methods and hosts. It is
|
||||||
|
// typically used in proxies. The generic service can be registered to a server
|
||||||
|
// which also has other services.
|
||||||
|
// Sample usage:
|
||||||
|
// ServerBuilder builder;
|
||||||
|
// auto cq = builder.AddCompletionQueue();
|
||||||
|
// AsyncGenericService generic_service;
|
||||||
|
// builder.RegisterAsyncGeneicService(&generic_service);
|
||||||
|
// auto server = builder.BuildAndStart();
|
||||||
|
//
|
||||||
|
// // request a new call
|
||||||
|
// GenericServerContext context;
|
||||||
|
// GenericAsyncReaderWriter stream;
|
||||||
|
// generic_service.RequestCall(&context, &stream, cq.get(), cq.get(), tag);
|
||||||
|
//
|
||||||
|
// When tag is retrieved from cq->Next(), context.method() can be used to look
|
||||||
|
// at the method and the RPC can be handled accordingly.
|
||||||
|
class AsyncGenericService final { |
||||||
|
public: |
||||||
|
AsyncGenericService() : server_(nullptr) {} |
||||||
|
|
||||||
|
void RequestCall(GenericServerContext* ctx, |
||||||
|
GenericServerAsyncReaderWriter* reader_writer, |
||||||
|
CompletionQueue* call_cq, |
||||||
|
ServerCompletionQueue* notification_cq, void* tag); |
||||||
|
|
||||||
|
private: |
||||||
|
friend class Server; |
||||||
|
Server* server_; |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace grpc
|
||||||
|
|
||||||
|
#endif // GRPCPP_IMPL_CODEGEN_ASYNC_GENERIC_SERVICE_H
|
@ -0,0 +1,370 @@ |
|||||||
|
/*
|
||||||
|
* |
||||||
|
* Copyright 2015 gRPC authors. |
||||||
|
* |
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||||
|
* you may not use this file except in compliance with the License. |
||||||
|
* You may obtain a copy of the License at |
||||||
|
* |
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* |
||||||
|
* Unless required by applicable law or agreed to in writing, software |
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, |
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||||
|
* See the License for the specific language governing permissions and |
||||||
|
* limitations under the License. |
||||||
|
* |
||||||
|
*/ |
||||||
|
|
||||||
|
#include <cinttypes> |
||||||
|
#include <memory> |
||||||
|
#include <thread> |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/time.h> |
||||||
|
#include <grpcpp/channel.h> |
||||||
|
#include <grpcpp/client_context.h> |
||||||
|
#include <grpcpp/create_channel.h> |
||||||
|
#include <grpcpp/server.h> |
||||||
|
#include <grpcpp/server_builder.h> |
||||||
|
#include <grpcpp/server_context.h> |
||||||
|
|
||||||
|
#include "src/core/lib/gpr/env.h" |
||||||
|
#include "src/core/lib/iomgr/port.h" |
||||||
|
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" |
||||||
|
#include "src/proto/grpc/testing/echo.grpc.pb.h" |
||||||
|
#include "test/core/util/port.h" |
||||||
|
#include "test/core/util/test_config.h" |
||||||
|
#include "test/cpp/util/byte_buffer_proto_helper.h" |
||||||
|
#include "test/cpp/util/string_ref_helper.h" |
||||||
|
|
||||||
|
#include <gtest/gtest.h> |
||||||
|
|
||||||
|
using grpc::testing::EchoRequest; |
||||||
|
using grpc::testing::EchoResponse; |
||||||
|
|
||||||
|
namespace grpc { |
||||||
|
namespace testing { |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
void* tag(int i) { return (void*)static_cast<intptr_t>(i); } |
||||||
|
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } |
||||||
|
|
||||||
|
class Verifier { |
||||||
|
public: |
||||||
|
Verifier() {} |
||||||
|
|
||||||
|
// Expect sets the expected ok value for a specific tag
|
||||||
|
Verifier& Expect(int i, bool expect_ok) { |
||||||
|
expectations_[tag(i)] = expect_ok; |
||||||
|
return *this; |
||||||
|
} |
||||||
|
|
||||||
|
// Next waits for 1 async tag to complete, checks its
|
||||||
|
// expectations, and returns the tag
|
||||||
|
int Next(CompletionQueue* cq, bool ignore_ok) { |
||||||
|
bool ok; |
||||||
|
void* got_tag; |
||||||
|
EXPECT_TRUE(cq->Next(&got_tag, &ok)); |
||||||
|
GotTag(got_tag, ok, ignore_ok); |
||||||
|
return detag(got_tag); |
||||||
|
} |
||||||
|
|
||||||
|
// Verify keeps calling Next until all currently set
|
||||||
|
// expected tags are complete
|
||||||
|
void Verify(CompletionQueue* cq) { |
||||||
|
GPR_ASSERT(!expectations_.empty()); |
||||||
|
while (!expectations_.empty()) { |
||||||
|
Next(cq, false); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
void GotTag(void* got_tag, bool ok, bool ignore_ok) { |
||||||
|
auto it = expectations_.find(got_tag); |
||||||
|
if (it != expectations_.end()) { |
||||||
|
if (!ignore_ok) { |
||||||
|
EXPECT_EQ(it->second, ok); |
||||||
|
} |
||||||
|
expectations_.erase(it); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
std::map<void*, bool> expectations_; |
||||||
|
}; |
||||||
|
|
||||||
|
class RawEnd2EndTest : public ::testing::Test { |
||||||
|
protected: |
||||||
|
RawEnd2EndTest() {} |
||||||
|
|
||||||
|
void SetUp() override { |
||||||
|
port_ = grpc_pick_unused_port_or_die(); |
||||||
|
server_address_ << "localhost:" << port_; |
||||||
|
} |
||||||
|
|
||||||
|
void TearDown() override { |
||||||
|
server_->Shutdown(); |
||||||
|
void* ignored_tag; |
||||||
|
bool ignored_ok; |
||||||
|
cq_->Shutdown(); |
||||||
|
while (cq_->Next(&ignored_tag, &ignored_ok)) |
||||||
|
; |
||||||
|
stub_.reset(); |
||||||
|
grpc_recycle_unused_port(port_); |
||||||
|
} |
||||||
|
|
||||||
|
template <typename ServerType> |
||||||
|
std::unique_ptr<ServerType> BuildAndStartServer() { |
||||||
|
ServerBuilder builder; |
||||||
|
builder.AddListeningPort(server_address_.str(), |
||||||
|
grpc::InsecureServerCredentials()); |
||||||
|
std::unique_ptr<ServerType> service(new ServerType()); |
||||||
|
builder.RegisterService(service.get()); |
||||||
|
cq_ = builder.AddCompletionQueue(); |
||||||
|
server_ = builder.BuildAndStart(); |
||||||
|
return service; |
||||||
|
} |
||||||
|
|
||||||
|
void ResetStub() { |
||||||
|
ChannelArguments args; |
||||||
|
std::shared_ptr<Channel> channel = CreateChannel( |
||||||
|
server_address_.str(), grpc::InsecureChannelCredentials()); |
||||||
|
stub_ = grpc::testing::EchoTestService::NewStub(channel); |
||||||
|
} |
||||||
|
|
||||||
|
std::unique_ptr<ServerCompletionQueue> cq_; |
||||||
|
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; |
||||||
|
std::unique_ptr<Server> server_; |
||||||
|
std::ostringstream server_address_; |
||||||
|
int port_; |
||||||
|
|
||||||
|
// For the client application to populate and send to server.
|
||||||
|
EchoRequest send_request_; |
||||||
|
::grpc::ByteBuffer send_request_buffer_; |
||||||
|
|
||||||
|
// For the server to give to gRPC to be populated by incoming request
|
||||||
|
// from client.
|
||||||
|
EchoRequest recv_request_; |
||||||
|
::grpc::ByteBuffer recv_request_buffer_; |
||||||
|
|
||||||
|
// For the server application to populate and send back to client.
|
||||||
|
EchoResponse send_response_; |
||||||
|
::grpc::ByteBuffer send_response_buffer_; |
||||||
|
|
||||||
|
// For the client to give to gRPC to be populated by incoming response
|
||||||
|
// from server.
|
||||||
|
EchoResponse recv_response_; |
||||||
|
::grpc::ByteBuffer recv_response_buffer_; |
||||||
|
Status recv_status_; |
||||||
|
|
||||||
|
// Both sides need contexts
|
||||||
|
ClientContext cli_ctx_; |
||||||
|
ServerContext srv_ctx_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Regular Async, both peers use proto
|
||||||
|
TEST_F(RawEnd2EndTest, PureAsyncService) { |
||||||
|
typedef grpc::testing::EchoTestService::AsyncService SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx_); |
||||||
|
|
||||||
|
send_request_.set_message("hello"); |
||||||
|
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( |
||||||
|
stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get())); |
||||||
|
service->RequestEcho(&srv_ctx_, &recv_request_, &response_writer, cq_.get(), |
||||||
|
cq_.get(), tag(2)); |
||||||
|
response_reader->Finish(&recv_response_, &recv_status_, tag(4)); |
||||||
|
Verifier().Expect(2, true).Verify(cq_.get()); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
send_response_.set_message(recv_request_.message()); |
||||||
|
response_writer.Finish(send_response_, Status::OK, tag(3)); |
||||||
|
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
EXPECT_TRUE(recv_status_.ok()); |
||||||
|
} |
||||||
|
|
||||||
|
// Client uses proto, server uses generic codegen, unary
|
||||||
|
TEST_F(RawEnd2EndTest, RawServerUnary) { |
||||||
|
typedef grpc::testing::EchoTestService::WithRawMethod_Echo< |
||||||
|
grpc::testing::EchoTestService::Service> |
||||||
|
SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
grpc::GenericServerAsyncResponseWriter response_writer(&srv_ctx_); |
||||||
|
|
||||||
|
send_request_.set_message("hello unary"); |
||||||
|
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( |
||||||
|
stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get())); |
||||||
|
service->RequestEcho(&srv_ctx_, &recv_request_buffer_, &response_writer, |
||||||
|
cq_.get(), cq_.get(), tag(2)); |
||||||
|
response_reader->Finish(&recv_response_, &recv_status_, tag(4)); |
||||||
|
Verifier().Expect(2, true).Verify(cq_.get()); |
||||||
|
EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_)); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
send_response_.set_message(recv_request_.message()); |
||||||
|
EXPECT_TRUE( |
||||||
|
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_)); |
||||||
|
response_writer.Finish(send_response_buffer_, Status::OK, tag(3)); |
||||||
|
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
EXPECT_TRUE(recv_status_.ok()); |
||||||
|
} |
||||||
|
|
||||||
|
// Client uses proto, server uses generic codegen, client streaming
|
||||||
|
TEST_F(RawEnd2EndTest, RawServerClientStreaming) { |
||||||
|
typedef grpc::testing::EchoTestService::WithRawMethod_RequestStream< |
||||||
|
grpc::testing::EchoTestService::Service> |
||||||
|
SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
|
||||||
|
grpc::GenericServerAsyncReader srv_stream(&srv_ctx_); |
||||||
|
|
||||||
|
send_request_.set_message("hello client streaming"); |
||||||
|
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( |
||||||
|
stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1))); |
||||||
|
|
||||||
|
service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(), |
||||||
|
tag(2)); |
||||||
|
|
||||||
|
Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
cli_stream->Write(send_request_, tag(3)); |
||||||
|
srv_stream.Read(&recv_request_buffer_, tag(4)); |
||||||
|
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||||
|
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
|
||||||
|
cli_stream->Write(send_request_, tag(5)); |
||||||
|
srv_stream.Read(&recv_request_buffer_, tag(6)); |
||||||
|
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
cli_stream->WritesDone(tag(7)); |
||||||
|
srv_stream.Read(&recv_request_buffer_, tag(8)); |
||||||
|
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||||
|
|
||||||
|
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||||
|
send_response_.set_message(recv_request_.message()); |
||||||
|
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||||
|
srv_stream.Finish(send_response_buffer_, Status::OK, tag(9)); |
||||||
|
cli_stream->Finish(&recv_status_, tag(10)); |
||||||
|
Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
EXPECT_TRUE(recv_status_.ok()); |
||||||
|
} |
||||||
|
|
||||||
|
// Client uses proto, server uses generic codegen, server streaming
|
||||||
|
TEST_F(RawEnd2EndTest, RawServerServerStreaming) { |
||||||
|
typedef grpc::testing::EchoTestService::WithRawMethod_ResponseStream< |
||||||
|
grpc::testing::EchoTestService::Service> |
||||||
|
SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_); |
||||||
|
|
||||||
|
send_request_.set_message("hello server streaming"); |
||||||
|
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( |
||||||
|
stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1))); |
||||||
|
|
||||||
|
service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream, |
||||||
|
cq_.get(), cq_.get(), tag(2)); |
||||||
|
|
||||||
|
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); |
||||||
|
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
|
||||||
|
send_response_.set_message(recv_request_.message()); |
||||||
|
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||||
|
srv_stream.Write(send_response_buffer_, tag(3)); |
||||||
|
cli_stream->Read(&recv_response_, tag(4)); |
||||||
|
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
|
||||||
|
srv_stream.Write(send_response_buffer_, tag(5)); |
||||||
|
cli_stream->Read(&recv_response_, tag(6)); |
||||||
|
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
|
||||||
|
srv_stream.Finish(Status::OK, tag(7)); |
||||||
|
cli_stream->Read(&recv_response_, tag(8)); |
||||||
|
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||||
|
|
||||||
|
cli_stream->Finish(&recv_status_, tag(9)); |
||||||
|
Verifier().Expect(9, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
EXPECT_TRUE(recv_status_.ok()); |
||||||
|
} |
||||||
|
|
||||||
|
// Client uses proto, server uses generic codegen, bidi streaming
|
||||||
|
TEST_F(RawEnd2EndTest, RawServerBidiStreaming) { |
||||||
|
typedef grpc::testing::EchoTestService::WithRawMethod_BidiStream< |
||||||
|
grpc::testing::EchoTestService::Service> |
||||||
|
SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
|
||||||
|
grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_); |
||||||
|
|
||||||
|
send_request_.set_message("hello bidi streaming"); |
||||||
|
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> |
||||||
|
cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1))); |
||||||
|
|
||||||
|
service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(), |
||||||
|
tag(2)); |
||||||
|
|
||||||
|
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
cli_stream->Write(send_request_, tag(3)); |
||||||
|
srv_stream.Read(&recv_request_buffer_, tag(4)); |
||||||
|
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||||
|
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||||
|
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||||
|
|
||||||
|
send_response_.set_message(recv_request_.message()); |
||||||
|
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||||
|
srv_stream.Write(send_response_buffer_, tag(5)); |
||||||
|
cli_stream->Read(&recv_response_, tag(6)); |
||||||
|
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||||
|
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||||
|
|
||||||
|
cli_stream->WritesDone(tag(7)); |
||||||
|
srv_stream.Read(&recv_request_buffer_, tag(8)); |
||||||
|
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||||
|
|
||||||
|
srv_stream.Finish(Status::OK, tag(9)); |
||||||
|
cli_stream->Finish(&recv_status_, tag(10)); |
||||||
|
Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get()); |
||||||
|
|
||||||
|
EXPECT_TRUE(recv_status_.ok()); |
||||||
|
} |
||||||
|
|
||||||
|
// Testing that this pattern compiles
|
||||||
|
TEST_F(RawEnd2EndTest, CompileTest) { |
||||||
|
typedef grpc::testing::EchoTestService::WithRawMethod_Echo< |
||||||
|
grpc::testing::EchoTestService::AsyncService> |
||||||
|
SType; |
||||||
|
ResetStub(); |
||||||
|
auto service = BuildAndStartServer<SType>(); |
||||||
|
} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace testing
|
||||||
|
} // namespace grpc
|
||||||
|
|
||||||
|
int main(int argc, char** argv) { |
||||||
|
// Change the backup poll interval from 5s to 100ms to speed up the
|
||||||
|
// ReconnectChannel test
|
||||||
|
grpc_test_init(argc, argv); |
||||||
|
::testing::InitGoogleTest(&argc, argv); |
||||||
|
int ret = RUN_ALL_TESTS(); |
||||||
|
return ret; |
||||||
|
} |
Loading…
Reference in new issue