Merge pull request #14080 from yang-g/return_early

Clear pending data when stream is closed by server.
pull/14151/head
Yang Gao 7 years ago committed by GitHub
commit ef7d312be3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      CMakeLists.txt
  2. 48
      Makefile
  3. 13
      build.yaml
  4. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  5. 18
      test/cpp/end2end/BUILD
  6. 25
      test/cpp/end2end/end2end_test.cc
  7. 233
      test/cpp/end2end/server_early_return_test.cc
  8. 13
      test/cpp/end2end/test_service_impl.cc
  9. 1
      test/cpp/end2end/test_service_impl.h
  10. 19
      tools/run_tests/generated/sources_and_headers.json
  11. 24
      tools/run_tests/generated/tests.json

@ -590,6 +590,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx server_crash_test)
endif()
add_dependencies(buildtests_cxx server_crash_test_client)
add_dependencies(buildtests_cxx server_early_return_test)
add_dependencies(buildtests_cxx server_request_call_test)
add_dependencies(buildtests_cxx shutdown_test)
add_dependencies(buildtests_cxx stats_test)
@ -11711,6 +11712,44 @@ target_link_libraries(server_crash_test_client
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(server_early_return_test
test/cpp/end2end/server_early_return_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(server_early_return_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(server_early_return_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc++_test_util
grpc_test_util
grpc++
grpc
gpr_test_util
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(server_request_call_test
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.pb.cc
${_gRPC_PROTO_GENS_DIR}/src/proto/grpc/testing/echo_messages.grpc.pb.cc

@ -1179,6 +1179,7 @@ server_builder_test: $(BINDIR)/$(CONFIG)/server_builder_test
server_context_test_spouse_test: $(BINDIR)/$(CONFIG)/server_context_test_spouse_test
server_crash_test: $(BINDIR)/$(CONFIG)/server_crash_test
server_crash_test_client: $(BINDIR)/$(CONFIG)/server_crash_test_client
server_early_return_test: $(BINDIR)/$(CONFIG)/server_early_return_test
server_request_call_test: $(BINDIR)/$(CONFIG)/server_request_call_test
shutdown_test: $(BINDIR)/$(CONFIG)/shutdown_test
stats_test: $(BINDIR)/$(CONFIG)/stats_test
@ -1622,6 +1623,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_context_test_spouse_test \
$(BINDIR)/$(CONFIG)/server_crash_test \
$(BINDIR)/$(CONFIG)/server_crash_test_client \
$(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_request_call_test \
$(BINDIR)/$(CONFIG)/shutdown_test \
$(BINDIR)/$(CONFIG)/stats_test \
@ -1754,6 +1756,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/server_context_test_spouse_test \
$(BINDIR)/$(CONFIG)/server_crash_test \
$(BINDIR)/$(CONFIG)/server_crash_test_client \
$(BINDIR)/$(CONFIG)/server_early_return_test \
$(BINDIR)/$(CONFIG)/server_request_call_test \
$(BINDIR)/$(CONFIG)/shutdown_test \
$(BINDIR)/$(CONFIG)/stats_test \
@ -2171,6 +2174,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/server_context_test_spouse_test || ( echo test server_context_test_spouse_test failed ; exit 1 )
$(E) "[RUN] Testing server_crash_test"
$(Q) $(BINDIR)/$(CONFIG)/server_crash_test || ( echo test server_crash_test failed ; exit 1 )
$(E) "[RUN] Testing server_early_return_test"
$(Q) $(BINDIR)/$(CONFIG)/server_early_return_test || ( echo test server_early_return_test failed ; exit 1 )
$(E) "[RUN] Testing server_request_call_test"
$(Q) $(BINDIR)/$(CONFIG)/server_request_call_test || ( echo test server_request_call_test failed ; exit 1 )
$(E) "[RUN] Testing shutdown_test"
@ -16990,6 +16995,49 @@ endif
endif
SERVER_EARLY_RETURN_TEST_SRC = \
test/cpp/end2end/server_early_return_test.cc \
SERVER_EARLY_RETURN_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(SERVER_EARLY_RETURN_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/server_early_return_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/server_early_return_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/server_early_return_test: $(PROTOBUF_DEP) $(SERVER_EARLY_RETURN_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(SERVER_EARLY_RETURN_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/server_early_return_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/server_early_return_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_server_early_return_test: $(SERVER_EARLY_RETURN_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(SERVER_EARLY_RETURN_TEST_OBJS:.o=.dep)
endif
endif
SERVER_REQUEST_CALL_TEST_SRC = \
$(GENDIR)/src/proto/grpc/testing/echo_messages.pb.cc $(GENDIR)/src/proto/grpc/testing/echo_messages.grpc.pb.cc \
$(GENDIR)/src/proto/grpc/testing/echo.pb.cc $(GENDIR)/src/proto/grpc/testing/echo.grpc.pb.cc \

@ -4704,6 +4704,19 @@ targets:
- grpc
- gpr_test_util
- gpr
- name: server_early_return_test
gtest: true
build: test
language: c++
src:
- test/cpp/end2end/server_early_return_test.cc
deps:
- grpc++_test_util
- grpc_test_util
- grpc++
- grpc
- gpr_test_util
- gpr
- name: server_request_call_test
gtest: true
build: test

@ -1886,7 +1886,7 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
grpc_chttp2_maybe_complete_recv_message(t, s);
if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
if (s->seen_error || !t->is_client) {
grpc_slice_buffer_reset_and_unref_internal(&s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(

@ -119,6 +119,24 @@ grpc_cc_library(
],
)
grpc_cc_test(
name = "server_early_return_test",
srcs = ["server_early_return_test.cc"],
external_deps = [
"gtest",
],
deps = [
"//:gpr",
"//:grpc",
"//:grpc++",
"//src/proto/grpc/testing:echo_messages_proto",
"//src/proto/grpc/testing:echo_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
"//test/cpp/util:test_util",
],
)
grpc_cc_test(
name = "end2end_test",
deps = [

@ -1087,31 +1087,6 @@ TEST_P(End2endTest, RpcMaxMessageSize) {
EXPECT_FALSE(s.ok());
}
// Client sends 20 requests and the server returns CANCELLED status after
// reading 10 requests.
TEST_P(End2endTest, RequestStreamServerEarlyCancelTest) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
context.AddMetadata(kServerCancelAfterReads, "10");
auto stream = stub_->RequestStream(&context, &response);
request.set_message("hello");
int send_messages = 20;
while (send_messages > 10) {
EXPECT_TRUE(stream->Write(request));
send_messages--;
}
while (send_messages > 0) {
stream->Write(request);
send_messages--;
}
stream->WritesDone();
Status s = stream->Finish();
EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
}
void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
gpr_event* ev) {
EchoResponse resp;

@ -0,0 +1,233 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
#include <grpc++/security/credentials.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/string_ref_helper.h"
#include <gtest/gtest.h>
namespace grpc {
namespace testing {
namespace {
const char kServerReturnStatusCode[] = "server_return_status_code";
const char kServerDelayBeforeReturnUs[] = "server_delay_before_return_us";
const char kServerReturnAfterNReads[] = "server_return_after_n_reads";
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
// Unused methods are not implemented.
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
EchoResponse* response) override {
int server_return_status_code =
GetIntValueFromMetadata(context, kServerReturnStatusCode, 0);
int server_delay_before_return_us =
GetIntValueFromMetadata(context, kServerDelayBeforeReturnUs, 0);
int server_return_after_n_reads =
GetIntValueFromMetadata(context, kServerReturnAfterNReads, 0);
EchoRequest request;
while (server_return_after_n_reads--) {
EXPECT_TRUE(reader->Read(&request));
}
response->set_message("response msg");
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(server_delay_before_return_us, GPR_TIMESPAN)));
return Status(static_cast<StatusCode>(server_return_status_code), "");
}
Status BidiStream(
ServerContext* context,
ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
int server_return_status_code =
GetIntValueFromMetadata(context, kServerReturnStatusCode, 0);
int server_delay_before_return_us =
GetIntValueFromMetadata(context, kServerDelayBeforeReturnUs, 0);
int server_return_after_n_reads =
GetIntValueFromMetadata(context, kServerReturnAfterNReads, 0);
EchoRequest request;
EchoResponse response;
while (server_return_after_n_reads--) {
EXPECT_TRUE(stream->Read(&request));
response.set_message(request.message());
EXPECT_TRUE(stream->Write(response));
}
gpr_sleep_until(gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_micros(server_delay_before_return_us, GPR_TIMESPAN)));
return Status(static_cast<StatusCode>(server_return_status_code), "");
}
int GetIntValueFromMetadata(ServerContext* context, const char* key,
int default_value) {
auto metadata = context->client_metadata();
if (metadata.find(key) != metadata.end()) {
std::istringstream iss(ToString(metadata.find(key)->second));
iss >> default_value;
}
return default_value;
}
};
class ServerEarlyReturnTest : public ::testing::Test {
protected:
ServerEarlyReturnTest() : picked_port_(0) {}
void SetUp() override {
int port = grpc_pick_unused_port_or_die();
picked_port_ = port;
server_address_ << "127.0.0.1:" << port;
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
channel_ =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
}
void TearDown() override {
server_->Shutdown();
if (picked_port_ > 0) {
grpc_recycle_unused_port(picked_port_);
}
}
// Client sends 20 requests and the server returns after reading 10 requests.
// If return_cancel is true, server returns CANCELLED status. Otherwise it
// returns OK.
void DoBidiStream(bool return_cancelled) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.AddMetadata(kServerReturnAfterNReads, "10");
if (return_cancelled) {
// "1" means CANCELLED
context.AddMetadata(kServerReturnStatusCode, "1");
}
context.AddMetadata(kServerDelayBeforeReturnUs, "10000");
auto stream = stub_->BidiStream(&context);
for (int i = 0; i < 20; i++) {
request.set_message(grpc::string("hello") + grpc::to_string(i));
bool write_ok = stream->Write(request);
bool read_ok = stream->Read(&response);
if (i < 10) {
EXPECT_TRUE(write_ok);
EXPECT_TRUE(read_ok);
EXPECT_EQ(response.message(), request.message());
} else {
EXPECT_FALSE(read_ok);
}
}
stream->WritesDone();
EXPECT_FALSE(stream->Read(&response));
Status s = stream->Finish();
if (return_cancelled) {
EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
} else {
EXPECT_TRUE(s.ok());
}
}
void DoRequestStream(bool return_cancelled) {
EchoRequest request;
EchoResponse response;
ClientContext context;
context.AddMetadata(kServerReturnAfterNReads, "10");
if (return_cancelled) {
// "1" means CANCELLED
context.AddMetadata(kServerReturnStatusCode, "1");
}
context.AddMetadata(kServerDelayBeforeReturnUs, "10000");
auto stream = stub_->RequestStream(&context, &response);
for (int i = 0; i < 20; i++) {
request.set_message(grpc::string("hello") + grpc::to_string(i));
bool written = stream->Write(request);
if (i < 10) {
EXPECT_TRUE(written);
}
}
stream->WritesDone();
Status s = stream->Finish();
if (return_cancelled) {
EXPECT_EQ(s.error_code(), StatusCode::CANCELLED);
} else {
EXPECT_TRUE(s.ok());
}
}
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
std::ostringstream server_address_;
TestServiceImpl service_;
int picked_port_;
};
TEST_F(ServerEarlyReturnTest, BidiStreamEarlyOk) { DoBidiStream(false); }
TEST_F(ServerEarlyReturnTest, BidiStreamEarlyCancel) { DoBidiStream(true); }
TEST_F(ServerEarlyReturnTest, RequestStreamEarlyOK) { DoRequestStream(false); }
TEST_F(ServerEarlyReturnTest, RequestStreamEarlyCancel) {
DoRequestStream(true);
}
} // namespace
} // namespace testing
} // namespace grpc
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -181,13 +181,6 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
int server_try_cancel = GetIntValueFromMetadata(
kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
// If 'cancel_after_reads' is set in the metadata AND non-zero, the server
// will cancel the RPC (by just returning Status::CANCELLED - doesn't call
// ServerContext::TryCancel()) after reading the number of records specified
// by the 'cancel_after_reads' value set in the metadata.
int cancel_after_reads = GetIntValueFromMetadata(
kServerCancelAfterReads, context->client_metadata(), 0);
EchoRequest request;
response->set_message("");
@ -204,12 +197,6 @@ Status TestServiceImpl::RequestStream(ServerContext* context,
int num_msgs_read = 0;
while (reader->Read(&request)) {
if (cancel_after_reads == 1) {
gpr_log(GPR_INFO, "return cancel status");
return Status::CANCELLED;
} else if (cancel_after_reads > 0) {
cancel_after_reads--;
}
response->mutable_message()->append(request.message());
}
gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);

@ -31,7 +31,6 @@ namespace testing {
const int kServerDefaultResponseStreamsToSend = 3;
const char* const kServerResponseStreamsToSend = "server_responses_to_send";
const char* const kServerCancelAfterReads = "cancel_after_reads";
const char* const kServerTryCancelRequest = "server_try_cancel";
const char* const kDebugInfoTrailerKey = "debug-info-bin";
const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";

@ -4118,6 +4118,25 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc++",
"grpc++_test_util",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "server_early_return_test",
"src": [
"test/cpp/end2end/server_early_return_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -4383,6 +4383,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "server_early_return_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save