Merge github.com:grpc/grpc into http-reland

pull/29182/head
Craig Tiller 3 years ago
commit c367398740
  1. src/core/ext/filters/client_channel/health/health_check_client.cc (53 changes)
  2. src/core/ext/filters/client_channel/subchannel_stream_client.cc (17 changes)
  3. src/core/ext/filters/client_channel/subchannel_stream_client.h (7 changes)
  4. test/core/transport/chttp2/graceful_shutdown_test.cc (18 changes)
  5. tools/internal_ci/windows/grpc_distribtests_csharp.bat (8 changes)
  6. tools/interop_matrix/client_matrix.py (3 changes)

@@ -26,41 +26,15 @@
 #include <grpc/status.h>

 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gprpp/sync.h"
 #include "src/core/lib/resource_quota/api.h"
 #include "src/core/lib/slice/slice_internal.h"
 #include "src/core/lib/transport/error_utils.h"
 #include "src/proto/grpc/health/v1/health.upb.h"

-#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
-#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
-#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
-#define HEALTH_CHECK_RECONNECT_JITTER 0.2

 namespace grpc_core {

 TraceFlag grpc_health_check_client_trace(false, "health_check_client");

 namespace {

-// Returns true if healthy.
-absl::StatusOr<bool> DecodeResponse(char* message, size_t size) {
-  // If message is empty, assume unhealthy.
-  if (size == 0) {
-    return absl::InvalidArgumentError("health check response was empty");
-  }
-  // Deserialize message.
-  upb::Arena arena;
-  auto* response_struct = grpc_health_v1_HealthCheckResponse_parse(
-      reinterpret_cast<char*>(message), size, arena.ptr());
-  if (response_struct == nullptr) {
-    // Can't parse message; assume unhealthy.
-    return absl::InvalidArgumentError("cannot parse health check response");
-  }
-  int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
-  return status == grpc_health_v1_HealthCheckResponse_SERVING;
-}

 class HealthStreamEventHandler
     : public SubchannelStreamClient::CallEventHandler {
  public:
@@ -101,18 +75,22 @@ class HealthStreamEventHandler
     return request_slice;
   }

-  void RecvMessageReadyLocked(SubchannelStreamClient* client, char* message,
-                              size_t size) override {
-    auto healthy = DecodeResponse(message, size);
+  absl::Status RecvMessageReadyLocked(
+      SubchannelStreamClient* client,
+      absl::string_view serialized_message) override {
+    auto healthy = DecodeResponse(serialized_message);
     if (!healthy.ok()) {
       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
                             healthy.status().ToString().c_str());
-    } else if (!*healthy) {
+      return healthy.status();
+    }
+    if (!*healthy) {
       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
                             "backend unhealthy");
     } else {
       SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
     }
+    return absl::OkStatus();
   }

   void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
@@ -132,6 +110,21 @@ class HealthStreamEventHandler
   }

  private:
+  // Returns true if healthy.
+  static absl::StatusOr<bool> DecodeResponse(
+      absl::string_view serialized_message) {
+    // Deserialize message.
+    upb::Arena arena;
+    auto* response = grpc_health_v1_HealthCheckResponse_parse(
+        serialized_message.data(), serialized_message.size(), arena.ptr());
+    if (response == nullptr) {
+      // Can't parse message; assume unhealthy.
+      return absl::InvalidArgumentError("cannot parse health check response");
+    }
+    int32_t status = grpc_health_v1_HealthCheckResponse_status(response);
+    return status == grpc_health_v1_HealthCheckResponse_SERVING;
+  }
+
   void SetHealthStatusLocked(SubchannelStreamClient* client,
                              grpc_connectivity_state state,
                              const char* reason) {
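
Net effect of the health_check_client.cc hunks: response decoding moves into HealthStreamEventHandler, and an unparseable response is now surfaced to the caller as an error instead of only being mapped to TRANSIENT_FAILURE. Below is a minimal standalone sketch of the same decode-and-classify pattern; the string-based wire format and the names are illustrative, not the upb-generated gRPC API:

    #include <iostream>

    #include "absl/status/status.h"
    #include "absl/status/statusor.h"
    #include "absl/strings/string_view.h"

    // Toy stand-in for the upb parse + status check in DecodeResponse():
    // true means SERVING, false means any other status, and an error means
    // the payload could not be parsed.
    absl::StatusOr<bool> DecodeResponse(absl::string_view serialized_message) {
      if (serialized_message == "SERVING") return true;
      if (serialized_message == "NOT_SERVING") return false;
      // Can't parse message; assume unhealthy, as the real code does.
      return absl::InvalidArgumentError("cannot parse health check response");
    }

    int main() {
      for (absl::string_view msg : {"SERVING", "NOT_SERVING", "garbage"}) {
        absl::StatusOr<bool> healthy = DecodeResponse(msg);
        if (!healthy.ok()) {
          // New behavior: the error is also returned to the caller, which
          // then cancels the stream (see the .cc hunk below).
          std::cout << "error: " << healthy.status() << "\n";
        } else {
          std::cout << (*healthy ? "READY" : "TRANSIENT_FAILURE") << "\n";
        }
      }
      return 0;
    }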

@@ -391,9 +391,22 @@ void SubchannelStreamClient::CallState::DoneReadingRecvMessage(
   {
     MutexLock lock(&subchannel_stream_client_->mu_);
     if (subchannel_stream_client_->event_handler_ != nullptr) {
-      subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
-          subchannel_stream_client_.get(),
-          reinterpret_cast<char*>(recv_message), recv_message_buffer_.length);
+      absl::string_view serialized_message(
+          reinterpret_cast<char*>(recv_message), recv_message_buffer_.length);
+      absl::Status status =
+          subchannel_stream_client_->event_handler_->RecvMessageReadyLocked(
+              subchannel_stream_client_.get(), serialized_message);
+      if (!status.ok()) {
+        if (GPR_UNLIKELY(subchannel_stream_client_->tracer_ != nullptr)) {
+          gpr_log(GPR_INFO,
+                  "%s %p: SubchannelStreamClient CallState %p: failed to "
+                  "parse response message: %s",
+                  subchannel_stream_client_->tracer_,
+                  subchannel_stream_client_.get(), this,
+                  status.ToString().c_str());
+        }
+        Cancel();
+      }
     }
   }
   seen_response_.store(true, std::memory_order_release);
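
The caller now builds an absl::string_view over the raw receive buffer (a pointer/length pair, so no copy is made) and cancels the call when the handler rejects the message. A self-contained sketch of that hand-off under the same contract; Handler and Cancel() here are toy stand-ins, not gRPC types:

    #include <cstdint>
    #include <iostream>

    #include "absl/status/status.h"
    #include "absl/strings/string_view.h"

    // Toy stand-in for SubchannelStreamClient::CallEventHandler.
    struct Handler {
      absl::Status RecvMessageReady(absl::string_view serialized_message) {
        if (serialized_message.empty()) {
          return absl::InvalidArgumentError("cannot parse response message");
        }
        return absl::OkStatus();
      }
    };

    void Cancel() { std::cout << "call cancelled\n"; }  // toy stand-in

    int main() {
      Handler handler;
      uint8_t recv_message[4] = {0};  // pretend: raw bytes from the wire
      // Zero-copy view over the buffer, as in DoneReadingRecvMessage().
      absl::string_view serialized_message(
          reinterpret_cast<const char*>(recv_message), sizeof(recv_message));
      absl::Status status = handler.RecvMessageReady(serialized_message);
      if (!status.ok()) {
        // Mirrors the new error path: log the failure, then cancel.
        std::cout << "failed to parse response message: " << status << "\n";
        Cancel();
      }
      return 0;
    }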

@@ -21,6 +21,9 @@

 #include <atomic>

+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+
 #include "src/core/ext/filters/client_channel/subchannel.h"
 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/gprpp/orphanable.h"
@@ -73,8 +76,8 @@ class SubchannelStreamClient
   virtual grpc_slice EncodeSendMessageLocked()
       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
   // Called whenever a message is received from the server.
-  virtual void RecvMessageReadyLocked(SubchannelStreamClient* client,
-                                      char* message, size_t size)
+  virtual absl::Status RecvMessageReadyLocked(
+      SubchannelStreamClient* client, absl::string_view serialized_message)
       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&SubchannelStreamClient::mu_) = 0;
   // Called when a stream fails.
   virtual void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
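
Because RecvMessageReadyLocked is pure virtual, every implementation has to adopt the new signature before the tree compiles, and the override keyword turns a stale signature into a hard compile error rather than a silent non-override. A minimal sketch of that pattern, with a simplified interface standing in for the real CallEventHandler:

    #include "absl/status/status.h"
    #include "absl/strings/string_view.h"

    // Simplified stand-in for the updated interface.
    class EventHandler {
     public:
      virtual ~EventHandler() = default;
      // New contract: per-message failures are reported to the caller.
      virtual absl::Status RecvMessageReady(
          absl::string_view serialized_message) = 0;
    };

    class MyHandler : public EventHandler {
     public:
      // If this still had the old void(char*, size_t) shape, `override`
      // would make the mismatch a compile-time error.
      absl::Status RecvMessageReady(
          absl::string_view serialized_message) override {
        return serialized_message.empty()
                   ? absl::InvalidArgumentError("empty message")
                   : absl::OkStatus();
      }
    };

    int main() {
      MyHandler handler;
      return handler.RecvMessageReady("ok").ok() ? 0 : 1;
    }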

@@ -85,22 +85,28 @@ class GracefulShutdownTest : public ::testing::Test {
                    nullptr) == GRPC_ERROR_NONE);
     grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
     // Start polling on the client
-    client_poll_thread_ = absl::make_unique<std::thread>([this]() {
-      grpc_completion_queue* client_cq =
-          grpc_completion_queue_create_for_next(nullptr);
-      {
-        ExecCtx exec_ctx;
-        grpc_endpoint_add_to_pollset(fds_.client, grpc_cq_pollset(client_cq));
-        grpc_endpoint_add_to_pollset(fds_.server, grpc_cq_pollset(client_cq));
-      }
-      while (!shutdown_) {
-        GPR_ASSERT(
-            grpc_completion_queue_next(
-                client_cq, grpc_timeout_milliseconds_to_deadline(10), nullptr)
-                .type == GRPC_QUEUE_TIMEOUT);
-      }
-      grpc_completion_queue_destroy(client_cq);
-    });
+    absl::Notification client_poller_thread_started_notification;
+    client_poll_thread_ = absl::make_unique<std::thread>(
+        [this, &client_poller_thread_started_notification]() {
+          grpc_completion_queue* client_cq =
+              grpc_completion_queue_create_for_next(nullptr);
+          {
+            ExecCtx exec_ctx;
+            grpc_endpoint_add_to_pollset(fds_.client,
+                                         grpc_cq_pollset(client_cq));
+            grpc_endpoint_add_to_pollset(fds_.server,
+                                         grpc_cq_pollset(client_cq));
+          }
+          client_poller_thread_started_notification.Notify();
+          while (!shutdown_) {
+            GPR_ASSERT(grpc_completion_queue_next(
+                           client_cq, grpc_timeout_milliseconds_to_deadline(10),
+                           nullptr)
+                           .type == GRPC_QUEUE_TIMEOUT);
+          }
+          grpc_completion_queue_destroy(client_cq);
+        });
+    client_poller_thread_started_notification.WaitForNotification();
     // Write connection prefix and settings frame
     constexpr char kPrefix[] =
         "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n\x00\x00\x00\x04\x00\x00\x00\x00\x00";

@@ -30,6 +30,14 @@ call tools/internal_ci/helper_scripts/prepare_build_windows.bat || exit /b 1

 call tools/internal_ci/helper_scripts/prepare_ccache.bat || exit /b 1

+@rem Install Msys2 zip to avoid a crash when using cygwin's zip on grpc-win2016 kokoro workers.
+@rem Downloading from GCS should be very reliable when on a GCP VM.
+@rem TODO(jtattermusch): find a better way of making the build_packages step work on windows workers.
+mkdir C:\zip
+curl -sSL --fail -o C:\zip\zip.exe https://storage.googleapis.com/grpc-build-helper/zip-3.0-1-x86_64/zip.exe || goto :error
+set PATH=C:\zip;%PATH%
+zip --version
+
 @rem Build all C# windows artifacts
 python tools/run_tests/task_runner.py -f artifact windows csharp %TASK_RUNNER_EXTRA_FILTERS% -j 4 --inner_jobs 4 -x build_artifacts_csharp/sponge_log.xml || set FAILED=true

@@ -274,7 +274,8 @@ LANG_RELEASE_MATRIX = {
         ('v1.40.2', ReleaseInfo()),
         ('v1.41.1', ReleaseInfo()),
         ('v1.42.1', ReleaseInfo()),
-        ('v1.43.1', ReleaseInfo()),
+        ('v1.43.2', ReleaseInfo()),
         ('v1.44.0', ReleaseInfo()),
+        ('v1.45.0', ReleaseInfo()),
     ]),
     'python':
