diff --git a/CMakeLists.txt b/CMakeLists.txt index ca5471ce3c8..efa7230862a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -864,6 +864,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx global_config_test) add_dependencies(buildtests_cxx google_c2p_resolver_test) add_dependencies(buildtests_cxx google_mesh_ca_certificate_provider_factory_test) + add_dependencies(buildtests_cxx graceful_shutdown_test) add_dependencies(buildtests_cxx grpc_authorization_engine_test) add_dependencies(buildtests_cxx grpc_authorization_policy_provider_test) add_dependencies(buildtests_cxx grpc_authz_end2end_test) @@ -10690,6 +10691,42 @@ target_link_libraries(google_mesh_ca_certificate_provider_factory_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(graceful_shutdown_test + test/core/end2end/cq_verifier.cc + test/core/transport/chttp2/graceful_shutdown_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(graceful_shutdown_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(graceful_shutdown_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index b7992335411..28993cf44fd 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -5826,6 +5826,17 @@ targets: - test/core/xds/google_mesh_ca_certificate_provider_factory_test.cc deps: - grpc_test_util +- name: graceful_shutdown_test + gtest: true + build: test + language: c++ + headers: + - test/core/end2end/cq_verifier.h + src: + - test/core/end2end/cq_verifier.cc + - test/core/transport/chttp2/graceful_shutdown_test.cc + deps: + - grpc_test_util - name: grpc_authorization_engine_test gtest: true build: test diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 1d6facdd176..eb91e1f10cb 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1006,8 +1006,8 @@ static void write_action_end_locked(void* tp, grpc_error_handle error) { closed = true; } - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED) { - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SENT; + if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) { + t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT; closed = true; if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { close_transport_locked( @@ -1759,18 +1759,120 @@ void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) { } } +namespace { + +// Fire and forget (deletes itself on completion). Does a graceful shutdown by +// sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping +// and waiting for an ack (effective waiting for an RTT) and then sending a +// final GOAWAY freame with an updated last stream identifier. This helps ensure +// that a connection can be cleanly shut down without losing requests. +// In the event, that the client does not respond to the ping for some reason, +// we add a 20 second deadline, after which we send the second goaway. +class GracefulGoaway : public grpc_core::RefCounted { + public: + static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); } + + ~GracefulGoaway() override { + GRPC_CHTTP2_UNREF_TRANSPORT(t_, "graceful goaway"); + } + + private: + explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t) { + t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY; + GRPC_CHTTP2_REF_TRANSPORT(t_, "graceful goaway"); + grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf); + send_ping_locked( + t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr)); + grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); + Ref().release(); // Ref for the timer + grpc_timer_init( + &timer_, + grpc_core::ExecCtx::Get()->Now() + grpc_core::Duration::Seconds(20), + GRPC_CLOSURE_INIT(&on_timer_, OnTimer, this, nullptr)); + } + + void MaybeSendFinalGoawayLocked() { + if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) { + // We already sent the final GOAWAY. + return; + } + if (t_->destroying || t_->closed_with_error != GRPC_ERROR_NONE) { + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_INFO, + "transport:%p %s peer:%s Transport already shutting down. " + "Graceful GOAWAY abandoned.", + t_, t_->is_client ? "CLIENT" : "SERVER", t_->peer_string.c_str())); + return; + } + // Ping completed. Send final goaway. + GRPC_CHTTP2_IF_TRACING( + gpr_log(GPR_INFO, + "transport:%p %s peer:%s Graceful shutdown: Ping received. " + "Sending final GOAWAY with stream_id:%d", + t_, t_->is_client ? "CLIENT" : "SERVER", + t_->peer_string.c_str(), t_->last_new_stream_id)); + t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED; + grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(), + &t_->qbuf); + grpc_chttp2_initiate_write(t_, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); + } + + static void OnPingAck(void* arg, grpc_error_handle /* error */) { + auto* self = static_cast(arg); + self->t_->combiner->Run( + GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr), + GRPC_ERROR_NONE); + } + + static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) { + auto* self = static_cast(arg); + grpc_timer_cancel(&self->timer_); + self->MaybeSendFinalGoawayLocked(); + self->Unref(); + } + + static void OnTimer(void* arg, grpc_error_handle error) { + auto* self = static_cast(arg); + if (error != GRPC_ERROR_NONE) { + self->Unref(); + return; + } + self->t_->combiner->Run( + GRPC_CLOSURE_INIT(&self->on_timer_, OnTimerLocked, self, nullptr), + GRPC_ERROR_NONE); + } + + static void OnTimerLocked(void* arg, grpc_error_handle /* error */) { + auto* self = static_cast(arg); + self->MaybeSendFinalGoawayLocked(); + self->Unref(); + } + + grpc_chttp2_transport* t_; + grpc_closure on_ping_ack_; + grpc_timer timer_; + grpc_closure on_timer_; +}; + +} // namespace + static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error) { - // We want to log this irrespective of whether http tracing is enabled - gpr_log(GPR_DEBUG, "%s: Sending goaway err=%s", t->peer_string.c_str(), - grpc_error_std_string(error).c_str()); - t->sent_goaway_state = GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED; grpc_http2_error_code http_error; std::string message; grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr, &message, &http_error, nullptr); - grpc_chttp2_goaway_append( - t->last_new_stream_id, static_cast(http_error), - grpc_slice_from_cpp_string(std::move(message)), &t->qbuf); + if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR) { + // Do a graceful shutdown. + GracefulGoaway::Start(t); + } else { + // We want to log this irrespective of whether http tracing is enabled + gpr_log(GPR_DEBUG, "%s: Sending goaway err=%s", t->peer_string.c_str(), + grpc_error_std_string(error).c_str()); + t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED; + grpc_chttp2_goaway_append( + t->last_new_stream_id, static_cast(http_error), + grpc_slice_from_cpp_string(std::move(message)), &t->qbuf); + } grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT); GRPC_ERROR_UNREF(error); } @@ -1999,7 +2101,7 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { post_benign_reclaimer(t); - if (t->sent_goaway_state == GRPC_CHTTP2_GOAWAY_SENT) { + if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) { close_transport_locked( t, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Last stream closed after sending GOAWAY", &error, 1)); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index fe2b437552f..48c62ef1d77 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -196,8 +196,9 @@ typedef enum { typedef enum { GRPC_CHTTP2_NO_GOAWAY_SEND, - GRPC_CHTTP2_GOAWAY_SEND_SCHEDULED, - GRPC_CHTTP2_GOAWAY_SENT, + GRPC_CHTTP2_GRACEFUL_GOAWAY, + GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED, + GRPC_CHTTP2_FINAL_GOAWAY_SENT, } grpc_chttp2_sent_goaway_state; typedef struct grpc_chttp2_write_cb { diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 1864a416a1e..7cac0850834 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -479,6 +479,14 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t, t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) { return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded"); + } else if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) { + GRPC_CHTTP2_IF_TRACING(gpr_log( + GPR_INFO, + "transport:%p SERVER peer:%s Final GOAWAY sent. Ignoring new " + "grpc_chttp2_stream request id=%d, last grpc_chttp2_stream id=%d", + t, t->peer_string.c_str(), t->incoming_stream_id, + t->last_new_stream_id)); + return init_header_skip_frame_parser(t, priority_type); } t->last_new_stream_id = t->incoming_stream_id; s = t->incoming_stream = diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 0c7b6760d6d..7a89986bfa7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -87,9 +87,10 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) { ? grpc_core::Duration::Hours(2) : grpc_core::Duration::Seconds(1); /* A second is added to deal with network delays and timing imprecision */ - } else { + } else if (t->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) { // The gRPC keepalive spec doesn't call for any throttling on the server - // side, but we are adding some throttling for protection anyway. + // side, but we are adding some throttling for protection anyway, unless + // we are doing a graceful GOAWAY in which case we don't want to wait. next_allowed_ping_interval = t->keepalive_time == grpc_core::Duration::Infinity() ? grpc_core::Duration::Seconds(20) diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index a276d9c6d50..4c0220837ee 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -479,7 +479,6 @@ class ChannelBroadcaster { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK) : GRPC_ERROR_NONE; - op->set_accept_stream = true; sc->slice = grpc_slice_from_copied_string("Server shutdown"); op->disconnect_with_error = send_disconnect; elem = diff --git a/test/core/transport/chttp2/BUILD b/test/core/transport/chttp2/BUILD index ffaa0e1a331..31e5c556c74 100644 --- a/test/core/transport/chttp2/BUILD +++ b/test/core/transport/chttp2/BUILD @@ -98,6 +98,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "graceful_shutdown_test", + srcs = ["graceful_shutdown_test.cc"], + external_deps = ["gtest"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/end2end:cq_verifier", + "//test/core/util:grpc_test_util", + ], +) + grpc_cc_test( name = "hpack_encoder_test", srcs = ["hpack_encoder_test.cc"], diff --git a/test/core/transport/chttp2/graceful_shutdown_test.cc b/test/core/transport/chttp2/graceful_shutdown_test.cc new file mode 100644 index 00000000000..89184651bd0 --- /dev/null +++ b/test/core/transport/chttp2/graceful_shutdown_test.cc @@ -0,0 +1,424 @@ +// +// +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +#include + +#include +#include + +#include + +#include + +#include "absl/synchronization/mutex.h" +#include "absl/synchronization/notification.h" + +#include +#include +#include + +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/ext/transport/chttp2/transport/frame_goaway.h" +#include "src/core/ext/transport/chttp2/transport/frame_ping.h" +#include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/config/core_configuration.h" +#include "src/core/lib/gprpp/host_port.h" +#include "src/core/lib/iomgr/endpoint_pair.h" +#include "src/core/lib/slice/slice.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/server.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/core/util/test_tcp_server.h" + +namespace grpc_core { +namespace { + +void* Tag(intptr_t t) { return reinterpret_cast(t); } + +class GracefulShutdownTest : public ::testing::Test { + protected: + GracefulShutdownTest() { SetupAndStart(); } + + ~GracefulShutdownTest() override { ShutdownAndDestroy(); } + + // Sets up the client and server + void SetupAndStart() { + ExecCtx exec_ctx; + cq_ = grpc_completion_queue_create_for_next(nullptr); + cqv_ = cq_verifier_create(cq_); + grpc_arg server_args[] = { + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_HTTP2_BDP_PROBE), 0), + grpc_channel_arg_integer_create( + const_cast(GRPC_ARG_KEEPALIVE_TIME_MS), INT_MAX)}; + grpc_channel_args server_channel_args = {GPR_ARRAY_SIZE(server_args), + server_args}; + // Create server + server_ = grpc_server_create(&server_channel_args, nullptr); + auto* core_server = Server::FromC(server_); + grpc_server_register_completion_queue(server_, cq_, nullptr); + grpc_server_start(server_); + fds_ = grpc_iomgr_create_endpoint_pair("fixture", nullptr); + auto* transport = grpc_create_chttp2_transport(core_server->channel_args(), + fds_.server, false); + grpc_endpoint_add_to_pollset(fds_.server, grpc_cq_pollset(cq_)); + GPR_ASSERT(core_server->SetupTransport(transport, nullptr, + core_server->channel_args(), + nullptr) == GRPC_ERROR_NONE); + grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); + // Start polling on the client + absl::Notification client_poller_thread_started_notification; + client_poll_thread_ = absl::make_unique( + [this, &client_poller_thread_started_notification]() { + grpc_completion_queue* client_cq = + grpc_completion_queue_create_for_next(nullptr); + { + ExecCtx exec_ctx; + grpc_endpoint_add_to_pollset(fds_.client, + grpc_cq_pollset(client_cq)); + grpc_endpoint_add_to_pollset(fds_.server, + grpc_cq_pollset(client_cq)); + } + client_poller_thread_started_notification.Notify(); + while (!shutdown_) { + GPR_ASSERT(grpc_completion_queue_next( + client_cq, grpc_timeout_milliseconds_to_deadline(10), + nullptr) + .type == GRPC_QUEUE_TIMEOUT); + } + grpc_completion_queue_destroy(client_cq); + }); + client_poller_thread_started_notification.WaitForNotification(); + // Write connection prefix and settings frame + constexpr char kPrefix[] = + "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00"; + Write(absl::string_view(kPrefix, sizeof(kPrefix) - 1)); + // Start reading on the client + grpc_slice_buffer_init(&read_buffer_); + GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr); + grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false); + } + + // Shuts down and destroys the client and server. + void ShutdownAndDestroy() { + shutdown_ = true; + ExecCtx exec_ctx; + grpc_endpoint_shutdown( + fds_.client, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Client shutdown")); + ExecCtx::Get()->Flush(); + client_poll_thread_->join(); + GPR_ASSERT(read_end_notification_.WaitForNotificationWithTimeout( + absl::Seconds(5))); + grpc_endpoint_destroy(fds_.client); + ExecCtx::Get()->Flush(); + // Shutdown and destroy server + grpc_server_shutdown_and_notify(server_, cq_, Tag(1000)); + CQ_EXPECT_COMPLETION(cqv_, Tag(1000), true); + cq_verify(cqv_); + grpc_server_destroy(server_); + cq_verifier_destroy(cqv_); + grpc_completion_queue_destroy(cq_); + } + + static void OnReadDone(void* arg, grpc_error_handle error) { + GracefulShutdownTest* self = static_cast(arg); + if (error == GRPC_ERROR_NONE) { + { + absl::MutexLock lock(&self->mu_); + for (size_t i = 0; i < self->read_buffer_.count; ++i) { + absl::StrAppend(&self->read_bytes_, + StringViewFromSlice(self->read_buffer_.slices[i])); + } + self->read_cv_.SignalAll(); + } + grpc_slice_buffer_reset_and_unref(&self->read_buffer_); + grpc_endpoint_read(self->fds_.client, &self->read_buffer_, + &self->on_read_done_, false); + } else { + grpc_slice_buffer_destroy(&self->read_buffer_); + self->read_end_notification_.Notify(); + } + } + + // Waits for \a bytes to show up in read_bytes_ + void WaitForReadBytes(absl::string_view bytes) { + std::atomic done{false}; + { + absl::MutexLock lock(&mu_); + while (!absl::StrContains(read_bytes_, bytes)) { + read_cv_.WaitWithTimeout(&mu_, absl::Seconds(5)); + } + } + done = true; + } + + void WaitForGoaway(uint32_t last_stream_id) { + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_chttp2_goaway_append(last_stream_id, 0, grpc_empty_slice(), &buffer); + std::string expected_bytes; + for (size_t i = 0; i < buffer.count; ++i) { + absl::StrAppend(&expected_bytes, StringViewFromSlice(buffer.slices[i])); + } + grpc_slice_buffer_destroy(&buffer); + WaitForReadBytes(expected_bytes); + } + + void WaitForPing(uint64_t opaque_data) { + grpc_slice ping_slice = grpc_chttp2_ping_create(0, opaque_data); + WaitForReadBytes(StringViewFromSlice(ping_slice)); + } + + void SendPingAck(uint64_t opaque_data) { + grpc_slice ping_slice = grpc_chttp2_ping_create(1, opaque_data); + Write(StringViewFromSlice(ping_slice)); + } + + // This is a blocking call. It waits for the write callback to be invoked + // before returning. (In other words, do not call this from a thread that + // should not be blocked, for example, a polling thread.) + void Write(absl::string_view bytes) { + ExecCtx exec_ctx; + grpc_slice slice = + StaticSlice::FromStaticBuffer(bytes.data(), bytes.size()).TakeCSlice(); + grpc_slice_buffer buffer; + grpc_slice_buffer_init(&buffer); + grpc_slice_buffer_add(&buffer, slice); + WriteBuffer(&buffer); + grpc_slice_buffer_destroy(&buffer); + } + + void WriteBuffer(grpc_slice_buffer* buffer) { + absl::Notification on_write_done_notification_; + GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone, + &on_write_done_notification_, nullptr); + grpc_endpoint_write(fds_.client, buffer, &on_write_done_, nullptr); + ExecCtx::Get()->Flush(); + GPR_ASSERT(on_write_done_notification_.WaitForNotificationWithTimeout( + absl::Seconds(5))); + } + + static void OnWriteDone(void* arg, grpc_error_handle error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + absl::Notification* on_write_done_notification_ = + static_cast(arg); + on_write_done_notification_->Notify(); + } + + grpc_endpoint_pair fds_; + grpc_server* server_ = nullptr; + grpc_completion_queue* cq_ = nullptr; + cq_verifier* cqv_ = nullptr; + std::unique_ptr client_poll_thread_; + std::atomic shutdown_{false}; + grpc_closure on_read_done_; + absl::Mutex mu_; + absl::CondVar read_cv_; + absl::Notification read_end_notification_; + grpc_slice_buffer read_buffer_; + std::string read_bytes_ ABSL_GUARDED_BY(mu_); + grpc_closure on_write_done_; +}; + +TEST_F(GracefulShutdownTest, GracefulGoaway) { + // Initiate shutdown on the server + grpc_server_shutdown_and_notify(server_, cq_, Tag(1)); + // Wait for first goaway + WaitForGoaway((1u << 31) - 1); + // Wait for the ping + WaitForPing(0); + // Reply to the ping + SendPingAck(0); + // Wait for final goaway + WaitForGoaway(0); + // The shutdown should successfully complete. + CQ_EXPECT_COMPLETION(cqv_, Tag(1), true); + cq_verify(cqv_); +} + +TEST_F(GracefulShutdownTest, RequestStartedBeforeFinalGoaway) { + grpc_call_error error; + grpc_call* s; + grpc_call_details call_details; + grpc_metadata_array request_metadata_recv; + grpc_call_details_init(&call_details); + grpc_metadata_array_init(&request_metadata_recv); + error = grpc_server_request_call(server_, &s, &call_details, + &request_metadata_recv, cq_, cq_, Tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + // Initiate shutdown on the server + grpc_server_shutdown_and_notify(server_, cq_, Tag(1)); + // Wait for first goaway + WaitForGoaway((1u << 31) - 1); + // Wait for the ping + WaitForPing(0); + // Start a request + constexpr char kRequestFrame[] = + "\x00\x00\xbe\x01\x05\x00\x00\x00\x01" + "\x10\x05:path\x08/foo/bar" + "\x10\x07:scheme\x04http" + "\x10\x07:method\x04POST" + "\x10\x0a:authority\x09localhost" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip" + "\x10\x02te\x08trailers" + "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)"; + Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1)); + // Reply to the ping + SendPingAck(0); + // Wait for final goaway with last stream ID 1 to show that the HTTP2 + // transport accepted the stream. + WaitForGoaway(1); + // TODO(yashykt): The surface layer automatically cancels calls received after + // shutdown has been called. Once that is fixed, this should be a success. + CQ_EXPECT_COMPLETION(cqv_, Tag(100), 0); + // The shutdown should successfully complete. + CQ_EXPECT_COMPLETION(cqv_, Tag(1), true); + cq_verify(cqv_); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); +} + +TEST_F(GracefulShutdownTest, RequestStartedAfterFinalGoawayIsIgnored) { + // Start a request before shutdown to make sure that the connection stays + // alive. + grpc_call_error error; + grpc_call* s; + grpc_call_details call_details; + grpc_metadata_array request_metadata_recv; + grpc_call_details_init(&call_details); + grpc_metadata_array_init(&request_metadata_recv); + error = grpc_server_request_call(server_, &s, &call_details, + &request_metadata_recv, cq_, cq_, Tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + // Send the request from the client. + constexpr char kRequestFrame[] = + "\x00\x00\xbe\x01\x05\x00\x00\x00\x01" + "\x10\x05:path\x08/foo/bar" + "\x10\x07:scheme\x04http" + "\x10\x07:method\x04POST" + "\x10\x0a:authority\x09localhost" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip" + "\x10\x02te\x08trailers" + "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)"; + Write(absl::string_view(kRequestFrame, sizeof(kRequestFrame) - 1)); + CQ_EXPECT_COMPLETION(cqv_, Tag(100), 1); + cq_verify(cqv_); + + // Initiate shutdown on the server + grpc_server_shutdown_and_notify(server_, cq_, Tag(1)); + // Wait for first goaway + WaitForGoaway((1u << 31) - 1); + // Wait for the ping + WaitForPing(0); + // Reply to the ping + SendPingAck(0); + // Wait for final goaway + WaitForGoaway(1); + + // Send another request from the client which should be ignored. + constexpr char kNewRequestFrame[] = + "\x00\x00\xbe\x01\x05\x00\x00\x00\x03" + "\x10\x05:path\x08/foo/bar" + "\x10\x07:scheme\x04http" + "\x10\x07:method\x04POST" + "\x10\x0a:authority\x09localhost" + "\x10\x0c" + "content-type\x10" + "application/grpc" + "\x10\x14grpc-accept-encoding\x15identity,deflate,gzip" + "\x10\x02te\x08trailers" + "\x10\x0auser-agent\x17grpc-c/0.12.0.0 (linux)"; + Write(absl::string_view(kNewRequestFrame, sizeof(kNewRequestFrame) - 1)); + + // Finish the accepted request. + grpc_op ops[3]; + grpc_op* op; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED; + grpc_slice status_details = grpc_slice_from_static_string("xyz"); + op->data.send_status_from_server.status_details = &status_details; + op->flags = 0; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + int was_cancelled = 2; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = nullptr; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), Tag(101), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv_, Tag(101), true); + // The shutdown should successfully complete. + CQ_EXPECT_COMPLETION(cqv_, Tag(1), true); + cq_verify(cqv_); + grpc_call_unref(s); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); +} + +// Make sure that the graceful goaway eventually makes progress even if a client +// does not respond to the ping. +TEST_F(GracefulShutdownTest, UnresponsiveClient) { + absl::Time initial_time = absl::Now(); + // Initiate shutdown on the server + grpc_server_shutdown_and_notify(server_, cq_, Tag(1)); + // Wait for first goaway + WaitForGoaway((1u << 31) - 1); + // Wait for the ping + WaitForPing(0); + // Wait for final goaway without sending a ping ACK. + WaitForGoaway(0); + EXPECT_GE(absl::Now() - initial_time, + absl::Seconds(20) - + absl::Seconds( + 1) /* clock skew between threads due to time caching */); + // The shutdown should successfully complete. + CQ_EXPECT_COMPLETION(cqv_, Tag(1), true); + cq_verify(cqv_); +} + +} // namespace +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + grpc_init(); + int result = RUN_ALL_TESTS(); + grpc_shutdown(); + return result; +} diff --git a/test/core/transport/chttp2/simple_request.headers b/test/core/transport/chttp2/simple_request.headers new file mode 100644 index 00000000000..6ca29e21a82 --- /dev/null +++ b/test/core/transport/chttp2/simple_request.headers @@ -0,0 +1,12 @@ +# headers used in graceful_shutdown_test.cc +# use tools/codegen/core/gen_header_frame.py to generate the binary strings +# contained in the source code +:path: /foo/bar +:scheme: http +:method: POST +:authority: localhost +content-type: application/grpc +grpc-accept-encoding: identity,deflate,gzip +te: trailers +user-agent: grpc-c/0.12.0.0 (linux) + diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 3e67534a1a8..58fd973c97a 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -4427,6 +4427,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "graceful_shutdown_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,