pull/9964/head
Yuxuan Li 8 years ago
parent 34894e4b1c
commit bdc76ab37b
  1. 4
      include/grpc++/impl/codegen/async_stream.h
  2. 4
      include/grpc++/impl/codegen/client_context.h
  3. 4
      include/grpc++/impl/codegen/sync_stream.h
  4. 4
      src/cpp/client/client_context.cc
  5. 6
      test/cpp/end2end/async_end2end_test.cc
  6. 6
      test/cpp/end2end/end2end_test.cc
  7. 2
      test/cpp/end2end/mock_test.cc
  8. 4
      test/cpp/microbenchmarks/bm_fullstack.cc
  9. 2
      third_party/protobuf

@ -244,7 +244,7 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
void Write(const W& msg, WriteOptions options, void* tag) { void Write(const W& msg, WriteOptions options, void* tag) override {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();
@ -341,7 +341,7 @@ class ClientAsyncReaderWriter final
call_.PerformOps(&write_ops_); call_.PerformOps(&write_ops_);
} }
void Write(const W& msg, WriteOptions options, void* tag) { void Write(const W& msg, WriteOptions options, void* tag) override {
write_ops_.set_output_tag(tag); write_ops_.set_output_tag(tag);
if (options.is_last_message()) { if (options.is_last_message()) {
options.set_buffer_hint(); options.set_buffer_hint();

@ -288,7 +288,9 @@ class ClientContext {
/// ///
/// \param corked The flag indicating whether the initial metadata is to be /// \param corked The flag indicating whether the initial metadata is to be
/// corked or not. /// corked or not.
void sent_initial_metadata_corked(bool corked); void set_initial_metadata_corked(bool corked) {
initial_metadata_corked_ = corked;
}
/// Return the peer uri in a string. /// Return the peer uri in a string.
/// ///

@ -262,7 +262,7 @@ class ClientWriter : public ClientWriterInterface<W> {
if (context_->initial_metadata_corked_) { if (context_->initial_metadata_corked_) {
ops.SendInitialMetadata(context_->send_initial_metadata_, ops.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
context_->sent_initial_metadata_corked(false); context_->set_initial_metadata_corked(false);
} }
if (!ops.SendMessage(msg, options).ok()) { if (!ops.SendMessage(msg, options).ok()) {
return false; return false;
@ -372,7 +372,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
if (context_->initial_metadata_corked_) { if (context_->initial_metadata_corked_) {
ops.SendInitialMetadata(context_->send_initial_metadata_, ops.SendInitialMetadata(context_->send_initial_metadata_,
context_->initial_metadata_flags()); context_->initial_metadata_flags());
context_->sent_initial_metadata_corked(false); context_->set_initial_metadata_corked(false);
} }
if (!ops.SendMessage(msg, options).ok()) { if (!ops.SendMessage(msg, options).ok()) {
return false; return false;

@ -119,10 +119,6 @@ void ClientContext::set_compression_algorithm(
AddMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name); AddMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
} }
void ClientContext::sent_initial_metadata_corked(bool corked) {
initial_metadata_corked_ = corked;
}
void ClientContext::TryCancel() { void ClientContext::TryCancel() {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
if (call_) { if (call_) {

@ -498,7 +498,7 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx); ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message(GetParam().message_content); send_request.set_message(GetParam().message_content);
cli_ctx.sent_initial_metadata_corked(true); cli_ctx.set_initial_metadata_corked(true);
// tag:1 never comes up since no op is performed // tag:1 never comes up since no op is performed
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream( std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
@ -794,7 +794,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message(GetParam().message_content); send_request.set_message(GetParam().message_content);
cli_ctx.sent_initial_metadata_corked(true); cli_ctx.set_initial_metadata_corked(true);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
@ -862,7 +862,7 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx); ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message(GetParam().message_content); send_request.set_message(GetParam().message_content);
cli_ctx.sent_initial_metadata_corked(true); cli_ctx.set_initial_metadata_corked(true);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>> std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));

@ -708,7 +708,7 @@ TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
EchoResponse response; EchoResponse response;
ClientContext context; ClientContext context;
context.sent_initial_metadata_corked(true); context.set_initial_metadata_corked(true);
auto stream = stub_->RequestStream(&context, &response); auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello"); request.set_message("hello");
stream->WriteLast(request, WriteOptions()); stream->WriteLast(request, WriteOptions());
@ -739,7 +739,7 @@ TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
EchoResponse response; EchoResponse response;
ClientContext context; ClientContext context;
context.sent_initial_metadata_corked(true); context.set_initial_metadata_corked(true);
auto stream = stub_->RequestStream(&context, &response); auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello"); request.set_message("hello");
EXPECT_TRUE(stream->Write(request)); EXPECT_TRUE(stream->Write(request));
@ -828,7 +828,7 @@ TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
EchoResponse response; EchoResponse response;
ClientContext context; ClientContext context;
context.AddMetadata(kServerFinishAfterNReads, "3"); context.AddMetadata(kServerFinishAfterNReads, "3");
context.sent_initial_metadata_corked(true); context.set_initial_metadata_corked(true);
grpc::string msg("hello"); grpc::string msg("hello");
auto stream = stub_->BidiStream(&context); auto stream = stub_->BidiStream(&context);

@ -89,7 +89,7 @@ class MockClientReaderWriter<EchoRequest, EchoResponse> final
return true; return true;
} }
bool Write(const EchoRequest& msg, const WriteOptions& options) override { bool Write(const EchoRequest& msg, WriteOptions options) override {
gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str()); gpr_log(GPR_INFO, "mock recv msg %s", msg.message().c_str());
last_message_ = msg.message(); last_message_ = msg.message();
return true; return true;

@ -758,7 +758,7 @@ static void BM_StreamingPingPongMsgs(benchmark::State& state) {
// Repeatedly makes Streaming Bidi calls (exchanging a configurable number of // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
// messages in each call) in a loop on a single channel. Different from // messages in each call) in a loop on a single channel. Different from
// BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast, // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
// WriteAndFinish, sent_initial_metadata_corked. These apis aim at saving // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
// sendmsg syscalls for streaming by coalescing 1. initial metadata with first // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
// message; 2. final streaming message with trailing metadata. // message; 2. final streaming message with trailing metadata.
// //
@ -803,7 +803,7 @@ static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
ClientContext cli_ctx; ClientContext cli_ctx;
ClientContextMutator cli_ctx_mut(&cli_ctx); ClientContextMutator cli_ctx_mut(&cli_ctx);
cli_ctx.sent_initial_metadata_corked(true); cli_ctx.set_initial_metadata_corked(true);
// tag:1 here will never comes up, since we are not performing any op due // tag:1 here will never comes up, since we are not performing any op due
// to initial metadata coalescing. // to initial metadata coalescing.
auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1)); auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));

@ -1 +1 @@
Subproject commit 593e917c176b5bc5aafa57bf9f6030d749d91cd5 Subproject commit a428e42072765993ff674fda72863c9f1aa2d268
Loading…
Cancel
Save