Support server streaming

Skip unparsable input

Add tests for uni-directional stream calls

Simplify client stream handling
pull/8066/head
Yuchen Zeng 8 years ago
parent f9329217b1
commit d37f642f35
  1. 42
      test/cpp/util/cli_call.cc
  2. 3
      test/cpp/util/cli_call.h
  3. 4
      test/cpp/util/grpc_cli.cc
  4. 105
      test/cpp/util/grpc_tool.cc
  5. 95
      test/cpp/util/grpc_tool_test.cc
  6. 1
      test/cpp/util/proto_file_parser.cc

@ -48,8 +48,6 @@ namespace {
void* tag(int i) { return (void*)(intptr_t)i; } void* tag(int i) { return (void*)(intptr_t)i; }
} // namespace } // namespace
enum CliCall::CallStatus : intptr_t { CREATE, PROCESS, FINISH };
Status CliCall::Call(std::shared_ptr<grpc::Channel> channel, Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
const grpc::string& method, const grpc::string& request, const grpc::string& method, const grpc::string& request,
grpc::string* response, grpc::string* response,
@ -59,7 +57,9 @@ Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
CliCall call(channel, method, metadata); CliCall call(channel, method, metadata);
call.Write(request); call.Write(request);
call.WritesDone(); call.WritesDone();
call.Read(response, server_initial_metadata); if (!call.Read(response, server_initial_metadata)) {
fprintf(stderr, "Failed to read response.\n");
}
return call.Finish(server_trailing_metadata); return call.Finish(server_trailing_metadata);
} }
@ -92,36 +92,36 @@ void CliCall::Write(const grpc::string& request) {
GPR_ASSERT(ok); GPR_ASSERT(ok);
} }
void CliCall::Read(grpc::string* response, bool CliCall::Read(grpc::string* response,
IncomingMetadataContainer* server_initial_metadata) { IncomingMetadataContainer* server_initial_metadata) {
void* got_tag; void* got_tag;
bool ok; bool ok;
grpc::ByteBuffer recv_buffer; grpc::ByteBuffer recv_buffer;
call_->Read(&recv_buffer, tag(4)); call_->Read(&recv_buffer, tag(3));
cq_.Next(&got_tag, &ok);
if (!ok) { if (!cq_.Next(&got_tag, &ok) || !ok) {
fprintf(stderr, "Failed to read response."); return false;
} else { }
std::vector<grpc::Slice> slices; std::vector<grpc::Slice> slices;
(void)recv_buffer.Dump(&slices); recv_buffer.Dump(&slices);
response->clear(); response->clear();
for (size_t i = 0; i < slices.size(); i++) { for (size_t i = 0; i < slices.size(); i++) {
response->append(reinterpret_cast<const char*>(slices[i].begin()), response->append(reinterpret_cast<const char*>(slices[i].begin()),
slices[i].size()); slices[i].size());
} }
if (server_initial_metadata) { if (server_initial_metadata) {
*server_initial_metadata = ctx_.GetServerInitialMetadata(); *server_initial_metadata = ctx_.GetServerInitialMetadata();
}
} }
return true;
} }
void CliCall::WritesDone() { void CliCall::WritesDone() {
void* got_tag; void* got_tag;
bool ok; bool ok;
call_->WritesDone(tag(3)); call_->WritesDone(tag(4));
cq_.Next(&got_tag, &ok); cq_.Next(&got_tag, &ok);
GPR_ASSERT(ok); GPR_ASSERT(ok);
} }

@ -68,13 +68,12 @@ class CliCall final {
void WritesDone(); void WritesDone();
void Read(grpc::string* response, bool Read(grpc::string* response,
IncomingMetadataContainer* server_initial_metadata); IncomingMetadataContainer* server_initial_metadata);
Status Finish(IncomingMetadataContainer* server_trailing_metadata); Status Finish(IncomingMetadataContainer* server_trailing_metadata);
private: private:
enum CallStatus : intptr_t;
std::unique_ptr<grpc::GenericStub> stub_; std::unique_ptr<grpc::GenericStub> stub_;
grpc::ClientContext ctx_; grpc::ClientContext ctx_;
std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_; std::unique_ptr<grpc::GenericClientAsyncReaderWriter> call_;

@ -83,10 +83,10 @@ DEFINE_string(outfile, "", "Output file (default is stdout)");
static bool SimplePrint(const grpc::string& outfile, static bool SimplePrint(const grpc::string& outfile,
const grpc::string& output) { const grpc::string& output) {
if (outfile.empty()) { if (outfile.empty()) {
std::cout << output; std::cout << output << std::endl;
} else { } else {
std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary); std::ofstream output_file(outfile, std::ios::trunc | std::ios::binary);
output_file << output; output_file << output << std::endl;
output_file.close(); output_file.close();
} }
return true; return true;

@ -418,6 +418,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string formatted_method_name; grpc::string formatted_method_name;
std::unique_ptr<grpc::testing::ProtoFileParser> parser; std::unique_ptr<grpc::testing::ProtoFileParser> parser;
grpc::string serialized_request_proto; grpc::string serialized_request_proto;
bool print_mode = false;
std::shared_ptr<grpc::Channel> channel = std::shared_ptr<grpc::Channel> channel =
FLAGS_remotedb FLAGS_remotedb
@ -435,17 +436,19 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
} }
if (parser->IsStreaming(method_name, true /* is_request */)) { if (parser->IsStreaming(method_name, true /* is_request */)) {
fprintf(stderr, "streaming request\n"); // TODO(zyc): Support BidiStream
if (parser->IsStreaming(method_name, false /* is_request */)) {
fprintf(stderr,
"Bidirectional-streaming method is not supported.");
return false;
}
std::istream* input_stream; std::istream* input_stream;
std::ifstream input_file; std::ifstream input_file;
if (argc == 3) { if (argc == 3) {
request_text = argv[2]; request_text = argv[2];
if (!FLAGS_infile.empty()) {
fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
}
} }
// std::stringstream input_stream;
std::multimap<grpc::string, grpc::string> client_metadata; std::multimap<grpc::string, grpc::string> client_metadata;
ParseMetadataFlag(&client_metadata); ParseMetadataFlag(&client_metadata);
@ -455,47 +458,47 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
if (FLAGS_infile.empty()) { if (FLAGS_infile.empty()) {
if (isatty(STDIN_FILENO)) { if (isatty(STDIN_FILENO)) {
fprintf(stderr, "reading request message from stdin...\n"); print_mode = true;
fprintf(stderr, "reading streaming request message from stdin...\n");
} }
input_stream = &std::cin; input_stream = &std::cin;
// rdbuf = std::cin.rdbuf();
// input_stream.rdbuf(std::cin.rdbuf());
// input_stream << std::cin.rdbuf();
} else { } else {
input_file.open(FLAGS_infile, std::ios::in | std::ios::binary); input_file.open(FLAGS_infile, std::ios::in | std::ios::binary);
// rdbuf = input_file.rdbuf();
// input_stream.rdbuf(input_file.rdbuf());
input_stream = &input_file; input_stream = &input_file;
// input_file.close();
} }
// request_text = input_stream.str();
std::stringstream request_ss; std::stringstream request_ss;
grpc::string line; grpc::string line;
while (!input_stream->eof() && getline(*input_stream, line)) { while (!request_text.empty() ||
if (line.length() == 0) { (!input_stream->eof() && getline(*input_stream, line))) {
// request_text = request_ss.str(); if (!request_text.empty()) {
if (FLAGS_binary_input) { if (FLAGS_binary_input) {
serialized_request_proto = request_ss.str(); serialized_request_proto = request_text;
request_text.clear();
} else { } else {
serialized_request_proto = parser->GetSerializedProtoFromMethod( serialized_request_proto = parser->GetSerializedProtoFromMethod(
method_name, request_ss.str(), true /* is_request */); method_name, request_text, true /* is_request */);
request_text.clear();
if (parser->HasError()) { if (parser->HasError()) {
return false; if (print_mode) {
fprintf(stderr, "Failed to parse request.\n");
}
continue;
} }
} }
request_ss.str(grpc::string());
request_ss.clear();
grpc::string response_text = parser->GetTextFormatFromMethod(
method_name, serialized_request_proto, true /* is_request */);
call.Write(serialized_request_proto); call.Write(serialized_request_proto);
if (print_mode) {
fprintf(stderr, "%s", response_text.c_str()); fprintf(stderr, "Request sent.\n");
}
} else { } else {
request_ss << line << ' '; if (line.length() == 0) {
request_text = request_ss.str();
request_ss.str(grpc::string());
request_ss.clear();
} else {
request_ss << line << ' ';
}
} }
} }
if (input_file.is_open()) { if (input_file.is_open()) {
@ -507,7 +510,9 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
grpc::string serialized_response_proto; grpc::string serialized_response_proto;
std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata, std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
server_trailing_metadata; server_trailing_metadata;
call.Read(&serialized_response_proto, &server_initial_metadata); if (!call.Read(&serialized_response_proto, &server_trailing_metadata)) {
fprintf(stderr, "Failed to read response.\n");
}
Status status = call.Finish(&server_trailing_metadata); Status status = call.Finish(&server_trailing_metadata);
PrintMetadata(server_initial_metadata, PrintMetadata(server_initial_metadata,
@ -524,7 +529,7 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
if (parser->HasError()) { if (parser->HasError()) {
return false; return false;
} }
output_ss << "Response: \n " << response_text << std::endl; output_ss << response_text;
} }
} else { } else {
fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
@ -569,32 +574,40 @@ bool GrpcTool::CallMethod(int argc, const char** argv,
server_trailing_metadata; server_trailing_metadata;
ParseMetadataFlag(&client_metadata); ParseMetadataFlag(&client_metadata);
PrintMetadata(client_metadata, "Sending client initial metadata:"); PrintMetadata(client_metadata, "Sending client initial metadata:");
grpc::Status status = grpc::testing::CliCall::Call(
channel, formated_method_name, serialized_request_proto, CliCall call(channel, formated_method_name, client_metadata);
&serialized_response_proto, client_metadata, &server_initial_metadata, call.Write(serialized_request_proto);
&server_trailing_metadata); call.WritesDone();
PrintMetadata(server_initial_metadata,
"Received initial metadata from server:"); for (bool receive_initial_metadata = true; call.Read(
PrintMetadata(server_trailing_metadata, &serialized_response_proto,
"Received trailing metadata from server:"); receive_initial_metadata ? &server_initial_metadata : nullptr);
if (status.ok()) { receive_initial_metadata = false) {
fprintf(stderr, "Rpc succeeded with OK status\n"); if (!FLAGS_binary_output) {
if (FLAGS_binary_output) { serialized_response_proto = parser->GetTextFormatFromMethod(
output_ss << serialized_response_proto;
} else {
grpc::string response_text = parser->GetTextFormatFromMethod(
method_name, serialized_response_proto, false /* is_request */); method_name, serialized_response_proto, false /* is_request */);
if (parser->HasError()) { if (parser->HasError()) {
return false; return false;
} }
output_ss << "Response: \n " << response_text << std::endl;
} }
if (receive_initial_metadata) {
PrintMetadata(server_initial_metadata,
"Received initial metadata from server:");
}
if (!callback(serialized_response_proto)) {
return false;
}
}
Status status = call.Finish(&server_trailing_metadata);
if (status.ok()) {
fprintf(stderr, "Rpc succeeded with OK status\n");
return true;
} else { } else {
fprintf(stderr, "Rpc failed with status code %d, error message: %s\n", fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
status.error_code(), status.error_message().c_str()); status.error_code(), status.error_message().c_str());
return false;
} }
} }
return callback(output_ss.str()); return callback(output_ss.str());
} }

@ -102,6 +102,8 @@ DECLARE_bool(l);
namespace { namespace {
const int kNumResponseStreamsMsgs = 3;
class TestCliCredentials final : public grpc::testing::CliCredentials { class TestCliCredentials final : public grpc::testing::CliCredentials {
public: public:
std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override { std::shared_ptr<grpc::ChannelCredentials> GetCredentials() const override {
@ -137,6 +139,48 @@ class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
response->set_message(request->message()); response->set_message(request->message());
return Status::OK; return Status::OK;
} }
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) GRPC_OVERRIDE {
EchoRequest request;
response->set_message("");
if (!context->client_metadata().empty()) {
for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
iter = context->client_metadata().begin();
iter != context->client_metadata().end(); ++iter) {
context->AddInitialMetadata(ToString(iter->first),
ToString(iter->second));
}
}
context->AddTrailingMetadata("trailing_key", "trailing_value");
while (reader->Read(&request)) {
response->mutable_message()->append(request.message());
}
return Status::OK;
}
Status ResponseStream(ServerContext* context, const EchoRequest* request,
ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
if (!context->client_metadata().empty()) {
for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
iter = context->client_metadata().begin();
iter != context->client_metadata().end(); ++iter) {
context->AddInitialMetadata(ToString(iter->first),
ToString(iter->second));
}
}
context->AddTrailingMetadata("trailing_key", "trailing_value");
EchoResponse response;
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
response.set_message(request->message() + grpc::to_string(i));
writer->Write(response);
}
return Status::OK;
}
}; };
} // namespace } // namespace
@ -388,6 +432,57 @@ TEST_F(GrpcToolTest, ParseCommand) {
ShutdownServer(); ShutdownServer();
} }
TEST_F(GrpcToolTest, CallCommandRequestStream) {
// Test input: grpc_cli call localhost:<port> RequestStream "message:
// 'Hello0'"
std::stringstream output_stream;
const grpc::string server_address = SetUpServer();
const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
"RequestStream", "message: 'Hello0'"};
// Mock std::cin input "message: 'Hello1'\n\n message: 'Hello2'\n\n"
std::streambuf* orig = std::cin.rdbuf();
std::istringstream ss("message: 'Hello1'\n\n message: 'Hello2'\n\n");
std::cin.rdbuf(ss.rdbuf());
EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
// Expected output: "message: \"Hello0Hello1Hello2\""
EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
"message: \"Hello0Hello1Hello2\""));
std::cin.rdbuf(orig);
ShutdownServer();
}
TEST_F(GrpcToolTest, CallCommandResponseStream) {
// Test input: grpc_cli call localhost:<port> ResponseStream "message:
// 'Hello'"
std::stringstream output_stream;
const grpc::string server_address = SetUpServer();
const char* argv[] = {"grpc_cli", "call", server_address.c_str(),
"ResponseStream", "message: 'Hello'"};
EXPECT_TRUE(0 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
fprintf(stderr, "%s\n", output_stream.str().c_str());
// Expected output: "message: \"Hello{n}\""
for (int i = 0; i < kNumResponseStreamsMsgs; i++) {
grpc::string expected_response_text =
"message: \"Hello" + grpc::to_string(i) + "\"\n\n";
EXPECT_TRUE(NULL != strstr(output_stream.str().c_str(),
expected_response_text.c_str()));
}
ShutdownServer();
}
TEST_F(GrpcToolTest, TooFewArguments) { TEST_F(GrpcToolTest, TooFewArguments) {
// Test input "grpc_cli call Echo" // Test input "grpc_cli call Echo"
std::stringstream output_stream; std::stringstream output_stream;

@ -155,7 +155,6 @@ grpc::string ProtoFileParser::GetFullMethodName(const grpc::string& method) {
const auto* service_desc = *it; const auto* service_desc = *it;
for (int j = 0; j < service_desc->method_count(); j++) { for (int j = 0; j < service_desc->method_count(); j++) {
const auto* method_desc = service_desc->method(j); const auto* method_desc = service_desc->method(j);
fprintf(stderr, "%s\n", method_desc->full_name().c_str());
if (MethodNameMatch(method_desc->full_name(), method)) { if (MethodNameMatch(method_desc->full_name(), method)) {
if (method_descriptor) { if (method_descriptor) {
std::ostringstream error_stream; std::ostringstream error_stream;

Loading…
Cancel
Save