Add metadata test with rpc. Adding/fixing things to make it work

pull/573/head
Yang Gao 10 years ago
parent 1ece67cba2
commit 2b7f537546
  1. 10
      include/grpc++/client_context.h
  2. 6
      include/grpc++/impl/call.h
  3. 48
      include/grpc++/stream.h
  4. 8
      src/cpp/client/client_unary_call.cc
  5. 11
      src/cpp/common/call.cc
  6. 10
      src/cpp/server/server_context.cc
  7. 149
      test/cpp/end2end/async_end2end_test.cc

@ -119,16 +119,6 @@ class ClientContext {
friend class ::grpc::ClientAsyncWriter;
template <class R, class W>
friend class ::grpc::ClientAsyncReaderWriter;
friend Status BlockingUnaryCall(ChannelInterface *channel,
const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result);
friend void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
ClientContext *context,
const google::protobuf::Message &request,
google::protobuf::Message *result, Status *status,
CompletionQueue *cq, void *tag);
grpc_call *call() { return call_; }
void set_call(grpc_call *call) {

@ -65,13 +65,11 @@ class CallOpBuffer : public CompletionQueueTag {
void AddSendInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddSendInitialMetadata(ClientContext *ctx);
void AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string> *metadata);
void AddRecvInitialMetadata(ClientContext* ctx);
void AddSendMessage(const google::protobuf::Message &message);
void AddRecvMessage(google::protobuf::Message *message);
void AddClientSendClose();
void AddClientRecvStatus(std::multimap<grpc::string, grpc::string> *metadata,
Status *status);
void AddClientRecvStatus(ClientContext *ctx, Status *status);
void AddServerSendStatus(std::multimap<grpc::string, grpc::string> *metadata,
const Status &status);
void AddServerRecvClose(bool *cancelled);

@ -106,17 +106,15 @@ class ClientReader final : public ClientStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
context_->initial_metadata_received_ = true;
}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
buf.AddRecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
@ -126,7 +124,7 @@ class ClientReader final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@ -173,7 +171,7 @@ class ClientWriter final : public ClientStreamingInterface,
CallOpBuffer buf;
Status status;
buf.AddRecvMessage(response_);
buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf) && buf.got_message);
return status;
@ -210,17 +208,15 @@ class ClientReaderWriter final : public ClientStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
CallOpBuffer buf;
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
buf.AddRecvInitialMetadata(context_);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
context_->initial_metadata_received_ = true;
}
virtual bool Read(R* msg) override {
CallOpBuffer buf;
if (!context_->initial_metadata_received_) {
buf.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
buf.AddRecvInitialMetadata(context_);
}
buf.AddRecvMessage(msg);
call_.PerformOps(&buf);
@ -244,7 +240,7 @@ class ClientReaderWriter final : public ClientStreamingInterface,
virtual Status Finish() override {
CallOpBuffer buf;
Status status;
buf.AddClientRecvStatus(&context_->trailing_metadata_, &status);
buf.AddClientRecvStatus(context_, &status);
call_.PerformOps(&buf);
GPR_ASSERT(cq_.Pluck(&buf));
return status;
@ -403,16 +399,14 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
context_->initial_metadata_received_ = true;
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
read_buf_.AddRecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
@ -421,10 +415,9 @@ class ClientAsyncReader final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
@ -456,9 +449,8 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
context_->initial_metadata_received_ = true;
}
void Write(const W& msg, void* tag) override {
@ -476,11 +468,10 @@ class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddRecvMessage(response_);
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}
@ -514,16 +505,14 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
GPR_ASSERT(!context_->initial_metadata_received_);
meta_buf_.Reset(tag);
meta_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
meta_buf_.AddRecvInitialMetadata(context_);
call_.PerformOps(&meta_buf_);
context_->initial_metadata_received_ = true;
}
void Read(R* msg, void* tag) override {
read_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
read_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
read_buf_.AddRecvInitialMetadata(context_);
}
read_buf_.AddRecvMessage(msg);
call_.PerformOps(&read_buf_);
@ -544,10 +533,9 @@ class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
void Finish(Status* status, void* tag) override {
finish_buf_.Reset(tag);
if (!context_->initial_metadata_received_) {
finish_buf_.AddRecvInitialMetadata(&context_->recv_initial_metadata_);
context_->initial_metadata_received_ = true;
finish_buf_.AddRecvInitialMetadata(context_);
}
finish_buf_.AddClientRecvStatus(&context_->trailing_metadata_, status);
finish_buf_.AddClientRecvStatus(context_, status);
call_.PerformOps(&finish_buf_);
}

@ -52,10 +52,10 @@ Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Status status;
buf.AddSendInitialMetadata(context);
buf.AddSendMessage(request);
buf.AddRecvInitialMetadata(&context->recv_initial_metadata_);
buf.AddRecvInitialMetadata(context);
buf.AddRecvMessage(result);
buf.AddClientSendClose();
buf.AddClientRecvStatus(&context->trailing_metadata_, &status);
buf.AddClientRecvStatus(context, &status);
call.PerformOps(&buf);
GPR_ASSERT((cq.Pluck(&buf) && buf.got_message) || !status.IsOk());
return status;
@ -79,10 +79,10 @@ void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
Call call(channel->CreateCall(method, context, cq));
buf->AddSendInitialMetadata(context);
buf->AddSendMessage(request);
buf->AddRecvInitialMetadata(&context->recv_initial_metadata_);
buf->AddRecvInitialMetadata(context);
buf->AddRecvMessage(result);
buf->AddClientSendClose();
buf->AddClientRecvStatus(&context->trailing_metadata_, status);
buf->AddClientRecvStatus(context, status);
call.PerformOps(buf);
}

@ -130,9 +130,9 @@ void CallOpBuffer::AddSendInitialMetadata(
initial_metadata_ = FillMetadataArray(metadata);
}
void CallOpBuffer::AddRecvInitialMetadata(
std::multimap<grpc::string, grpc::string>* metadata) {
recv_initial_metadata_ = metadata;
void CallOpBuffer::AddRecvInitialMetadata(ClientContext* ctx) {
ctx->initial_metadata_received_ = true;
recv_initial_metadata_ = &ctx->recv_initial_metadata_;
}
void CallOpBuffer::AddSendInitialMetadata(ClientContext* ctx) {
@ -154,9 +154,8 @@ void CallOpBuffer::AddServerRecvClose(bool* cancelled) {
recv_closed_ = cancelled;
}
void CallOpBuffer::AddClientRecvStatus(
std::multimap<grpc::string, grpc::string>* metadata, Status* status) {
recv_trailing_metadata_ = metadata;
void CallOpBuffer::AddClientRecvStatus(ClientContext* context, Status* status) {
recv_trailing_metadata_ = &context->trailing_metadata_;
recv_status_ = status;
}

@ -57,4 +57,14 @@ ServerContext::~ServerContext() {
}
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
const grpc::string& value) {
initial_metadata_.insert(std::make_pair(key, value));
}
void ServerContext::AddTrailingMetadata(const grpc::string& key,
const grpc::string& value) {
trailing_metadata_.insert(std::make_pair(key, value));
}
} // namespace grpc

@ -364,6 +364,155 @@ TEST_F(AsyncEnd2endTest, ClientInitialMetadataRpc) {
EXPECT_TRUE(recv_status.IsOk());
}
TEST_F(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(4));
server_ok(4);
client_ok(1);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second, server_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
}
TEST_F(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
server_ok(4);
client_ok(1);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
EXPECT_EQ(meta1.second, server_trailing_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, server_trailing_metadata.find(meta2.first)->second);
EXPECT_EQ(2, server_trailing_metadata.size());
}
TEST_F(AsyncEnd2endTest, MetadataRpc) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2-bin", {"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13});
std::pair<grpc::string, grpc::string> meta3("key3", "val3");
std::pair<grpc::string, grpc::string> meta6("key4-bin", {"\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d", 14});
std::pair<grpc::string, grpc::string> meta5("key5", "val5");
std::pair<grpc::string, grpc::string> meta4("key6-bin", {"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15});
cli_ctx.AddMetadata(meta1.first, meta1.second);
cli_ctx.AddMetadata(meta2.first, meta2.second);
stub_->Echo(
&cli_ctx, send_request, &recv_response, &recv_status, &cli_cq_, tag(1));
service_.RequestEcho(
&srv_ctx, &recv_request, &response_writer, &srv_cq_, tag(2));
server_ok(2);
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second, client_initial_metadata.find(meta1.first)->second);
EXPECT_EQ(meta2.second, client_initial_metadata.find(meta2.first)->second);
EXPECT_EQ(2, client_initial_metadata.size());
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
server_ok(3);
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(4));
server_ok(4);
client_ok(1);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second, server_initial_metadata.find(meta3.first)->second);
EXPECT_EQ(meta4.second, server_initial_metadata.find(meta4.first)->second);
EXPECT_EQ(2, server_initial_metadata.size());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
EXPECT_EQ(meta5.second, server_trailing_metadata.find(meta5.first)->second);
EXPECT_EQ(meta6.second, server_trailing_metadata.find(meta6.first)->second);
EXPECT_EQ(2, server_trailing_metadata.size());
}
} // namespace
} // namespace testing
} // namespace grpc

Loading…
Cancel
Save