mirror of https://github.com/grpc/grpc.git
parent
93a90e6132
commit
bacd625d37
8 changed files with 614 additions and 0 deletions
@ -0,0 +1,454 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2015 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <cinttypes> |
||||
#include <memory> |
||||
#include <thread> |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/log.h> |
||||
#include <grpc/support/time.h> |
||||
#include <grpcpp/channel.h> |
||||
#include <grpcpp/client_context.h> |
||||
#include <grpcpp/create_channel.h> |
||||
#include <grpcpp/server.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
|
||||
#include "src/core/lib/gpr/env.h" |
||||
#include "src/core/lib/iomgr/port.h" |
||||
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" |
||||
#include "src/proto/grpc/testing/echo.grpc.pb.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/util/byte_buffer_proto_helper.h" |
||||
#include "test/cpp/util/string_ref_helper.h" |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
using grpc::testing::EchoRequest; |
||||
using grpc::testing::EchoResponse; |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
|
||||
namespace { |
||||
|
||||
void* tag(int i) { return (void*)static_cast<intptr_t>(i); } |
||||
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } |
||||
|
||||
class Verifier { |
||||
public: |
||||
Verifier() : lambda_run_(false) {} |
||||
// Expect sets the expected ok value for a specific tag
|
||||
Verifier& Expect(int i, bool expect_ok) { |
||||
return ExpectUnless(i, expect_ok, false); |
||||
} |
||||
// ExpectUnless sets the expected ok value for a specific tag
|
||||
// unless the tag was already marked seen (as a result of ExpectMaybe)
|
||||
Verifier& ExpectUnless(int i, bool expect_ok, bool seen) { |
||||
if (!seen) { |
||||
expectations_[tag(i)] = expect_ok; |
||||
} |
||||
return *this; |
||||
} |
||||
// ExpectMaybe sets the expected ok value for a specific tag, but does not
|
||||
// require it to appear
|
||||
// If it does, sets *seen to true
|
||||
Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) { |
||||
if (!*seen) { |
||||
maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen}; |
||||
} |
||||
return *this; |
||||
} |
||||
|
||||
// Next waits for 1 async tag to complete, checks its
|
||||
// expectations, and returns the tag
|
||||
int Next(CompletionQueue* cq, bool ignore_ok) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_TRUE(cq->Next(&got_tag, &ok)); |
||||
GotTag(got_tag, ok, ignore_ok); |
||||
return detag(got_tag); |
||||
} |
||||
|
||||
template <typename T> |
||||
CompletionQueue::NextStatus DoOnceThenAsyncNext( |
||||
CompletionQueue* cq, void** got_tag, bool* ok, T deadline, |
||||
std::function<void(void)> lambda) { |
||||
if (lambda_run_) { |
||||
return cq->AsyncNext(got_tag, ok, deadline); |
||||
} else { |
||||
lambda_run_ = true; |
||||
return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); |
||||
} |
||||
} |
||||
|
||||
// Verify keeps calling Next until all currently set
|
||||
// expected tags are complete
|
||||
void Verify(CompletionQueue* cq) { Verify(cq, false); } |
||||
|
||||
// This version of Verify allows optionally ignoring the
|
||||
// outcome of the expectation
|
||||
void Verify(CompletionQueue* cq, bool ignore_ok) { |
||||
GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty()); |
||||
while (!expectations_.empty()) { |
||||
Next(cq, ignore_ok); |
||||
} |
||||
} |
||||
|
||||
// This version of Verify stops after a certain deadline
|
||||
void Verify(CompletionQueue* cq, |
||||
std::chrono::system_clock::time_point deadline) { |
||||
if (expectations_.empty()) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), |
||||
CompletionQueue::TIMEOUT); |
||||
} else { |
||||
while (!expectations_.empty()) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), |
||||
CompletionQueue::GOT_EVENT); |
||||
GotTag(got_tag, ok, false); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// This version of Verify stops after a certain deadline, and uses the
|
||||
// DoThenAsyncNext API
|
||||
// to call the lambda
|
||||
void Verify(CompletionQueue* cq, |
||||
std::chrono::system_clock::time_point deadline, |
||||
std::function<void(void)> lambda) { |
||||
if (expectations_.empty()) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), |
||||
CompletionQueue::TIMEOUT); |
||||
} else { |
||||
while (!expectations_.empty()) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), |
||||
CompletionQueue::GOT_EVENT); |
||||
GotTag(got_tag, ok, false); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private: |
||||
void GotTag(void* got_tag, bool ok, bool ignore_ok) { |
||||
auto it = expectations_.find(got_tag); |
||||
if (it != expectations_.end()) { |
||||
if (!ignore_ok) { |
||||
EXPECT_EQ(it->second, ok); |
||||
} |
||||
expectations_.erase(it); |
||||
} else { |
||||
auto it2 = maybe_expectations_.find(got_tag); |
||||
if (it2 != maybe_expectations_.end()) { |
||||
if (it2->second.seen != nullptr) { |
||||
EXPECT_FALSE(*it2->second.seen); |
||||
*it2->second.seen = true; |
||||
} |
||||
if (!ignore_ok) { |
||||
EXPECT_EQ(it2->second.ok, ok); |
||||
} |
||||
} else { |
||||
gpr_log(GPR_ERROR, "Unexpected tag: %p", tag); |
||||
abort(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
struct MaybeExpect { |
||||
bool ok; |
||||
bool* seen; |
||||
}; |
||||
|
||||
std::map<void*, bool> expectations_; |
||||
std::map<void*, MaybeExpect> maybe_expectations_; |
||||
bool lambda_run_; |
||||
}; |
||||
|
||||
class CodegenGenericEnd2EndTest : public ::testing::Test { |
||||
protected: |
||||
CodegenGenericEnd2EndTest() {} |
||||
|
||||
void SetUp() override { |
||||
port_ = grpc_pick_unused_port_or_die(); |
||||
server_address_ << "localhost:" << port_; |
||||
} |
||||
|
||||
void TearDown() override { |
||||
server_->Shutdown(); |
||||
void* ignored_tag; |
||||
bool ignored_ok; |
||||
cq_->Shutdown(); |
||||
while (cq_->Next(&ignored_tag, &ignored_ok)) |
||||
; |
||||
stub_.reset(); |
||||
grpc_recycle_unused_port(port_); |
||||
} |
||||
|
||||
template <typename ServerType> |
||||
std::unique_ptr<ServerType> BuildAndStartServer() { |
||||
ServerBuilder builder; |
||||
builder.AddListeningPort(server_address_.str(), |
||||
grpc::InsecureServerCredentials()); |
||||
std::unique_ptr<ServerType> service(new ServerType()); |
||||
builder.RegisterService(service.get()); |
||||
cq_ = builder.AddCompletionQueue(); |
||||
server_ = builder.BuildAndStart(); |
||||
return service; |
||||
} |
||||
|
||||
void ResetStub() { |
||||
ChannelArguments args; |
||||
std::shared_ptr<Channel> channel = CreateChannel( |
||||
server_address_.str(), grpc::InsecureChannelCredentials()); |
||||
stub_ = grpc::testing::EchoTestService::NewStub(channel); |
||||
} |
||||
|
||||
std::unique_ptr<ServerCompletionQueue> cq_; |
||||
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; |
||||
std::unique_ptr<Server> server_; |
||||
std::ostringstream server_address_; |
||||
int port_; |
||||
|
||||
// For the client application to populate and send to server.
|
||||
EchoRequest send_request_; |
||||
::grpc::ByteBuffer send_request_buffer_; |
||||
|
||||
// For the server to give to gRPC to be populated by incoming request
|
||||
// from client.
|
||||
EchoRequest recv_request_; |
||||
::grpc::ByteBuffer recv_request_buffer_; |
||||
|
||||
// For the server application to populate and send back to client.
|
||||
EchoResponse send_response_; |
||||
::grpc::ByteBuffer send_response_buffer_; |
||||
|
||||
// For the client to give to gRPC to be populated by incoming response
|
||||
// from server.
|
||||
EchoResponse recv_response_; |
||||
::grpc::ByteBuffer recv_response_buffer_; |
||||
Status recv_status_; |
||||
|
||||
// Both sides need contexts
|
||||
ClientContext cli_ctx_; |
||||
ServerContext srv_ctx_; |
||||
}; |
||||
|
||||
// Regular Async, both peers use proto
|
||||
TEST_F(CodegenGenericEnd2EndTest, PureAsyncService) { |
||||
typedef grpc::testing::EchoTestService::AsyncService SType; |
||||
ResetStub(); |
||||
auto service = BuildAndStartServer<SType>(); |
||||
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx_); |
||||
|
||||
send_request_.set_message("hello"); |
||||
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( |
||||
stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get())); |
||||
service->RequestEcho(&srv_ctx_, &recv_request_, &response_writer, cq_.get(), |
||||
cq_.get(), tag(2)); |
||||
response_reader->Finish(&recv_response_, &recv_status_, tag(4)); |
||||
Verifier().Expect(2, true).Verify(cq_.get()); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
send_response_.set_message(recv_request_.message()); |
||||
response_writer.Finish(send_response_, Status::OK, tag(3)); |
||||
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||
|
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
EXPECT_TRUE(recv_status_.ok()); |
||||
} |
||||
|
||||
// Client uses proto, server uses generic codegen, unary
|
||||
TEST_F(CodegenGenericEnd2EndTest, CodegenGenericServerUnary) { |
||||
typedef grpc::testing::EchoTestService::WithCodegenGenericMethod_Echo< |
||||
grpc::testing::EchoTestService::Service> |
||||
SType; |
||||
ResetStub(); |
||||
auto service = BuildAndStartServer<SType>(); |
||||
grpc::GenericServerAsyncResponseWriter response_writer(&srv_ctx_); |
||||
|
||||
send_request_.set_message("hello unary"); |
||||
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( |
||||
stub_->AsyncEcho(&cli_ctx_, send_request_, cq_.get())); |
||||
service->RequestEcho(&srv_ctx_, &recv_request_buffer_, &response_writer, |
||||
cq_.get(), cq_.get(), tag(2)); |
||||
response_reader->Finish(&recv_response_, &recv_status_, tag(4)); |
||||
Verifier().Expect(2, true).Verify(cq_.get()); |
||||
EXPECT_TRUE(ParseFromByteBuffer(&recv_request_buffer_, &recv_request_)); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
send_response_.set_message(recv_request_.message()); |
||||
EXPECT_TRUE( |
||||
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_)); |
||||
response_writer.Finish(send_response_buffer_, Status::OK, tag(3)); |
||||
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||
|
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
EXPECT_TRUE(recv_status_.ok()); |
||||
} |
||||
|
||||
// Client uses proto, server uses generic codegen, client streaming
|
||||
TEST_F(CodegenGenericEnd2EndTest, CodegenGenericServerClientStreaming) { |
||||
typedef grpc::testing::EchoTestService::WithCodegenGenericMethod_RequestStream< |
||||
grpc::testing::EchoTestService::Service> |
||||
SType; |
||||
ResetStub(); |
||||
auto service = BuildAndStartServer<SType>(); |
||||
|
||||
grpc::GenericServerAsyncReader srv_stream(&srv_ctx_); |
||||
|
||||
send_request_.set_message("hello client streaming"); |
||||
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( |
||||
stub_->AsyncRequestStream(&cli_ctx_, &recv_response_, cq_.get(), tag(1))); |
||||
|
||||
service->RequestRequestStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(), |
||||
tag(2)); |
||||
|
||||
Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get()); |
||||
|
||||
cli_stream->Write(send_request_, tag(3)); |
||||
srv_stream.Read(&recv_request_buffer_, tag(4)); |
||||
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
|
||||
cli_stream->Write(send_request_, tag(5)); |
||||
srv_stream.Read(&recv_request_buffer_, tag(6)); |
||||
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||
|
||||
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
cli_stream->WritesDone(tag(7)); |
||||
srv_stream.Read(&recv_request_buffer_, tag(8)); |
||||
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||
|
||||
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||
send_response_.set_message(recv_request_.message()); |
||||
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||
srv_stream.Finish(send_response_buffer_, Status::OK, tag(9)); |
||||
cli_stream->Finish(&recv_status_, tag(10)); |
||||
Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get()); |
||||
|
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
EXPECT_TRUE(recv_status_.ok()); |
||||
} |
||||
|
||||
// Client uses proto, server uses generic codegen, server streaming
|
||||
TEST_F(CodegenGenericEnd2EndTest, CodegenGenericServerServerStreaming) { |
||||
typedef grpc::testing::EchoTestService::WithCodegenGenericMethod_ResponseStream< |
||||
grpc::testing::EchoTestService::Service> |
||||
SType; |
||||
ResetStub(); |
||||
auto service = BuildAndStartServer<SType>(); |
||||
grpc::GenericServerAsyncWriter srv_stream(&srv_ctx_); |
||||
|
||||
send_request_.set_message("hello server streaming"); |
||||
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream( |
||||
stub_->AsyncResponseStream(&cli_ctx_, send_request_, cq_.get(), tag(1))); |
||||
|
||||
service->RequestResponseStream(&srv_ctx_, &recv_request_buffer_, &srv_stream, |
||||
cq_.get(), cq_.get(), tag(2)); |
||||
|
||||
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); |
||||
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
|
||||
send_response_.set_message(recv_request_.message()); |
||||
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||
srv_stream.Write(send_response_buffer_, tag(3)); |
||||
cli_stream->Read(&recv_response_, tag(4)); |
||||
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
|
||||
srv_stream.Write(send_response_buffer_, tag(5)); |
||||
cli_stream->Read(&recv_response_, tag(6)); |
||||
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
|
||||
srv_stream.Finish(Status::OK, tag(7)); |
||||
cli_stream->Read(&recv_response_, tag(8)); |
||||
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||
|
||||
cli_stream->Finish(&recv_status_, tag(9)); |
||||
Verifier().Expect(9, true).Verify(cq_.get()); |
||||
|
||||
EXPECT_TRUE(recv_status_.ok()); |
||||
} |
||||
|
||||
// Client uses proto, server uses generic codegen, bidi streaming
|
||||
TEST_F(CodegenGenericEnd2EndTest, CodegenGenericServerBidiStreaming) { |
||||
typedef grpc::testing::EchoTestService::WithCodegenGenericMethod_BidiStream< |
||||
grpc::testing::EchoTestService::Service> |
||||
SType; |
||||
ResetStub(); |
||||
auto service = BuildAndStartServer<SType>(); |
||||
|
||||
grpc::GenericServerAsyncReaderWriter srv_stream(&srv_ctx_); |
||||
|
||||
send_request_.set_message("hello bidi streaming"); |
||||
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> |
||||
cli_stream(stub_->AsyncBidiStream(&cli_ctx_, cq_.get(), tag(1))); |
||||
|
||||
service->RequestBidiStream(&srv_ctx_, &srv_stream, cq_.get(), cq_.get(), |
||||
tag(2)); |
||||
|
||||
Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get()); |
||||
|
||||
cli_stream->Write(send_request_, tag(3)); |
||||
srv_stream.Read(&recv_request_buffer_, tag(4)); |
||||
Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get()); |
||||
ParseFromByteBuffer(&recv_request_buffer_, &recv_request_); |
||||
EXPECT_EQ(send_request_.message(), recv_request_.message()); |
||||
|
||||
send_response_.set_message(recv_request_.message()); |
||||
SerializeToByteBufferInPlace(&send_response_, &send_response_buffer_); |
||||
srv_stream.Write(send_response_buffer_, tag(5)); |
||||
cli_stream->Read(&recv_response_, tag(6)); |
||||
Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get()); |
||||
EXPECT_EQ(send_response_.message(), recv_response_.message()); |
||||
|
||||
cli_stream->WritesDone(tag(7)); |
||||
srv_stream.Read(&recv_request_buffer_, tag(8)); |
||||
Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get()); |
||||
|
||||
srv_stream.Finish(Status::OK, tag(9)); |
||||
cli_stream->Finish(&recv_status_, tag(10)); |
||||
Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get()); |
||||
|
||||
EXPECT_TRUE(recv_status_.ok()); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
// Change the backup poll interval from 5s to 100ms to speed up the
|
||||
// ReconnectChannel test
|
||||
grpc_test_init(argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
int ret = RUN_ALL_TESTS(); |
||||
return ret; |
||||
} |
Loading…
Reference in new issue