diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 5cf197563ca..5adbd772f69 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -26,41 +26,15 @@ #include #include "src/core/lib/debug/trace.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/resource_quota/api.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/transport/error_utils.h" #include "src/proto/grpc/health/v1/health.upb.h" -#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1 -#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6 -#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120 -#define HEALTH_CHECK_RECONNECT_JITTER 0.2 - namespace grpc_core { TraceFlag grpc_health_check_client_trace(false, "health_check_client"); namespace { -// Returns true if healthy. -absl::StatusOr DecodeResponse(char* message, size_t size) { - // If message is empty, assume unhealthy. - if (size == 0) { - return absl::InvalidArgumentError("health check response was empty"); - } - // Deserialize message. - upb::Arena arena; - auto* response_struct = grpc_health_v1_HealthCheckResponse_parse( - reinterpret_cast(message), size, arena.ptr()); - if (response_struct == nullptr) { - // Can't parse message; assume unhealthy. - return absl::InvalidArgumentError("cannot parse health check response"); - } - int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct); - return status == grpc_health_v1_HealthCheckResponse_SERVING; -} - class HealthStreamEventHandler : public SubchannelStreamClient::CallEventHandler { public: @@ -101,18 +75,22 @@ class HealthStreamEventHandler return request_slice; } - void RecvMessageReadyLocked(SubchannelStreamClient* client, char* message, - size_t size) override { - auto healthy = DecodeResponse(message, size); + absl::Status RecvMessageReadyLocked( + SubchannelStreamClient* client, + absl::string_view serialized_message) override { + auto healthy = DecodeResponse(serialized_message); if (!healthy.ok()) { SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE, healthy.status().ToString().c_str()); - } else if (!*healthy) { + return healthy.status(); + } + if (!*healthy) { SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE, "backend unhealthy"); } else { SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK"); } + return absl::OkStatus(); } void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client, @@ -132,6 +110,21 @@ class HealthStreamEventHandler } private: + // Returns true if healthy. + static absl::StatusOr DecodeResponse( + absl::string_view serialized_message) { + // Deserialize message. + upb::Arena arena; + auto* response = grpc_health_v1_HealthCheckResponse_parse( + serialized_message.data(), serialized_message.size(), arena.ptr()); + if (response == nullptr) { + // Can't parse message; assume unhealthy. + return absl::InvalidArgumentError("cannot parse health check response"); + } + int32_t status = grpc_health_v1_HealthCheckResponse_status(response); + return status == grpc_health_v1_HealthCheckResponse_SERVING; + } + void SetHealthStatusLocked(SubchannelStreamClient* client, grpc_connectivity_state state, const char* reason) { diff --git a/src/core/ext/filters/client_channel/subchannel_stream_client.cc b/src/core/ext/filters/client_channel/subchannel_stream_client.cc index 17fd26c6514..a596e044568 100644 --- a/src/core/ext/filters/client_channel/subchannel_stream_client.cc +++ b/src/core/ext/filters/client_channel/subchannel_stream_client.cc @@ -391,9 +391,22 @@ void SubchannelStreamClient::CallState::DoneReadingRecvMessage( { MutexLock lock(&subchannel_stream_client_->mu_); if (subchannel_stream_client_->event_handler_ != nullptr) { - subchannel_stream_client_->event_handler_->RecvMessageReadyLocked( - subchannel_stream_client_.get(), + absl::string_view serialized_message( reinterpret_cast(recv_message), recv_message_buffer_.length); + absl::Status status = + subchannel_stream_client_->event_handler_->RecvMessageReadyLocked( + subchannel_stream_client_.get(), serialized_message); + if (!status.ok()) { + if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) { + gpr_log(GPR_INFO, + "%s %p: SubchannelStreamClient CallState %p: failed to " + "parse response message: %s", + subchannel_stream_client_->tracer_, + subchannel_stream_client_.get(), this, + status.ToString().c_str()); + } + Cancel(); + } } } seen_response_.store(true, std::memory_order_release); diff --git a/src/core/ext/filters/client_channel/subchannel_stream_client.h b/src/core/ext/filters/client_channel/subchannel_stream_client.h index 65277eeecdd..b48ab223e4c 100644 --- a/src/core/ext/filters/client_channel/subchannel_stream_client.h +++ b/src/core/ext/filters/client_channel/subchannel_stream_client.h @@ -21,6 +21,9 @@ #include +#include "absl/status/status.h" +#include "absl/strings/string_view.h" + #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/backoff/backoff.h" #include "src/core/lib/gprpp/orphanable.h" @@ -73,8 +76,8 @@ class SubchannelStreamClient virtual grpc_slice EncodeSendMessageLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; // Called whenever a message is received from the server. - virtual void RecvMessageReadyLocked(SubchannelStreamClient* client, - char* message, size_t size) + virtual absl::Status RecvMessageReadyLocked( + SubchannelStreamClient* client, absl::string_view serialized_message) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0; // Called when a stream fails. virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client, diff --git a/test/core/transport/chttp2/graceful_shutdown_test.cc b/test/core/transport/chttp2/graceful_shutdown_test.cc index 41a8b34d8dd..89184651bd0 100644 --- a/test/core/transport/chttp2/graceful_shutdown_test.cc +++ b/test/core/transport/chttp2/graceful_shutdown_test.cc @@ -85,22 +85,28 @@ class GracefulShutdownTest : public ::testing::Test { nullptr) == GRPC_ERROR_NONE); grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); // Start polling on the client - client_poll_thread_ = absl::make_unique([this]() { - grpc_completion_queue* client_cq = - grpc_completion_queue_create_for_next(nullptr); - { - ExecCtx exec_ctx; - grpc_endpoint_add_to_pollset(fds_.client, grpc_cq_pollset(client_cq)); - grpc_endpoint_add_to_pollset(fds_.server, grpc_cq_pollset(client_cq)); - } - while (!shutdown_) { - GPR_ASSERT( - grpc_completion_queue_next( - client_cq, grpc_timeout_milliseconds_to_deadline(10), nullptr) - .type == GRPC_QUEUE_TIMEOUT); - } - grpc_completion_queue_destroy(client_cq); - }); + absl::Notification client_poller_thread_started_notification; + client_poll_thread_ = absl::make_unique( + [this, &client_poller_thread_started_notification]() { + grpc_completion_queue* client_cq = + grpc_completion_queue_create_for_next(nullptr); + { + ExecCtx exec_ctx; + grpc_endpoint_add_to_pollset(fds_.client, + grpc_cq_pollset(client_cq)); + grpc_endpoint_add_to_pollset(fds_.server, + grpc_cq_pollset(client_cq)); + } + client_poller_thread_started_notification.Notify(); + while (!shutdown_) { + GPR_ASSERT(grpc_completion_queue_next( + client_cq, grpc_timeout_milliseconds_to_deadline(10), + nullptr) + .type == GRPC_QUEUE_TIMEOUT); + } + grpc_completion_queue_destroy(client_cq); + }); + client_poller_thread_started_notification.WaitForNotification(); // Write connection prefix and settings frame constexpr char kPrefix[] = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00"; diff --git a/tools/internal_ci/windows/grpc_distribtests_csharp.bat b/tools/internal_ci/windows/grpc_distribtests_csharp.bat index 4d5ce3fa9b3..8ba209c1ba4 100644 --- a/tools/internal_ci/windows/grpc_distribtests_csharp.bat +++ b/tools/internal_ci/windows/grpc_distribtests_csharp.bat @@ -30,6 +30,14 @@ call tools/internal_ci/helper_scripts/prepare_build_windows.bat || exit /b 1 call tools/internal_ci/helper_scripts/prepare_ccache.bat || exit /b 1 +@rem Install Msys2 zip to avoid crash when using cygwin's zip on grpc-win2016 kokoro workers. +@rem Downloading from GCS should be very reliables when on a GCP VM. +@rem TODO(jtattermusch): find a better way of making the build_packages step work on windows workers. +mkdir C:\zip +curl -sSL --fail -o C:\zip\zip.exe https://storage.googleapis.com/grpc-build-helper/zip-3.0-1-x86_64/zip.exe || goto :error +set PATH=C:\zip;%PATH% +zip --version + @rem Build all C# windows artifacts python tools/run_tests/task_runner.py -f artifact windows csharp %TASK_RUNNER_EXTRA_FILTERS% -j 4 --inner_jobs 4 -x build_artifacts_csharp/sponge_log.xml || set FAILED=true diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index c866324eac3..e3bdef194d0 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -274,7 +274,8 @@ LANG_RELEASE_MATRIX = { ('v1.40.2', ReleaseInfo()), ('v1.41.1', ReleaseInfo()), ('v1.42.1', ReleaseInfo()), - ('v1.43.1', ReleaseInfo()), + ('v1.43.2', ReleaseInfo()), + ('v1.44.0', ReleaseInfo()), ('v1.45.0', ReleaseInfo()), ]), 'python':