[endpoint] Remove grpc_endpoint_shutdown().

This gives grpc_endpoint the same destruction-is-shutdown semantic as
EventEngine::Endpoint, which will make the migration easier.
PiperOrigin-RevId: 639867616
pull/36195/head
Mark D. Roth, committed by Copybara-Service (8 months ago)
parent b2e14059a6
commit 9b1bb788aa
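
For orientation before the diff, here is a minimal sketch (not part of this commit) of how endpoint teardown changes for callers under the new destruction-is-shutdown semantic; ReleaseEndpoint is a hypothetical helper.

#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"

// Hypothetical owner dropping an endpoint it no longer needs.
void ReleaseEndpoint(grpc_endpoint* ep) {
  grpc_core::ExecCtx exec_ctx;  // iomgr calls need an ExecCtx on the stack
  if (ep == nullptr) return;
  // Before this commit, callers had to shut the endpoint down first,
  // even when no read/write callbacks were pending:
  //   grpc_endpoint_shutdown(ep, GRPC_ERROR_CREATE("done"));
  //   grpc_endpoint_destroy(ep);
  // After this commit, destroy alone cancels pending callbacks and
  // releases the endpoint, matching EventEngine::Endpoint, whose
  // destructor is its shutdown.
  grpc_endpoint_destroy(ep);
}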
44 changed files (lines changed per file):
  1   BUILD
  1   src/core/ext/transport/chaotic_good/client/chaotic_good_connector.cc
 28   src/core/ext/transport/chttp2/client/chttp2_connector.cc
  5   src/core/ext/transport/chttp2/client/chttp2_connector.h
 35   src/core/ext/transport/chttp2/server/chttp2_server.cc
 48   src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  7   src/core/ext/transport/chttp2/transport/chttp2_transport.h
  5   src/core/ext/transport/chttp2/transport/frame_settings.cc
 10   src/core/ext/transport/chttp2/transport/internal.h
  5   src/core/handshaker/handshaker.cc
 20   src/core/handshaker/http_connect/http_connect_handshaker.cc
 39   src/core/handshaker/security/secure_endpoint.cc
 19   src/core/handshaker/security/security_handshaker.cc
  3   src/core/handshaker/tcp_connect/tcp_connect_handshaker.cc
  4   src/core/lib/iomgr/endpoint.cc
  2   src/core/lib/iomgr/endpoint.h
 18   src/core/lib/iomgr/endpoint_cfstream.cc
  4   src/core/lib/iomgr/ev_posix.cc
 12   src/core/lib/iomgr/ev_posix.h
 23   src/core/lib/iomgr/event_engine_shims/endpoint.cc
 15   src/core/lib/iomgr/tcp_posix.cc
 33   src/core/lib/iomgr/tcp_windows.cc
  3   src/core/util/http_client/httpcli.cc
  1   src/objective-c/tests/CFStreamTests/CFStreamClientTests.mm
  8   src/objective-c/tests/CFStreamTests/CFStreamEndpointTests.mm
  4   test/core/bad_client/bad_client.cc
  6   test/core/bad_connection/close_fd_test.cc
  1   test/core/end2end/bad_server_response_test.cc
 44   test/core/end2end/fixtures/http_proxy_fixture.cc
  8   test/core/end2end/fixtures/sockpair_fixture.h
128   test/core/iomgr/endpoint_tests.cc
  1   test/core/iomgr/ios/CFStreamTests/CFStreamClientTests.mm
  8   test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
  2   test/core/iomgr/tcp_client_posix_test.cc
  1   test/core/iomgr/tcp_server_posix_test.cc
  8   test/core/security/secure_endpoint_test.cc
  1   test/core/security/ssl_server_fuzzer.cc
  1   test/core/surface/concurrent_connectivity_test.cc
  1   test/core/test_util/reconnect_server.cc
 40   test/core/transport/chttp2/graceful_shutdown_test.cc
  5   test/core/transport/chttp2/settings_timeout_test.cc
 50   test/core/transport/chttp2/streams_not_seen_test.cc
  4   test/cpp/microbenchmarks/fullstack_fixtures.h
  6   test/cpp/performance/writes_per_rpc_test.cc

@ -4792,6 +4792,7 @@ grpc_cc_library(
"chttp2_context_list_entry",
"chttp2_legacy_frame",
"chttp2_varint",
"config_vars",
"debug_location",
"exec_ctx",
"gpr",

@ -289,7 +289,6 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
}
}

@ -96,12 +96,6 @@ void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
}
} // namespace
Chttp2Connector::~Chttp2Connector() {
if (endpoint_ != nullptr) {
grpc_endpoint_destroy(endpoint_);
}
}
void Chttp2Connector::Connect(const Args& args, Result* result,
grpc_closure* notify) {
{
@ -110,7 +104,6 @@ void Chttp2Connector::Connect(const Args& args, Result* result,
args_ = args;
result_ = result;
notify_ = notify;
CHECK_EQ(endpoint_, nullptr);
event_engine_ = args_.channel_args.GetObject<EventEngine>();
}
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(args.address);
@ -153,11 +146,6 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
if (args->endpoint != nullptr) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, error);
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
@ -172,13 +160,12 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) {
self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
self->endpoint_ = args->endpoint;
self->Ref().release(); // Ref held by OnReceiveSettings()
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer,
&self->on_receive_settings_, nullptr);
grpc_chttp2_transport_start_reading(
self->result_->transport, args->read_buffer,
&self->on_receive_settings_, self->args_.interested_parties, nullptr);
self->timer_handle_ = self->event_engine_->RunAfter(
self->args_.deadline - Timestamp::Now(),
[self = self->RefAsSubclass<Chttp2Connector>()] {
@ -203,8 +190,6 @@ void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) {
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
grpc_endpoint_delete_from_pollset_set(self->endpoint_,
self->args_.interested_parties);
if (!error.ok()) {
// Transport got an error while waiting on SETTINGS frame.
self->result_->Reset();
@ -233,7 +218,6 @@ void Chttp2Connector::OnTimeout() {
if (!notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
result_->Reset();
MaybeNotify(GRPC_ERROR_CREATE(
"connection attempt timed out before receiving SETTINGS frame"));
@ -248,9 +232,6 @@ void Chttp2Connector::MaybeNotify(grpc_error_handle error) {
if (notify_error_.has_value()) {
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear state for a new Connect().
// Clear out the endpoint_, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
@ -409,7 +390,8 @@ grpc_channel* grpc_channel_create_from_fd(const char* target, int fd,
auto channel = grpc_core::ChannelCreate(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
if (channel.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
grpc_core::ExecCtx::Get()->Flush();
return channel->release()->c_ptr();
} else {

@ -37,8 +37,6 @@ namespace grpc_core {
class Chttp2Connector : public SubchannelConnector {
public:
~Chttp2Connector() override;
void Connect(const Args& args, Result* result, grpc_closure* notify) override;
void Shutdown(grpc_error_handle error) override;
@ -63,9 +61,6 @@ class Chttp2Connector : public SubchannelConnector {
Result* result_ = nullptr;
grpc_closure* notify_ = nullptr;
bool shutdown_ = false;
// Holds the endpoint when first created before being handed off to
// the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr;
grpc_closure on_receive_settings_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
timer_handle_ ABSL_GUARDED_BY(mu_);

@ -460,11 +460,6 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
if (error.ok() && args->endpoint != nullptr) {
// We were shut down or stopped serving after handshaking completed
// successfully, so destroy the endpoint here.
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args->endpoint, absl::OkStatus());
grpc_endpoint_destroy(args->endpoint);
grpc_slice_buffer_destroy(args->read_buffer);
gpr_free(args->read_buffer);
@ -517,7 +512,7 @@ void Chttp2ServerListener::ActiveConnection::HandshakingState::OnHandshakeDone(
}
grpc_chttp2_transport_start_reading(
transport.get(), args->read_buffer, &self->on_receive_settings_,
on_close);
nullptr, on_close);
self->timer_handle_ = self->connection_->event_engine_->RunAfter(
self->deadline_ - Timestamp::Now(),
[self = self->Ref()]() mutable {
@ -645,7 +640,6 @@ void Chttp2ServerListener::ActiveConnection::Start(
// owning Chttp2ServerListener and all associated ActiveConnections have
// been orphaned. The generated endpoints need to be shutdown here to
// ensure the tcp connections are closed appropriately.
grpc_endpoint_shutdown(endpoint, absl::OkStatus());
grpc_endpoint_destroy(endpoint);
return;
}
@ -856,36 +850,30 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
MutexLock lock(&self->mu_);
connection_manager = self->connection_manager_;
}
auto endpoint_cleanup = [&](grpc_error_handle error) {
grpc_endpoint_shutdown(tcp, error);
auto endpoint_cleanup = [&]() {
grpc_endpoint_destroy(tcp);
gpr_free(acceptor);
};
if (!self->connection_quota_->AllowIncomingConnection(
self->memory_quota_, grpc_endpoint_get_peer(tcp))) {
grpc_error_handle error = GRPC_ERROR_CREATE(
"Rejected incoming connection because configured connection quota "
"limits have been exceeded.");
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
if (self->config_fetcher_ != nullptr) {
if (connection_manager == nullptr) {
grpc_error_handle error = GRPC_ERROR_CREATE(
"No ConnectionManager configured. Closing connection.");
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
absl::StatusOr<ChannelArgs> args_result =
connection_manager->UpdateChannelArgsForConnection(args, tcp);
if (!args_result.ok()) {
endpoint_cleanup(GRPC_ERROR_CREATE(args_result.status().ToString()));
endpoint_cleanup();
return;
}
grpc_error_handle error;
args = self->args_modifier_(*args_result, &error);
if (!error.ok()) {
endpoint_cleanup(error);
endpoint_cleanup();
return;
}
}
@ -915,7 +903,7 @@ void Chttp2ServerListener::OnAccept(void* arg, grpc_endpoint* tcp,
}
}
if (connection != nullptr) {
endpoint_cleanup(absl::OkStatus());
endpoint_cleanup();
} else {
connection_ref->Start(std::move(listener_ref), tcp, args);
}
@ -1177,16 +1165,17 @@ void grpc_server_add_channel_from_fd(grpc_server* server, int fd,
grpc_fd_create(fd, name.c_str(), true),
grpc_event_engine::experimental::ChannelArgsEndpointConfig(server_args),
name);
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_core::Transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false // is_client
);
grpc_error_handle error =
core_server->SetupTransport(transport, nullptr, server_args, nullptr);
if (error.ok()) {
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
LOG(ERROR) << "Failed to create channel: "
<< grpc_core::StatusToString(error);

@ -74,6 +74,7 @@
#include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/crash.h"
@ -83,6 +84,7 @@
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/port.h"
@ -110,10 +112,6 @@
#include "src/core/util/string.h"
#include "src/core/util/useful.h"
#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/ev_posix.h"
#endif
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
#define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
@ -380,7 +378,7 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
channelz_socket.reset();
}
grpc_endpoint_destroy(ep);
if (ep != nullptr) grpc_endpoint_destroy(ep);
grpc_slice_buffer_destroy(&qbuf);
@ -749,9 +747,21 @@ static void close_transport_locked(grpc_chttp2_transport* t,
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
}
CHECK(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(t->ep, error);
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::MutexLock lock(&t->ep_destroy_mu);
grpc_endpoint_destroy(t->ep);
t->ep = nullptr;
}
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
error);
t->notify_on_receive_settings = nullptr;
@ -2997,12 +3007,22 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
grpc_pollset* pollset) {
grpc_endpoint_add_to_pollset(ep, pollset);
// We don't want the overhead of acquiring the mutex unless we're
// using the "poll" polling engine, which is the only one that
// actually uses pollsets.
if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset(ep, pollset);
}
void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
grpc_pollset_set* pollset_set) {
grpc_endpoint_add_to_pollset_set(ep, pollset_set);
// We don't want the overhead of acquiring the mutex unless we're
// using the "poll" polling engine, which is the only one that
// actually uses pollsets.
if (strcmp(grpc_get_poll_strategy_name(), "poll") != 0) return;
grpc_core::MutexLock lock(&ep_destroy_mu);
if (ep != nullptr) grpc_endpoint_add_to_pollset_set(ep, pollset_set);
}
//
@ -3188,7 +3208,9 @@ grpc_core::Transport* grpc_create_chttp2_transport(
void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close) {
auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
if (read_buffer != nullptr) {
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
@ -3197,9 +3219,15 @@ void grpc_chttp2_transport_start_reading(
auto* tp = t.get();
tp->combiner->Run(
grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
interested_parties_until_recv_settings,
notify_on_close](grpc_error_handle) mutable {
if (!t->closed_with_error.ok()) {
if (notify_on_receive_settings != nullptr) {
if (t->ep != nullptr &&
interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, interested_parties_until_recv_settings);
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
t->closed_with_error);
}
@ -3209,6 +3237,8 @@ void grpc_chttp2_transport_start_reading(
}
return;
}
t->interested_parties_until_recv_settings =
interested_parties_until_recv_settings;
t->notify_on_receive_settings = notify_on_receive_settings;
t->notify_on_close = notify_on_close;
read_action_locked(std::move(t), absl::OkStatus());

@ -59,9 +59,14 @@ grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport);
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
/// If non-null, \a notify_on_receive_settings will be scheduled when
/// HTTP/2 settings are received from the peer.
/// If non-null, the endpoint will be removed from
/// interested_parties_until_recv_settings before
/// notify_on_receive_settings is invoked.
void grpc_chttp2_transport_start_reading(
grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close);
grpc_closure* notify_on_receive_settings,
grpc_pollset_set* interested_parties_until_recv_settings,
grpc_closure* notify_on_close);
namespace grpc_core {
typedef void (*TestOnlyGlobalHttp2TransportInitCallback)();
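
Illustrative sketch (not from the diff) of the two call shapes for the new parameter: a client connector keeps the endpoint in its pollset_set until the SETTINGS frame arrives, while server and test callers pass nullptr. The variable names (transport, read_buffer, on_receive_settings, interested_parties) are placeholders.

// Client-side connector: the transport removes the endpoint from
// interested_parties before running on_receive_settings.
grpc_chttp2_transport_start_reading(transport, read_buffer,
                                    &on_receive_settings,
                                    interested_parties,
                                    /*notify_on_close=*/nullptr);

// Server-side or test usage: nothing to coordinate, so pass nullptr.
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
                                    nullptr);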

@ -109,6 +109,11 @@ grpc_error_handle grpc_chttp2_settings_parser_parse(void* p,
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK);
if (t->notify_on_receive_settings != nullptr) {
if (t->interested_parties_until_recv_settings != nullptr) {
grpc_endpoint_delete_from_pollset_set(
t->ep, t->interested_parties_until_recv_settings);
t->interested_parties_until_recv_settings = nullptr;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
t->notify_on_receive_settings,
absl::OkStatus());

@ -258,6 +258,8 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
void PerformOp(grpc_transport_op* op) override;
grpc_endpoint* ep;
grpc_core::Mutex ep_destroy_mu; // Guards endpoint destruction only.
grpc_core::Slice peer_string;
grpc_core::MemoryOwner memory_owner;
@ -268,6 +270,14 @@ struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
grpc_core::Combiner* combiner;
absl::BitGen bitgen;
// On the client side, when the transport is first created, the
// endpoint will already have been added to this pollset_set, and it
// needs to stay there until the notify_on_receive_settings callback
// is invoked. After that, the polling will be coordinated via the
// bind_pollset_set transport op, sent by the subchannel when it
// starts a connectivity watch.
grpc_pollset_set* interested_parties_until_recv_settings = nullptr;
grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;

@ -113,11 +113,6 @@ bool HandshakeManager::CallNextHandshakerLocked(grpc_error_handle error) {
// a shutdown call while this callback was sitting on the ExecCtx
// with no error.
if (args_.endpoint != nullptr) {
// TODO(roth): It is currently necessary to shutdown endpoints
// before destroying then, even when we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args_.endpoint, error);
grpc_endpoint_destroy(args_.endpoint);
args_.endpoint = nullptr;
}

@ -80,8 +80,7 @@ class HttpConnectHandshaker : public Handshaker {
Mutex mu_;
bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false;
// Endpoint and read buffer to destroy after a shutdown.
grpc_endpoint* endpoint_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
// Read buffer to destroy after a shutdown.
grpc_slice_buffer* read_buffer_to_destroy_ ABSL_GUARDED_BY(mu_) = nullptr;
// State saved while performing the handshake.
@ -97,9 +96,6 @@ class HttpConnectHandshaker : public Handshaker {
};
HttpConnectHandshaker::~HttpConnectHandshaker() {
if (endpoint_to_destroy_ != nullptr) {
grpc_endpoint_destroy(endpoint_to_destroy_);
}
if (read_buffer_to_destroy_ != nullptr) {
grpc_slice_buffer_destroy(read_buffer_to_destroy_);
gpr_free(read_buffer_to_destroy_);
@ -112,8 +108,6 @@ HttpConnectHandshaker::~HttpConnectHandshaker() {
// Set args fields to nullptr, saving the endpoint and read buffer for
// later destruction.
void HttpConnectHandshaker::CleanupArgsForFailureLocked() {
endpoint_to_destroy_ = args_->endpoint;
args_->endpoint = nullptr;
read_buffer_to_destroy_ = args_->read_buffer;
args_->read_buffer = nullptr;
args_->args = ChannelArgs();
@ -129,13 +123,10 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error_handle error) {
error = GRPC_ERROR_CREATE("Handshaker shutdown");
}
if (!is_shutdown_) {
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args_->endpoint, error);
// Not shutting down, so the handshake failed. Clean up before
// invoking the callback.
grpc_endpoint_destroy(args_->endpoint);
args_->endpoint = nullptr;
CleanupArgsForFailureLocked();
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
@ -276,12 +267,13 @@ done:
// Public handshaker methods
//
void HttpConnectHandshaker::Shutdown(grpc_error_handle why) {
void HttpConnectHandshaker::Shutdown(grpc_error_handle /*why*/) {
{
MutexLock lock(&mu_);
if (!is_shutdown_) {
is_shutdown_ = true;
grpc_endpoint_shutdown(args_->endpoint, why);
grpc_endpoint_destroy(args_->endpoint);
args_->endpoint = nullptr;
CleanupArgsForFailureLocked();
}
}

@ -61,6 +61,7 @@
#define STAGING_BUFFER_SIZE 8192
static void on_read(void* user_data, grpc_error_handle error);
static void on_write(void* user_data, grpc_error_handle error);
namespace {
struct secure_endpoint {
@ -76,6 +77,7 @@ struct secure_endpoint {
base.vtable = vtable;
gpr_mu_init(&protector_mu);
GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_write, ::on_write, this, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&source_buffer);
grpc_slice_buffer_init(&leftover_bytes);
for (size_t i = 0; i < leftover_nslices; i++) {
@ -103,7 +105,7 @@ struct secure_endpoint {
}
~secure_endpoint() {
grpc_endpoint_destroy(wrapped_ep);
memory_owner.Reset();
tsi_frame_protector_destroy(protector);
tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
grpc_slice_buffer_destroy(&source_buffer);
@ -126,6 +128,7 @@ struct secure_endpoint {
grpc_closure* read_cb = nullptr;
grpc_closure* write_cb = nullptr;
grpc_closure on_read;
grpc_closure on_write;
grpc_slice_buffer* read_buffer = nullptr;
grpc_slice_buffer source_buffer;
// saved handshaker leftover data to unprotect.
@ -260,12 +263,7 @@ static void on_read(void* user_data, grpc_error_handle error) {
if (!error.ok()) {
grpc_slice_buffer_reset_and_unref(ep->read_buffer);
call_read_cb(
ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
return;
}
if (ep->zero_copy_protector != nullptr) {
} else if (ep->zero_copy_protector != nullptr) {
// Use zero-copy grpc protector to unprotect.
int min_progress_size = 1;
// Get the size of the last frame which is not yet fully decrypted.
@ -333,6 +331,12 @@ static void on_read(void* user_data, grpc_error_handle error) {
}
}
if (!error.ok()) {
call_read_cb(
ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
return;
}
// TODO(yangg) experiment with moving this block after read_cb to see if it
// helps latency
grpc_slice_buffer_reset_and_unref(&ep->source_buffer);
@ -379,6 +383,13 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
maybe_post_reclaimer(ep);
}
static void on_write(void* user_data, grpc_error_handle error) {
secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, std::exchange(ep->write_cb, nullptr),
std::move(error));
SECURE_ENDPOINT_UNREF(ep, "write");
}
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* arg, int max_frame_size) {
unsigned i;
@ -489,18 +500,17 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
return;
}
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg,
// Need to hold a ref here, because the wrapped endpoint may access
// output_buffer at any time until the write completes.
SECURE_ENDPOINT_REF(ep, "write");
ep->write_cb = cb;
grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, &ep->on_write, arg,
max_frame_size);
}
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error_handle why) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
grpc_endpoint_shutdown(ep->wrapped_ep, why);
}
static void endpoint_destroy(grpc_endpoint* secure_ep) {
secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
ep->memory_owner.Reset();
grpc_endpoint_destroy(ep->wrapped_ep);
SECURE_ENDPOINT_UNREF(ep, "destroy");
}
@ -547,7 +557,6 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
endpoint_add_to_pollset,
endpoint_add_to_pollset_set,
endpoint_delete_from_pollset_set,
endpoint_shutdown,
endpoint_destroy,
endpoint_get_peer,
endpoint_get_local_address,

@ -117,8 +117,7 @@ class SecurityHandshaker : public Handshaker {
Mutex mu_;
bool is_shutdown_ = false;
// Endpoint and read buffer to destroy after a shutdown.
grpc_endpoint* endpoint_to_destroy_ = nullptr;
// Read buffer to destroy after a shutdown.
grpc_slice_buffer* read_buffer_to_destroy_ = nullptr;
// State saved while performing the handshake.
@ -155,9 +154,6 @@ SecurityHandshaker::SecurityHandshaker(tsi_handshaker* handshaker,
SecurityHandshaker::~SecurityHandshaker() {
tsi_handshaker_destroy(handshaker_);
tsi_handshaker_result_destroy(handshaker_result_);
if (endpoint_to_destroy_ != nullptr) {
grpc_endpoint_destroy(endpoint_to_destroy_);
}
if (read_buffer_to_destroy_ != nullptr) {
grpc_slice_buffer_destroy(read_buffer_to_destroy_);
gpr_free(read_buffer_to_destroy_);
@ -189,8 +185,6 @@ size_t SecurityHandshaker::MoveReadBufferIntoHandshakeBuffer() {
// Set args_ fields to NULL, saving the endpoint and read buffer for
// later destruction.
void SecurityHandshaker::CleanupArgsForFailureLocked() {
endpoint_to_destroy_ = args_->endpoint;
args_->endpoint = nullptr;
read_buffer_to_destroy_ = args_->read_buffer;
args_->read_buffer = nullptr;
args_->args = ChannelArgs();
@ -206,11 +200,8 @@ void SecurityHandshaker::HandshakeFailedLocked(grpc_error_handle error) {
}
if (!is_shutdown_) {
tsi_handshaker_shutdown(handshaker_);
// TODO(ctiller): It is currently necessary to shutdown endpoints
// before destroying them, even if we know that there are no
// pending read/write callbacks. This should be fixed, at which
// point this can be removed.
grpc_endpoint_shutdown(args_->endpoint, error);
grpc_endpoint_destroy(args_->endpoint);
args_->endpoint = nullptr;
// Not shutting down, so the write failed. Clean up before
// invoking the callback.
CleanupArgsForFailureLocked();
@ -555,7 +546,8 @@ void SecurityHandshaker::Shutdown(grpc_error_handle why) {
is_shutdown_ = true;
connector_->cancel_check_peer(&on_peer_checked_, why);
tsi_handshaker_shutdown(handshaker_);
grpc_endpoint_shutdown(args_->endpoint, why);
grpc_endpoint_destroy(args_->endpoint);
args_->endpoint = nullptr;
CleanupArgsForFailureLocked();
}
}
@ -589,7 +581,6 @@ class FailHandshaker : public Handshaker {
void DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/,
grpc_closure* on_handshake_done,
HandshakerArgs* args) override {
grpc_endpoint_shutdown(args->endpoint, status_);
grpc_endpoint_destroy(args->endpoint);
args->endpoint = nullptr;
args->args = ChannelArgs();

@ -167,7 +167,8 @@ void TCPConnectHandshaker::Connected(void* arg, grpc_error_handle error) {
error = GRPC_ERROR_CREATE("tcp handshaker shutdown");
}
if (self->endpoint_to_destroy_ != nullptr) {
grpc_endpoint_shutdown(self->endpoint_to_destroy_, error);
grpc_endpoint_destroy(self->endpoint_to_destroy_);
self->endpoint_to_destroy_ = nullptr;
}
if (!self->shutdown_) {
self->CleanupArgsForFailureLocked();

@ -46,10 +46,6 @@ void grpc_endpoint_delete_from_pollset_set(grpc_endpoint* ep,
ep->vtable->delete_from_pollset_set(ep, pollset_set);
}
void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
ep->vtable->shutdown(ep, why);
}
void grpc_endpoint_destroy(grpc_endpoint* ep) { ep->vtable->destroy(ep); }
absl::string_view grpc_endpoint_get_peer(grpc_endpoint* ep) {

@ -43,7 +43,6 @@ struct grpc_endpoint_vtable {
void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset);
void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset);
void (*shutdown)(grpc_endpoint* ep, grpc_error_handle why);
void (*destroy)(grpc_endpoint* ep);
absl::string_view (*get_peer)(grpc_endpoint* ep);
absl::string_view (*get_local_address)(grpc_endpoint* ep);
@ -86,7 +85,6 @@ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
// Causes any pending and future read/write callbacks to run immediately with
// success==0
void grpc_endpoint_shutdown(grpc_endpoint* ep, grpc_error_handle why);
void grpc_endpoint_destroy(grpc_endpoint* ep);
// Add an endpoint to a pollset or pollset_set, so that when the pollset is
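
As an aside (not part of the diff): with the shutdown entry gone from grpc_endpoint_vtable, a wrapping endpoint folds all of its cleanup into the destroy callback. The names below are hypothetical; the real in-tree example is the intercept_endpoint change in test/core/security/secure_endpoint_test.cc later in this diff.

// Hypothetical pass-through endpoint under the reduced vtable.
struct wrapping_endpoint {
  grpc_endpoint base;      // holds the vtable pointer; must be first
  grpc_endpoint* wrapped;  // endpoint all operations forward to
};

static void wrap_destroy(grpc_endpoint* ep) {
  auto* self = reinterpret_cast<wrapping_endpoint*>(ep);
  // Destroying the wrapped endpoint also shuts it down: any in-flight
  // reads or writes complete with an error before it is freed.
  grpc_endpoint_destroy(self->wrapped);
  delete self;
}
// (read/write/pollset forwarding and the vtable initializer omitted;
// they mirror the intercept_endpoint vtable, minus the shutdown slot.)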

@ -270,25 +270,16 @@ static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
}
void CFStreamShutdown(grpc_endpoint* ep, grpc_error_handle why) {
void CFStreamDestroy(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%s)", ep_impl,
grpc_core::StatusToString(why).c_str());
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
}
CFReadStreamClose(ep_impl->read_stream);
CFWriteStreamClose(ep_impl->write_stream);
ep_impl->stream_sync->Shutdown(why);
ep_impl->stream_sync->Shutdown(absl::UnavailableError("endpoint shutdown"));
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%s)", ep_impl,
grpc_core::StatusToString(why).c_str());
}
}
void CFStreamDestroy(grpc_endpoint* ep) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy DONE", ep_impl);
}
EP_UNREF(ep_impl, "destroy");
}
@ -318,7 +309,6 @@ static const grpc_endpoint_vtable vtable = {CFStreamRead,
CFStreamAddToPollset,
CFStreamAddToPollsetSet,
CFStreamDeleteFromPollsetSet,
CFStreamShutdown,
CFStreamDestroy,
CFStreamGetPeer,
CFStreamGetLocalAddress,

@ -340,4 +340,8 @@ void grpc_shutdown_background_closure(void) {
g_event_engine->shutdown_background_closure();
}
#else // GRPC_POSIX_SOCKET_EV
const char* grpc_get_poll_strategy_name() { return ""; }
#endif // GRPC_POSIX_SOCKET_EV

@ -19,6 +19,10 @@
#ifndef GRPC_SRC_CORE_LIB_IOMGR_EV_POSIX_H
#define GRPC_SRC_CORE_LIB_IOMGR_EV_POSIX_H
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_POSIX_SOCKET_EV
#include <poll.h>
#include <grpc/support/port_platform.h>
@ -99,9 +103,6 @@ void grpc_register_event_engine_factory(const grpc_event_engine_vtable* vtable,
void grpc_event_engine_init(void);
void grpc_event_engine_shutdown(void);
// Return the name of the poll strategy
const char* grpc_get_poll_strategy_name();
// Returns true if polling engine can track errors separately, false otherwise.
// If this is true, fd can be created with track_err set. After this, error
// events will be reported using fd_notify_on_error. If it is not set, errors
@ -207,4 +208,9 @@ void grpc_shutdown_background_closure();
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
#endif // GRPC_POSIX_SOCKET_EV
// Return the name of the poll strategy
const char* grpc_get_poll_strategy_name();
#endif // GRPC_SRC_CORE_LIB_IOMGR_EV_POSIX_H

@ -346,29 +346,17 @@ void EndpointAddToPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}
void EndpointDeleteFromPollsetSet(grpc_endpoint* /* ep */,
grpc_pollset_set* /* pollset */) {}
/// After shutdown, all endpoint operations except destroy are no-op,
/// and will return some kind of sane default (empty strings, nullptrs, etc).
/// It is the caller's responsibility to ensure that calls to EndpointShutdown
/// are synchronized.
void EndpointShutdown(grpc_endpoint* ep, grpc_error_handle why) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP Endpoint %p shutdown why=%s", eeep->wrapper,
why.ToString().c_str());
}
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Shutdown:%s", eeep->wrapper,
why.ToString().c_str());
eeep->wrapper->TriggerShutdown(nullptr);
}
// Attempts to free the underlying data structures.
/// Attempts to free the underlying data structures.
/// After destruction, no new endpoint operations may be started.
/// It is the caller's responsibility to ensure that calls to EndpointDestroy
/// are synchronized.
void EndpointDestroy(grpc_endpoint* ep) {
auto* eeep =
reinterpret_cast<EventEngineEndpointWrapper::grpc_event_engine_endpoint*>(
ep);
GRPC_EVENT_ENGINE_TRACE("EventEngine::Endpoint %p Destroy", eeep->wrapper);
eeep->wrapper->TriggerShutdown(nullptr);
eeep->wrapper->Unref();
}
@ -406,7 +394,6 @@ grpc_endpoint_vtable grpc_event_engine_endpoint_vtable = {
EndpointAddToPollset,
EndpointAddToPollsetSet,
EndpointDeleteFromPollsetSet,
EndpointShutdown,
EndpointDestroy,
EndpointGetPeerAddress,
EndpointGetLocalAddress,

@ -772,15 +772,6 @@ static grpc_error_handle tcp_annotate_error(grpc_error_handle src_error,
static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error_handle error);
static void tcp_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, why);
tcp->read_mu.Lock();
tcp->memory_owner.Reset();
tcp->read_mu.Unlock();
}
static void tcp_free(grpc_tcp* tcp) {
grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
@ -818,10 +809,11 @@ static void tcp_ref(grpc_tcp* tcp) { tcp->refcount.Ref(); }
#endif
static void tcp_destroy(grpc_endpoint* ep) {
gpr_log(GPR_INFO, "IOMGR endpoint shutdown");
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
ZerocopyDisableAndWaitForRemaining(tcp);
grpc_fd_shutdown(tcp->em_fd, absl::UnavailableError("endpoint shutdown"));
if (grpc_event_engine_can_track_errors()) {
ZerocopyDisableAndWaitForRemaining(tcp);
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_set_error(tcp->em_fd);
}
@ -1965,7 +1957,6 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_add_to_pollset,
tcp_add_to_pollset_set,
tcp_delete_from_pollset_set,
tcp_shutdown,
tcp_destroy,
tcp_get_peer,
tcp_get_local_address,

@ -124,7 +124,6 @@ typedef struct grpc_tcp {
// to protect ourselves when requesting a shutdown.
gpr_mu mu;
int shutting_down;
grpc_error_handle shutdown_error;
std::string peer_string;
std::string local_address;
@ -220,10 +219,8 @@ static void on_read(void* tcpp, grpc_error_handle error) {
}
grpc_slice_buffer_reset_and_unref(tcp->read_slices);
error = grpc_error_set_int(
tcp->shutting_down
? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down",
&tcp->shutdown_error, 1)
: GRPC_ERROR_CREATE("End of TCP stream"),
tcp->shutting_down ? GRPC_ERROR_CREATE("TCP stream shutting down")
: GRPC_ERROR_CREATE("End of TCP stream"),
grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
}
}
@ -255,10 +252,9 @@ static void win_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
if (tcp->shutting_down) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("TCP socket is shutting down",
&tcp->shutdown_error, 1),
grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE));
grpc_error_set_int(GRPC_ERROR_CREATE("TCP socket is shutting down"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
return;
}
@ -371,10 +367,9 @@ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
if (tcp->shutting_down) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, cb,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("TCP socket is shutting down",
&tcp->shutdown_error, 1),
grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE));
grpc_error_set_int(GRPC_ERROR_CREATE("TCP socket is shutting down"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
return;
}
@ -479,21 +474,14 @@ static void win_delete_from_pollset_set(grpc_endpoint* /* ep */,
// we're not going to protect against these. However the IO Completion Port
// callback will happen from another thread, so we need to protect against
// concurrent access of the data structure in that regard.
static void win_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
static void win_destroy(grpc_endpoint* ep) {
grpc_tcp* tcp = (grpc_tcp*)ep;
gpr_mu_lock(&tcp->mu);
// At that point, what may happen is that we're already inside the IOCP
// callback. See the comments in on_read and on_write.
if (!tcp->shutting_down) {
tcp->shutting_down = 1;
tcp->shutdown_error = why;
}
tcp->shutting_down = 1;
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
}
static void win_destroy(grpc_endpoint* ep) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(tcp, "destroy");
}
@ -517,7 +505,6 @@ static grpc_endpoint_vtable vtable = {win_read,
win_add_to_pollset,
win_add_to_pollset_set,
win_delete_from_pollset_set,
win_shutdown,
win_destroy,
win_get_peer,
win_get_local_address,

@ -232,7 +232,8 @@ void HttpRequest::Orphan() {
GRPC_ERROR_CREATE("HTTP request cancelled during handshake"));
}
if (own_endpoint_ && ep_ != nullptr) {
grpc_endpoint_shutdown(ep_, GRPC_ERROR_CREATE("HTTP request cancelled"));
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
}
}
Unref();

@ -50,7 +50,6 @@ static void finish_connection() {
static void must_succeed(void* arg, grpc_error_handle error) {
CHECK(g_connecting != nullptr);
CHECK(error.ok());
grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(g_connecting);
g_connecting = nullptr;
finish_connection();

@ -152,7 +152,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)tearDown {
grpc_core::ExecCtx exec_ctx;
close(svr_fd_);
grpc_endpoint_destroy(ep_);
if (ep_ != nullptr) grpc_endpoint_destroy(ep_);
}
- (void)testReadWrite {
@ -206,7 +206,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
}
XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
grpc_slice_buffer_reset_and_unref(&write_slices);
grpc_slice_buffer_reset_and_unref(&read_one_slice);
@ -254,7 +253,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
grpc_core::ExecCtx::Get()->Flush();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
@ -310,7 +310,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), absl::OkStatus());
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
grpc_slice_buffer_reset_and_unref(&write_slices);
}
@ -338,7 +337,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), absl::OkStatus());
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
}

@ -94,7 +94,6 @@ static void set_read_done(void* arg, grpc_error_handle /*error*/) {
// shutdown client
static void shutdown_client(grpc_endpoint** client_fd) {
if (*client_fd != nullptr) {
grpc_endpoint_shutdown(*client_fd, GRPC_ERROR_CREATE("Forced Disconnect"));
grpc_endpoint_destroy(*client_fd);
grpc_core::ExecCtx::Get()->Flush();
*client_fd = nullptr;
@ -230,7 +229,8 @@ void grpc_run_bad_client_test(
.PreconditionChannelArgs(server_args.ToC().get()),
sfd.server, false);
server_setup_transport(&a, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
// Bind fds to pollsets
grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(client_cq));

@ -125,7 +125,8 @@ static void init_client() {
g_ctx.ep->client, true);
client_setup_transport(transport);
CHECK(g_ctx.client);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
}
static void init_server() {
@ -138,7 +139,8 @@ static void init_server() {
transport = grpc_create_chttp2_transport(grpc_core::ChannelArgs(),
g_ctx.ep->server, false);
server_setup_transport(transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
}
static void test_init() {

@ -354,7 +354,6 @@ static void run_test(bool http2_response, bool send_settings,
// Proof that the server accepted the TCP connection.
CHECK_EQ(state.connection_attempt_made, true);
// clean up
grpc_endpoint_shutdown(state.tcp, GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(state.tcp);
cleanup_rpc();
grpc_core::ExecCtx::Get()->Flush();

@ -161,7 +161,9 @@ static void proxy_connection_ref(proxy_connection* conn,
static void proxy_connection_unref(proxy_connection* conn,
const char* /*reason*/) {
if (gpr_unref(&conn->refcount)) {
grpc_endpoint_destroy(conn->client_endpoint);
if (conn->client_endpoint != nullptr) {
grpc_endpoint_destroy(conn->client_endpoint);
}
if (conn->server_endpoint != nullptr) {
grpc_endpoint_destroy(conn->server_endpoint);
}
@ -226,13 +228,16 @@ static void proxy_connection_failed(proxy_connection* conn,
}
}
// If we decided to shut down either one and have not yet done so, do so.
if (shutdown_client && !conn->client_shutdown) {
grpc_endpoint_shutdown(conn->client_endpoint, error);
if (shutdown_client && !conn->client_shutdown &&
conn->client_endpoint != nullptr) {
grpc_endpoint_destroy(conn->client_endpoint);
conn->client_endpoint = nullptr;
conn->client_shutdown = true;
}
if (shutdown_server && !conn->server_shutdown &&
(conn->server_endpoint != nullptr)) {
grpc_endpoint_shutdown(conn->server_endpoint, error);
conn->server_endpoint != nullptr) {
grpc_endpoint_destroy(conn->server_endpoint);
conn->server_endpoint = nullptr;
conn->server_shutdown = true;
}
// Unref the connection.
@ -250,8 +255,8 @@ static void on_client_write_done_locked(void* arg, grpc_error_handle error) {
return;
}
if (conn->server_read_failed) {
grpc_endpoint_shutdown(conn->client_endpoint,
absl::UnknownError("Client shutdown"));
grpc_endpoint_destroy(conn->client_endpoint);
conn->client_endpoint = nullptr;
// No more writes. Unref the connection.
proxy_connection_unref(conn, "client_write");
return;
@ -260,7 +265,8 @@ static void on_client_write_done_locked(void* arg, grpc_error_handle error) {
grpc_slice_buffer_reset_and_unref(&conn->client_write_buffer);
// If more data was read from the server since we started this write,
// write that data now.
if (conn->client_deferred_write_buffer.length > 0) {
if (conn->client_deferred_write_buffer.length > 0 &&
conn->client_endpoint != nullptr) {
grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer,
&conn->client_write_buffer);
conn->client_is_writing = true;
@ -292,10 +298,9 @@ static void on_server_write_done_locked(void* arg, grpc_error_handle error) {
"HTTP proxy server write", error);
return;
}
if (conn->client_read_failed) {
grpc_endpoint_shutdown(conn->server_endpoint,
absl::UnknownError("Server shutdown"));
grpc_endpoint_destroy(conn->server_endpoint);
conn->server_endpoint = nullptr;
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
return;
@ -304,7 +309,8 @@ static void on_server_write_done_locked(void* arg, grpc_error_handle error) {
grpc_slice_buffer_reset_and_unref(&conn->server_write_buffer);
// If more data was read from the client since we started this write,
// write that data now.
if (conn->server_deferred_write_buffer.length > 0) {
if (conn->server_deferred_write_buffer.length > 0 &&
conn->server_endpoint != nullptr) {
grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer,
&conn->server_write_buffer);
conn->server_is_writing = true;
@ -345,7 +351,7 @@ static void on_client_read_done_locked(void* arg, grpc_error_handle error) {
if (conn->server_is_writing) {
grpc_slice_buffer_move_into(&conn->client_read_buffer,
&conn->server_deferred_write_buffer);
} else {
} else if (conn->server_endpoint != nullptr) {
grpc_slice_buffer_move_into(&conn->client_read_buffer,
&conn->server_write_buffer);
proxy_connection_ref(conn, "client_read");
@ -356,6 +362,10 @@ static void on_client_read_done_locked(void* arg, grpc_error_handle error) {
&conn->on_server_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
}
if (conn->client_endpoint == nullptr) {
proxy_connection_unref(conn, "client_read");
return;
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_schedule_on_exec_ctx);
@ -390,7 +400,7 @@ static void on_server_read_done_locked(void* arg, grpc_error_handle error) {
if (conn->client_is_writing) {
grpc_slice_buffer_move_into(&conn->server_read_buffer,
&conn->client_deferred_write_buffer);
} else {
} else if (conn->client_endpoint != nullptr) {
grpc_slice_buffer_move_into(&conn->server_read_buffer,
&conn->client_write_buffer);
proxy_connection_ref(conn, "server_read");
@ -401,6 +411,10 @@ static void on_server_read_done_locked(void* arg, grpc_error_handle error) {
&conn->on_client_write_done, nullptr,
/*max_frame_size=*/INT_MAX);
}
if (conn->server_endpoint == nullptr) {
proxy_connection_unref(conn, "server_read");
return;
}
// Read more data.
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_schedule_on_exec_ctx);
@ -604,14 +618,12 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
if (proxy_destroyed.load()) {
grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown"));
grpc_endpoint_destroy(endpoint);
return;
}
grpc_end2end_http_proxy* proxy = static_cast<grpc_end2end_http_proxy*>(arg);
proxy_ref(proxy);
if (proxy->is_shutdown.load()) {
grpc_endpoint_shutdown(endpoint, absl::UnknownError("proxy shutdown"));
grpc_endpoint_destroy(endpoint);
proxy_unref(proxy);
return;

@ -56,11 +56,9 @@ class SockpairFixture : public CoreTestFixture {
~SockpairFixture() override {
ExecCtx exec_ctx;
if (ep_.client != nullptr) {
grpc_endpoint_shutdown(ep_.client, absl::InternalError("done"));
grpc_endpoint_destroy(ep_.client);
}
if (ep_.server != nullptr) {
grpc_endpoint_shutdown(ep_.server, absl::InternalError("done"));
grpc_endpoint_destroy(ep_.server);
}
}
@ -90,7 +88,8 @@ class SockpairFixture : public CoreTestFixture {
grpc_error_handle error = core_server->SetupTransport(
transport, nullptr, core_server->channel_args(), nullptr);
if (error.ok()) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
transport->Orphan();
}
@ -115,7 +114,8 @@ class SockpairFixture : public CoreTestFixture {
grpc_channel* client;
if (channel.ok()) {
client = channel->release()->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
} else {
client = grpc_lame_client_channel_create(
nullptr, static_cast<grpc_status_code>(channel.status().code()),

@ -106,6 +106,7 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
}
struct read_and_write_test_state {
grpc_core::Mutex ep_mu; // Guards read_ep and write_ep.
grpc_endpoint* read_ep;
grpc_endpoint* write_ep;
size_t target_bytes;
@ -125,18 +126,20 @@ struct read_and_write_test_state {
grpc_closure write_scheduler;
};
static void read_scheduler(void* data, grpc_error_handle /* error */) {
static void read_scheduler(void* data, grpc_error_handle error) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
/*urgent=*/false, /*min_progress_size=*/1);
}
static void read_and_write_test_read_handler_read_done(
read_and_write_test_state* state, int read_done_state) {
if (error.ok() && state->bytes_read < state->target_bytes) {
grpc_core::MutexLock lock(&state->ep_mu);
if (state->read_ep != nullptr) {
grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
/*urgent=*/false, /*min_progress_size=*/1);
return;
}
}
VLOG(2) << "Read handler done";
gpr_mu_lock(g_mu);
state->read_done = read_done_state;
state->read_done = 1 + error.ok();
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
gpr_mu_unlock(g_mu);
}
@ -145,37 +148,41 @@ static void read_and_write_test_read_handler(void* data,
grpc_error_handle error) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
if (!error.ok()) {
read_and_write_test_read_handler_read_done(state, 1);
return;
}
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes) {
read_and_write_test_read_handler_read_done(state, 2);
return;
if (error.ok()) {
state->bytes_read +=
count_slices(state->incoming.slices, state->incoming.count,
&state->current_read_data);
}
// We perform many reads one after another. If grpc_endpoint_read and the
// read_handler are both run inline, we might end up growing the stack
// beyond the limit. Schedule the read on ExecCtx to avoid this.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->read_scheduler,
absl::OkStatus());
std::move(error));
}
static void write_scheduler(void* data, grpc_error_handle /* error */) {
static void write_scheduler(void* data, grpc_error_handle error) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
nullptr, /*max_frame_size=*/state->max_write_frame_size);
if (error.ok() && state->current_write_size != 0) {
grpc_core::MutexLock lock(&state->ep_mu);
if (state->write_ep != nullptr) {
grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
nullptr,
/*max_frame_size=*/state->max_write_frame_size);
return;
}
}
VLOG(2) << "Write handler done";
gpr_mu_lock(g_mu);
state->write_done = 1 + error.ok();
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
gpr_mu_unlock(g_mu);
}
static void read_and_write_test_write_handler(void* data,
grpc_error_handle error) {
struct read_and_write_test_state* state =
static_cast<struct read_and_write_test_state*>(data);
grpc_slice* slices = nullptr;
size_t nslices;
if (error.ok()) {
state->bytes_written += state->current_write_size;
if (state->target_bytes - state->bytes_written <
@ -183,25 +190,20 @@ static void read_and_write_test_write_handler(void* data,
state->current_write_size = state->target_bytes - state->bytes_written;
}
if (state->current_write_size != 0) {
slices = allocate_blocks(state->current_write_size, 8192, &nslices,
&state->current_write_data);
size_t nslices;
grpc_slice* slices =
allocate_blocks(state->current_write_size, 8192, &nslices,
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
// We perform many writes one after another. If grpc_endpoint_write and
// the write_handler are both run inline, we might end up growing the
// stack beyond the limit. Schedule the write on ExecCtx to avoid this.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
absl::OkStatus());
gpr_free(slices);
return;
}
}
VLOG(2) << "Write handler done";
gpr_mu_lock(g_mu);
state->write_done = 1 + (error.ok());
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr));
gpr_mu_unlock(g_mu);
// We perform many writes one after another. If grpc_endpoint_write and
// the write_handler are both run inline, we might end up growing the
// stack beyond the limit. Schedule the write on ExecCtx to avoid this.
grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->write_scheduler,
std::move(error));
}
// Do both reading and writing using the grpc_endpoint API.
@ -261,10 +263,13 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
/*urgent=*/false, /*min_progress_size=*/1);
if (shutdown) {
grpc_core::MutexLock lock(&state.ep_mu);
VLOG(2) << "shutdown read";
grpc_endpoint_shutdown(state.read_ep, GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(state.read_ep);
state.read_ep = nullptr;
VLOG(2) << "shutdown write";
grpc_endpoint_shutdown(state.write_ep, GRPC_ERROR_CREATE("Test Shutdown"));
grpc_endpoint_destroy(state.write_ep);
state.write_ep = nullptr;
}
grpc_core::ExecCtx::Get()->Flush();
@ -281,8 +286,10 @@ static void read_and_write_test(grpc_endpoint_test_config config,
end_test(config);
grpc_slice_buffer_destroy(&state.outgoing);
grpc_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
if (!shutdown) {
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
}
}
static void inc_on_failure(void* arg, grpc_error_handle error) {
@ -310,48 +317,11 @@ static void wait_for_fail_count(int* fail_count, int want_fail_count) {
gpr_mu_unlock(g_mu);
}
static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_endpoint_test_fixture f =
begin_test(config, "multiple_shutdown_test", 128);
int fail_count = 0;
grpc_slice_buffer slice_buffer;
grpc_slice_buffer_init(&slice_buffer);
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
/*urgent=*/false, /*min_progress_size=*/1);
wait_for_fail_count(&fail_count, 0);
grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&fail_count, 1);
grpc_endpoint_read(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
/*urgent=*/false, /*min_progress_size=*/1);
wait_for_fail_count(&fail_count, 2);
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
grpc_schedule_on_exec_ctx),
nullptr, /*max_frame_size=*/INT_MAX);
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("Test Shutdown"));
wait_for_fail_count(&fail_count, 3);
grpc_slice_buffer_destroy(&slice_buffer);
grpc_endpoint_destroy(f.client_ep);
grpc_endpoint_destroy(f.server_ep);
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset* pollset, gpr_mu* mu) {
size_t i;
g_pollset = pollset;
g_mu = mu;
multiple_shutdown_test(config);
for (int i = 1; i <= 10000; i = i * 10) {
read_and_write_test(config, 10000000, 100000, 8192, i, false);
read_and_write_test(config, 1000000, 100000, 1, i, false);

@ -49,7 +49,6 @@ static void finish_connection() {
static void must_succeed(void* arg, grpc_error_handle error) {
CHECK(g_connecting != nullptr);
CHECK(error.ok());
grpc_endpoint_shutdown(g_connecting, GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(g_connecting);
g_connecting = nullptr;
finish_connection();

@ -151,7 +151,7 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
- (void)tearDown {
grpc_core::ExecCtx exec_ctx;
close(svr_fd_);
grpc_endpoint_destroy(ep_);
if (ep_ != nullptr) grpc_endpoint_destroy(ep_);
}
- (void)testReadWrite {
@ -205,7 +205,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
}
XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
grpc_slice_buffer_reset_and_unref(&write_slices);
grpc_slice_buffer_reset_and_unref(&read_one_slice);
@ -253,7 +252,8 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
std::future<grpc_error_handle> read_future = read_promise.get_future();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], NO);
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_endpoint_destroy(ep_);
ep_ = nullptr;
grpc_core::ExecCtx::Get()->Flush();
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
@ -309,7 +309,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), absl::OkStatus());
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
grpc_slice_buffer_reset_and_unref(&write_slices);
}
@ -337,7 +336,6 @@ static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const ch
XCTAssertEqual([self waitForEvent:&read_future timeout:kReadTimeout], YES);
XCTAssertNotEqual(read_future.get(), absl::OkStatus());
grpc_endpoint_shutdown(ep_, absl::OkStatus());
grpc_slice_buffer_reset_and_unref(&read_slices);
}

@ -75,8 +75,6 @@ static void finish_connection() {
static void must_succeed(void* /*arg*/, grpc_error_handle error) {
ASSERT_NE(g_connecting, nullptr);
ASSERT_TRUE(error.ok());
grpc_endpoint_shutdown(g_connecting,
GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(g_connecting);
g_connecting = nullptr;
finish_connection();

@ -158,7 +158,6 @@ static void test_addr_init_str(test_addr* addr) {
static void on_connect(void* /*arg*/, grpc_endpoint* tcp,
grpc_pollset* /*pollset*/,
grpc_tcp_server_acceptor* acceptor) {
grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(tcp);
on_connect_result temp_result;

@ -82,11 +82,6 @@ static void me_add_to_pollset_set(grpc_endpoint* /*ep*/,
static void me_delete_from_pollset_set(grpc_endpoint* /*ep*/,
grpc_pollset_set* /*pollset*/) {}
static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
intercept_endpoint* m = reinterpret_cast<intercept_endpoint*>(ep);
grpc_endpoint_shutdown(m->wrapped_ep, why);
}
static void me_destroy(grpc_endpoint* ep) {
intercept_endpoint* m = reinterpret_cast<intercept_endpoint*>(ep);
grpc_endpoint_destroy(m->wrapped_ep);
@ -111,7 +106,6 @@ static const grpc_endpoint_vtable vtable = {me_read,
me_add_to_pollset,
me_add_to_pollset_set,
me_delete_from_pollset_set,
me_shutdown,
me_destroy,
me_get_peer,
me_get_local_address,
@ -294,8 +288,6 @@ static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
ASSERT_EQ(incoming.count, 1);
ASSERT_TRUE(grpc_slice_eq(s, incoming.slices[0]));
grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_shutdown(f.server_ep, GRPC_ERROR_CREATE("test_leftover end"));
grpc_endpoint_destroy(f.client_ep);
grpc_endpoint_destroy(f.server_ep);

@ -110,6 +110,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
handshake_mgr->Shutdown(
absl::DeadlineExceededError("handshake did not fail as expected"));
}
sc.reset(DEBUG_LOCATION, "test");
grpc_server_credentials_release(creds);
grpc_core::ExecCtx::Get()->Flush();

@ -127,7 +127,6 @@ static void on_connect(void* vargs, grpc_endpoint* tcp,
grpc_tcp_server_acceptor* acceptor) {
gpr_free(acceptor);
struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(tcp);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick",

@ -66,7 +66,6 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list* new_tail;
peer = grpc_endpoint_get_peer(tcp);
grpc_endpoint_shutdown(tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(tcp);
last_colon = peer.rfind(':');
if (server->peer == nullptr) {

@ -100,7 +100,8 @@ class GracefulShutdownTest : public ::testing::Test {
CHECK(core_server->SetupTransport(transport, nullptr,
core_server->channel_args(),
nullptr) == absl::OkStatus());
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
// Start polling on the client
Notification client_poller_thread_started_notification;
client_poll_thread_ = std::make_unique<std::thread>(
@ -131,6 +132,8 @@ class GracefulShutdownTest : public ::testing::Test {
// Start reading on the client
grpc_slice_buffer_init(&read_buffer_);
GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
GRPC_CLOSURE_INIT(&on_read_done_scheduler_, OnReadDoneScheduler, this,
nullptr);
grpc_endpoint_read(fds_.client, &read_buffer_, &on_read_done_, false,
/*min_progress_size=*/1);
}
@ -139,13 +142,15 @@ class GracefulShutdownTest : public ::testing::Test {
void ShutdownAndDestroy() {
shutdown_ = true;
ExecCtx exec_ctx;
grpc_endpoint_shutdown(fds_.client, GRPC_ERROR_CREATE("Client shutdown"));
{
MutexLock lock(&ep_destroy_mu_);
grpc_endpoint_destroy(fds_.client);
fds_.client = nullptr;
}
ExecCtx::Get()->Flush();
client_poll_thread_->join();
CHECK(read_end_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5)));
grpc_endpoint_destroy(fds_.client);
ExecCtx::Get()->Flush();
// Shutdown and destroy server
grpc_server_shutdown_and_notify(server_, cq_, Tag(1000));
cqv_->Expect(Tag(1000), true);
@ -166,13 +171,24 @@ class GracefulShutdownTest : public ::testing::Test {
}
self->read_cv_.SignalAll();
}
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->fds_.client, &self->read_buffer_,
&self->on_read_done_, false, /*min_progress_size=*/1);
} else {
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();
MutexLock lock(&self->ep_destroy_mu_);
if (self->fds_.client != nullptr) {
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->fds_.client, &self->read_buffer_,
&self->on_read_done_scheduler_, false,
/*min_progress_size=*/1);
return;
}
}
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();
}
// Do async hop for OnReadDone() in case grpc_endpoint_read() invokes
// us synchronously while we're holding the lock.
static void OnReadDoneScheduler(void* arg, grpc_error_handle error) {
GracefulShutdownTest* self = static_cast<GracefulShutdownTest*>(arg);
ExecCtx::Run(DEBUG_LOCATION, &self->on_read_done_, std::move(error));
}
// Waits for \a bytes to show up in read_bytes_
@ -274,6 +290,9 @@ class GracefulShutdownTest : public ::testing::Test {
on_write_done_notification_->Notify();
}
// Held when destroying fds_.client so we know not to start another read.
Mutex ep_destroy_mu_;
grpc_endpoint_pair fds_;
grpc_server* server_ = nullptr;
grpc_completion_queue* cq_ = nullptr;
@ -281,6 +300,7 @@ class GracefulShutdownTest : public ::testing::Test {
std::unique_ptr<std::thread> client_poll_thread_;
std::atomic<bool> shutdown_{false};
grpc_closure on_read_done_;
grpc_closure on_read_done_scheduler_;
Mutex mu_;
CondVar read_cv_;
Notification read_end_notification_;
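The ep_destroy_mu_/scheduler dance above is the substantive part of this test change: since the endpoint can now only be torn down by destroying it, the read loop must check a guarded pointer before re-arming, and the read callback is bounced through the ExecCtx so that a synchronous invocation by grpc_endpoint_read() cannot deadlock on that same mutex. A minimal sketch of the pattern (Reader, OnDone, OnDoneScheduler are illustrative names, not from this change):

// Sketch only; assumes the same iomgr/sync/exec_ctx headers as the test.
class Reader {
 public:
  explicit Reader(grpc_endpoint* ep) : ep_(ep) {
    grpc_slice_buffer_init(&buf_);
    GRPC_CLOSURE_INIT(&on_done_, OnDone, this, nullptr);
    GRPC_CLOSURE_INIT(&on_done_scheduler_, OnDoneScheduler, this, nullptr);
  }

  void Start() {
    grpc_core::MutexLock lock(&mu_);
    grpc_endpoint_read(ep_, &buf_, &on_done_scheduler_, /*urgent=*/false,
                       /*min_progress_size=*/1);
  }

  void Close() {
    grpc_core::MutexLock lock(&mu_);
    grpc_endpoint_destroy(ep_);  // destruction now implies shutdown
    ep_ = nullptr;               // tells OnDone() not to re-arm the read
  }

 private:
  // Runs on the ExecCtx, never under mu_, so taking the lock here is safe.
  static void OnDone(void* arg, grpc_error_handle error) {
    Reader* self = static_cast<Reader*>(arg);
    grpc_core::MutexLock lock(&self->mu_);
    if (error.ok() && self->ep_ != nullptr) {
      grpc_slice_buffer_reset_and_unref(&self->buf_);
      grpc_endpoint_read(self->ep_, &self->buf_, &self->on_done_scheduler_,
                         /*urgent=*/false, /*min_progress_size=*/1);
    }
  }

  // Re-posts OnDone() so a synchronous callback from grpc_endpoint_read()
  // cannot re-enter mu_ while Start()/OnDone() still hold it.
  static void OnDoneScheduler(void* arg, grpc_error_handle error) {
    Reader* self = static_cast<Reader*>(arg);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &self->on_done_, std::move(error));
  }

  grpc_core::Mutex mu_;
  grpc_endpoint* ep_ ABSL_GUARDED_BY(mu_);
  grpc_slice_buffer buf_;
  grpc_closure on_done_;
  grpc_closure on_done_scheduler_;
};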

@ -177,14 +177,15 @@ class Client {
LOG(INFO) << "client read " << read_buffer.length << " bytes";
grpc_slice_buffer_reset_and_unref(&read_buffer);
}
grpc_endpoint_shutdown(endpoint_, GRPC_ERROR_CREATE("shutdown"));
grpc_endpoint_destroy(endpoint_);
endpoint_ = nullptr;
grpc_slice_buffer_destroy(&read_buffer);
return retval;
}
void Shutdown() {
ExecCtx exec_ctx;
grpc_endpoint_destroy(endpoint_);
if (endpoint_ != nullptr) grpc_endpoint_destroy(endpoint_);
grpc_pollset_shutdown(pollset_,
GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
grpc_schedule_on_exec_ctx));

@ -231,6 +231,8 @@ class StreamsNotSeenTest : public ::testing::Test {
TrailingMetadataRecordingFilter::reset_state();
grpc_slice_buffer_init(&read_buffer_);
GRPC_CLOSURE_INIT(&on_read_done_, OnReadDone, this, nullptr);
GRPC_CLOSURE_INIT(&on_read_done_scheduler_, OnReadDoneScheduler, this,
nullptr);
// Start the test tcp server
port_ = grpc_pick_unused_port_or_die();
test_tcp_server_init(&server_, OnConnect, this);
@ -282,11 +284,10 @@ class StreamsNotSeenTest : public ::testing::Test {
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cq_);
grpc_channel_destroy(channel_);
grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Test Shutdown"));
if (tcp_ != nullptr) grpc_endpoint_destroy(tcp_);
ExecCtx::Get()->Flush();
CHECK(read_end_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5)));
grpc_endpoint_destroy(tcp_);
shutdown_ = true;
server_poll_thread_->join();
test_tcp_server_destroy(&server_);
@ -357,8 +358,13 @@ class StreamsNotSeenTest : public ::testing::Test {
Notification on_write_done_notification_;
GRPC_CLOSURE_INIT(&on_write_done_, OnWriteDone,
&on_write_done_notification_, nullptr);
grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
/*max_frame_size=*/INT_MAX);
{
MutexLock lock(&tcp_destroy_mu_);
if (tcp_ != nullptr) {
grpc_endpoint_write(tcp_, buffer, &on_write_done_, nullptr,
/*max_frame_size=*/INT_MAX);
}
}
ExecCtx::Get()->Flush();
CHECK(on_write_done_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5)));
@ -381,13 +387,30 @@ class StreamsNotSeenTest : public ::testing::Test {
}
self->read_cv_.SignalAll();
}
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->tcp_, &self->read_buffer_, &self->on_read_done_,
false, /*min_progress_size=*/1);
} else {
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();
MutexLock lock(&self->tcp_destroy_mu_);
if (self->tcp_ != nullptr) {
grpc_slice_buffer_reset_and_unref(&self->read_buffer_);
grpc_endpoint_read(self->tcp_, &self->read_buffer_,
&self->on_read_done_scheduler_, false,
/*min_progress_size=*/1);
return;
}
}
grpc_slice_buffer_destroy(&self->read_buffer_);
self->read_end_notification_.Notify();
}
// Async hop for OnReadDone(), in case grpc_endpoint_read() invokes
// the callback synchronously while holding the lock.
static void OnReadDoneScheduler(void* arg, grpc_error_handle error) {
StreamsNotSeenTest* self = static_cast<StreamsNotSeenTest*>(arg);
ExecCtx::Run(DEBUG_LOCATION, &self->on_read_done_, std::move(error));
}
void CloseServerConnection() {
MutexLock lock(&tcp_destroy_mu_);
grpc_endpoint_destroy(tcp_);
tcp_ = nullptr;
}
// Waits for \a bytes to show up in read_bytes_
@ -416,11 +439,14 @@ class StreamsNotSeenTest : public ::testing::Test {
int port_;
test_tcp_server server_;
std::unique_ptr<std::thread> server_poll_thread_;
// Guards destroying tcp_, so that we know not to start the next read/write.
Mutex tcp_destroy_mu_;
grpc_endpoint* tcp_ = nullptr;
Notification connect_notification_;
grpc_slice_buffer read_buffer_;
grpc_closure on_write_done_;
grpc_closure on_read_done_;
grpc_closure on_read_done_scheduler_;
Notification read_end_notification_;
std::string read_bytes_ ABSL_GUARDED_BY(mu_);
grpc_channel* channel_ = nullptr;
@ -543,7 +569,7 @@ TEST_F(StreamsNotSeenTest, TransportDestroyed) {
cqv_->Expect(Tag(101), true);
cqv_->Verify();
// Shutdown the server endpoint
grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
CloseServerConnection();
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
@ -768,7 +794,7 @@ TEST_F(ZeroConcurrencyTest, TransportDestroyed) {
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), Tag(101),
nullptr);
grpc_endpoint_shutdown(tcp_, GRPC_ERROR_CREATE("Server shutdown"));
CloseServerConnection();
CHECK_EQ(error, GRPC_CALL_OK);
cqv_->Expect(Tag(101), true);
cqv_->Verify();

@ -189,7 +189,7 @@ class EndpointPairFixture : public BaseFixture {
server_transport_, nullptr,
server_args, nullptr)));
grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr,
nullptr);
nullptr, nullptr);
}
// create channel
@ -215,7 +215,7 @@ class EndpointPairFixture : public BaseFixture {
->release()
->c_ptr();
grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr,
nullptr);
nullptr, nullptr);
channel_ = grpc::CreateChannelInternal(
"", channel,

@ -130,7 +130,8 @@ class InProcessCHTTP2 {
"SetupTransport",
core_server->SetupTransport(transport, nullptr,
core_server->channel_args(), nullptr)));
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
}
// create channel
{
@ -151,7 +152,8 @@ class InProcessCHTTP2 {
transport)
->release()
->c_ptr();
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr,
nullptr);
channel_ = grpc::CreateChannelInternal(
"", channel,
std::vector<std::unique_ptr<
