[EventEngine] Update Endpoint API to return true if read/write succeeds immediately (#32370)

Label: Requires cherrypick
Branch: pull/32465/head
Author: Vignesh Babu, committed via GitHub 2 years ago
Parent: 184d4826d4
Commit: 8fdc82f33d
13 changed files (changed line counts in parentheses):
  1. BUILD (1)
  2. include/grpc/event_engine/event_engine.h (29)
  3. src/core/lib/event_engine/posix_engine/posix_endpoint.cc (61)
  4. src/core/lib/event_engine/posix_engine/posix_endpoint.h (16)
  5. src/core/lib/event_engine/posix_engine/posix_engine.cc (3)
  6. src/core/lib/event_engine/windows/windows_endpoint.cc (20)
  7. src/core/lib/event_engine/windows/windows_endpoint.h (4)
  8. src/core/lib/iomgr/event_engine_shims/endpoint.cc (179)
  9. test/core/event_engine/event_engine_test_utils.cc (22)
  10. test/core/event_engine/test_suite/posix/oracle_event_engine_posix.cc (6)
  11. test/core/event_engine/test_suite/posix/oracle_event_engine_posix.h (4)
  12. test/core/event_engine/test_suite/posix_event_engine_test.cc (1)
  13. test/core/event_engine/windows/windows_endpoint_test.cc (14)

@@ -1468,6 +1468,7 @@ grpc_cc_library(
"//src/core:channel_stack_type",
"//src/core:chunked_vector",
"//src/core:closure",
"//src/core:construct_destruct",
"//src/core:context",
"//src/core:default_event_engine",
"//src/core:dual_ref_counted",

@@ -181,10 +181,13 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// Reads data from the Endpoint.
///
/// When data is available on the connection, that data is moved into the
/// \a buffer, and the \a on_read callback is called. The caller must ensure
/// that the callback has access to the buffer when executed later.
/// Ownership of the buffer is not transferred. Valid slices *may* be placed
/// into the buffer even if the callback is invoked with a non-OK Status.
/// \a buffer. If the read succeeds immediately, it returns true and the \a
/// on_read callback is not executed. Otherwise it returns false and the \a
/// on_read callback executes asynchronously when the read completes. The
/// caller must ensure that the callback has access to the buffer when it
/// executes. Ownership of the buffer is not transferred. Valid slices *may*
/// be placed into the buffer even if the callback is invoked with a non-OK
/// Status.
///
/// There can be at most one outstanding read per Endpoint at any given
/// time. An outstanding read is one in which the \a on_read callback has
@@ -195,7 +198,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// For failed read operations, implementations should pass the appropriate
/// statuses to \a on_read. For example, callbacks might expect to receive
/// CANCELLED on endpoint shutdown.
virtual void Read(absl::AnyInvocable<void(absl::Status)> on_read,
virtual bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) = 0;
/// A struct representing optional arguments that may be provided to an
/// EventEngine Endpoint Write API call.
@@ -213,12 +216,14 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
};
/// Writes data out on the connection.
///
/// \a on_writable is called when the connection is ready for more data. The
/// Slices within the \a data buffer may be mutated at will by the Endpoint
/// until \a on_writable is called. The \a data SliceBuffer will remain
/// valid after calling \a Write, but its state is otherwise undefined. All
/// bytes in \a data must have been written before calling \a on_writable
/// unless an error has occurred.
/// If the write succeeds immediately, it returns true and the
/// \a on_writable callback is not executed. Otherwise it returns false and
/// the \a on_writable callback is called asynchronously when the connection
/// is ready for more data. The Slices within the \a data buffer may be
/// mutated at will by the Endpoint until \a on_writable is called. The \a
/// data SliceBuffer will remain valid after calling \a Write, but its state
/// is otherwise undefined. All bytes in \a data must have been written
/// before calling \a on_writable unless an error has occurred.
///
/// There can be at most one outstanding write per Endpoint at any given
/// time. An outstanding write is one in which the \a on_writable callback
@@ -229,7 +234,7 @@ class EventEngine : public std::enable_shared_from_this<EventEngine> {
/// For failed write operations, implementations should pass the appropriate
/// statuses to \a on_writable. For example, callbacks might expect to
/// receive CANCELLED on endpoint shutdown.
virtual void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
virtual bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) = 0;
/// Returns an address in the format described in DNSResolver. The returned
/// values are expected to remain valid for the life of the Endpoint.
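
To illustrate the new contract, here is a minimal caller-side sketch (not part of this patch; `HandleReadDone` is a hypothetical completion handler). If `Read()` returns true the on_read callback never runs, so the caller must complete the operation itself, mirroring how the iomgr shim and the tests below consume the return value:

#include <grpc/event_engine/event_engine.h>

#include "absl/status/status.h"

using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::SliceBuffer;

// Hypothetical completion handler standing in for application logic.
void HandleReadDone(absl::Status status) { /* consume the read result */ }

void ReadOnce(EventEngine::Endpoint* endpoint, SliceBuffer* buffer) {
  const bool finished_inline = endpoint->Read(
      [](absl::Status status) {
        // Asynchronous path: runs only when Read() returned false.
        HandleReadDone(status);
      },
      buffer, /*args=*/nullptr);
  if (finished_inline) {
    // Read() returned true: data is already in *buffer and the on_read
    // callback above will never be invoked, so finish the operation here.
    HandleReadDone(absl::OkStatus());
  }
}

Write() follows the same pattern with its on_writable callback.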

@@ -575,12 +575,11 @@ void PosixEndpointImpl::HandleRead(absl::Status status) {
Unref();
}
void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
bool PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer,
const EventEngine::Endpoint::ReadArgs* args) {
grpc_core::ReleasableMutexLock lock(&read_mu_);
GPR_ASSERT(read_cb_ == nullptr);
read_cb_ = std::move(on_read);
incoming_buffer_ = buffer;
incoming_buffer_->Clear();
incoming_buffer_->Swap(last_read_buffer_);
@@ -591,6 +590,7 @@ void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
}
Ref().release();
if (is_first_read_) {
read_cb_ = std::move(on_read);
UpdateRcvLowat();
// Endpoint read called for the very first time. Register read callback
// with the polling engine.
@@ -598,16 +598,40 @@ void PosixEndpointImpl::Read(absl::AnyInvocable<void(absl::Status)> on_read,
lock.Release();
handle_->NotifyOnRead(on_read_);
} else if (inq_ == 0) {
read_cb_ = std::move(on_read);
UpdateRcvLowat();
lock.Release();
// Upper layer asked to read more but we know there is no pending data to
// read from previous reads. So, wait for POLLIN.
handle_->NotifyOnRead(on_read_);
} else {
lock.Release();
on_read_->SetStatus(absl::OkStatus());
engine_->Run(on_read_);
absl::Status status;
MaybeMakeReadSlices();
if (!TcpDoRead(status)) {
UpdateRcvLowat();
read_cb_ = std::move(on_read);
// We've consumed the edge, request a new one.
lock.Release();
handle_->NotifyOnRead(on_read_);
return false;
}
if (!status.ok()) {
// Read failed immediately. Schedule the on_read callback to run
// asynchronously.
lock.Release();
engine_->Run([on_read = std::move(on_read), status]() mutable {
on_read(status);
});
Unref();
return false;
}
// Read succeeded immediately. Return true and don't run the on_read
// callback.
incoming_buffer_ = nullptr;
Unref();
return true;
}
return false;
}
#ifdef GRPC_LINUX_ERRQUEUE
@@ -1103,7 +1127,7 @@ void PosixEndpointImpl::HandleWrite(absl::Status status) {
}
}
void PosixEndpointImpl::Write(
bool PosixEndpointImpl::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const EventEngine::Endpoint::WriteArgs* args) {
absl::Status status = absl::OkStatus();
@@ -1114,11 +1138,15 @@ void PosixEndpointImpl::Write(
GPR_DEBUG_ASSERT(data != nullptr);
if (data->Length() == 0) {
on_writable(handle_->IsHandleShutdown()
? TcpAnnotateError(absl::InternalError("EOF"))
: status);
TcpShutdownTracedBufferList();
return;
if (handle_->IsHandleShutdown()) {
status = TcpAnnotateError(absl::InternalError("EOF"));
engine_->Run([on_writable = std::move(on_writable), status]() mutable {
on_writable(status);
});
return false;
}
return true;
}
zerocopy_send_record = TcpGetSendZerocopyRecord(*data);
@@ -1142,14 +1170,19 @@ void PosixEndpointImpl::Write(
write_cb_ = std::move(on_writable);
current_zerocopy_send_ = zerocopy_send_record;
handle_->NotifyOnWrite(on_write_);
} else {
// TODO(vigneshbabu): Consider eventually running this callback inline to
// avoid a thread hop. At the time of submission, it causes deadlocks which
// should be resolved after ExecCtx removal.
return false;
}
if (!status.ok()) {
// Write failed immediately. Schedule the on_writable callback to run
// asynchronously.
engine_->Run([on_writable = std::move(on_writable), status]() mutable {
on_writable(status);
});
return false;
}
// Write succeeded immediately. Return true and don't run the on_writable
// callback.
return true;
}
void PosixEndpointImpl::MaybeShutdown(

@@ -472,12 +472,12 @@ class PosixEndpointImpl : public grpc_core::RefCounted<PosixEndpointImpl> {
grpc_event_engine::experimental::MemoryAllocator&& allocator,
const PosixTcpOptions& options);
~PosixEndpointImpl() override;
void Read(
bool Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args);
void Write(
bool Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
@@ -608,20 +608,20 @@ class PosixEndpoint : public PosixEndpointWithFdSupport {
: impl_(new PosixEndpointImpl(handle, on_shutdown, std::move(engine),
std::move(allocator), options)) {}
void Read(
bool Read(
absl::AnyInvocable<void(absl::Status)> on_read,
grpc_event_engine::experimental::SliceBuffer* buffer,
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
args) override {
impl_->Read(std::move(on_read), buffer, args);
return impl_->Read(std::move(on_read), buffer, args);
}
void Write(
bool Write(
absl::AnyInvocable<void(absl::Status)> on_writable,
grpc_event_engine::experimental::SliceBuffer* data,
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
args) override {
impl_->Write(std::move(on_writable), data, args);
return impl_->Write(std::move(on_writable), data, args);
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
@@ -661,14 +661,14 @@ class PosixEndpoint : public PosixEndpointWithFdSupport {
public:
PosixEndpoint() = default;
void Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
bool Read(absl::AnyInvocable<void(absl::Status)> /*on_read*/,
grpc_event_engine::experimental::SliceBuffer* /*buffer*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
ReadArgs* /*args*/) override {
grpc_core::Crash("PosixEndpoint::Read not supported on this platform");
}
void Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
bool Write(absl::AnyInvocable<void(absl::Status)> /*on_writable*/,
grpc_event_engine::experimental::SliceBuffer* /*data*/,
const grpc_event_engine::experimental::EventEngine::Endpoint::
WriteArgs* /*args*/) override {

@@ -299,7 +299,8 @@ PosixEnginePollerManager::PosixEnginePollerManager(
PosixEnginePollerManager::PosixEnginePollerManager(PosixEventPoller* poller)
: poller_(poller),
poller_state_(PollerState::kExternal),
executor_(nullptr) {
executor_(nullptr),
trigger_shutdown_called_(false) {
GPR_DEBUG_ASSERT(poller_ != nullptr);
}

@@ -65,14 +65,14 @@ WindowsEndpoint::~WindowsEndpoint() {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p destroyed", this);
}
void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
bool WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* /* args */) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p reading", this);
if (io_state_->socket->IsShutdown()) {
executor_->Run([on_read = std::move(on_read)]() mutable {
on_read(absl::UnavailableError("Socket is shutting down."));
});
return;
return false;
}
// Prepare the WSABUF struct
WSABUF wsa_buffers[kMaxWSABUFCount];
@@ -116,7 +116,7 @@ void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
}
executor_->Run(
[result, on_read = std::move(on_read)]() mutable { on_read(result); });
return;
return false;
}
// Otherwise, let's retry, by queuing a read.
memset(io_state_->socket->read_info()->overlapped(), 0, sizeof(OVERLAPPED));
@@ -131,20 +131,21 @@ void WindowsEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
wsa_error,
absl::StrFormat("WindowsEndpoint::%p Read failed", this).c_str()));
});
return;
return false;
}
io_state_->handle_read_event.Prime(io_state_, buffer, std::move(on_read));
io_state_->socket->NotifyOnRead(&io_state_->handle_read_event);
return false;
}
void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
bool WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* /* args */) {
GRPC_EVENT_ENGINE_ENDPOINT_TRACE("WindowsEndpoint::%p writing", this);
if (io_state_->socket->IsShutdown()) {
executor_->Run([on_writable = std::move(on_writable)]() mutable {
on_writable(absl::UnavailableError("Socket is shutting down."));
});
return;
return false;
}
if (grpc_event_engine_endpoint_data_trace.enabled()) {
for (size_t i = 0; i < data->Count(); i++) {
@@ -171,7 +172,7 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
// Write completed, exiting early
executor_->Run(
[cb = std::move(on_writable)]() mutable { cb(absl::OkStatus()); });
return;
return false;
}
// The data was not completely delivered, we should send the rest of it by
// doing an async write operation.
@@ -193,7 +194,7 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
executor_->Run([cb = std::move(on_writable), wsa_error]() mutable {
cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
});
return;
return false;
}
}
auto write_info = io_state_->socket->write_info();
@@ -209,13 +210,14 @@ void WindowsEndpoint::Write(absl::AnyInvocable<void(absl::Status)> on_writable,
executor_->Run([cb = std::move(on_writable), wsa_error]() mutable {
cb(GRPC_WSA_ERROR(wsa_error, "WSASend"));
});
return;
return false;
}
}
// Now that everything is set up, we can ask for the IOCP notification. It
// may trigger the callback immediately, but that is fine.
io_state_->handle_write_event.Prime(io_state_, data, std::move(on_writable));
io_state_->socket->NotifyOnWrite(&io_state_->handle_write_event);
return false;
}
const EventEngine::ResolvedAddress& WindowsEndpoint::GetPeerAddress() const {
return peer_address_;

@@ -31,9 +31,9 @@ class WindowsEndpoint : public EventEngine::Endpoint {
MemoryAllocator&& allocator, const EndpointConfig& config,
Executor* Executor);
~WindowsEndpoint() override;
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
const EventEngine::ResolvedAddress& GetPeerAddress() const override;
const EventEngine::ResolvedAddress& GetLocalAddress() const override;

@@ -33,6 +33,8 @@
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/construct_destruct.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
@@ -94,15 +96,101 @@ class EventEngineEndpointWrapper {
grpc_endpoint* GetGrpcEndpoint() { return &eeep_->base; }
// Read using the underlying EventEngine endpoint object.
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
bool Read(grpc_closure* read_cb, grpc_slice_buffer* pending_read_buffer,
const EventEngine::Endpoint::ReadArgs* args) {
endpoint_->Read(std::move(on_read), buffer, args);
Ref();
pending_read_cb_ = read_cb;
pending_read_buffer_ = pending_read_buffer;
// TODO(vigneshbabu): Use SliceBufferCast<> here.
grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer),
SliceBuffer::TakeCSliceBuffer(*pending_read_buffer_));
SliceBuffer* read_buffer =
reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
read_buffer->Clear();
return endpoint_->Read(
[this](absl::Status status) { FinishPendingRead(status); }, read_buffer,
args);
}
void FinishPendingRead(absl::Status status) {
auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->read_buffer);
grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(),
pending_read_buffer_);
read_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep_->wrapper,
std::string(eeep_->wrapper->PeerAddress()).c_str(),
status.ToString().c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < pending_read_buffer_->count; i++) {
char* dump = grpc_dump_slice(pending_read_buffer_->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
gpr_free(dump);
}
}
}
pending_read_buffer_ = nullptr;
grpc_closure* cb = pending_read_cb_;
pending_read_cb_ = nullptr;
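// Run the read closure under an ExecCtx: if this thread has none (e.g. the
// callback fired on an EventEngine thread), create a temporary one that
// flushes on scope exit; otherwise run the closure on the current ExecCtx.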
if (grpc_core::ExecCtx::Get() == nullptr) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
} else {
grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EventEngineEndpointWrapper::Read().
Unref();
}
// Write using the underlying EventEngine endpoint object
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* args) {
endpoint_->Write(std::move(on_writable), data, args);
bool Write(grpc_closure* write_cb, grpc_slice_buffer* slices,
const EventEngine::Endpoint::WriteArgs* args) {
Ref();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", this,
std::string(PeerAddress()).c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < slices->count; i++) {
char* dump =
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
gpr_free(dump);
}
}
}
// TODO(vigneshbabu): Use SliceBufferCast<> here.
grpc_core::Construct(reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer),
SliceBuffer::TakeCSliceBuffer(*slices));
SliceBuffer* write_buffer =
reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
pending_write_cb_ = write_cb;
return endpoint_->Write(
[this](absl::Status status) { FinishPendingWrite(status); },
write_buffer, args);
}
void FinishPendingWrite(absl::Status status) {
auto* write_buffer = reinterpret_cast<SliceBuffer*>(&eeep_->write_buffer);
write_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", this,
std::string(PeerAddress()).c_str(), status.ToString().c_str());
}
grpc_closure* cb = pending_write_cb_;
pending_write_cb_ = nullptr;
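// As in FinishPendingRead, run the write closure under an ExecCtx,
// creating a temporary one if the calling thread has none.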
if (grpc_core::ExecCtx::Get() == nullptr) {
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
} else {
grpc_core::Closure::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EventEngineEndpointWrapper::Write().
Unref();
}
// Returns true if the endpoint is not yet shutdown. In that case, it also
@@ -186,6 +274,9 @@ class EventEngineEndpointWrapper {
std::atomic<int64_t> shutdown_ref_{1};
absl::AnyInvocable<void(absl::StatusOr<int>)> on_release_fd_;
grpc_core::Mutex mu_;
grpc_closure* pending_read_cb_;
grpc_closure* pending_write_cb_;
grpc_slice_buffer* pending_read_buffer_;
std::string peer_address_;
std::string local_address_;
int fd_{-1};
@@ -204,41 +295,11 @@ void EndpointRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
return;
}
eeep->wrapper->Ref();
EventEngine::Endpoint::ReadArgs read_args = {min_progress_size};
// TODO(vigneshbabu): Use SliceBufferCast<> here.
SliceBuffer* read_buffer = new (&eeep->read_buffer)
SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices));
read_buffer->Clear();
eeep->wrapper->Read(
[eeep, cb, slices](absl::Status status) {
auto* read_buffer = reinterpret_cast<SliceBuffer*>(&eeep->read_buffer);
grpc_slice_buffer_move_into(read_buffer->c_slice_buffer(), slices);
read_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p READ (peer=%s) error=%s", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str(),
status.ToString().c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < slices->count; i++) {
char* dump = grpc_dump_slice(slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ DATA: %s", dump);
gpr_free(dump);
}
}
}
{
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EndpointRead
eeep->wrapper->Unref();
},
read_buffer, &read_args);
if (eeep->wrapper->Read(cb, slices, &read_args)) {
// Read succeeded immediately. Run the callback inline.
eeep->wrapper->FinishPendingRead(absl::OkStatus());
}
eeep->wrapper->ShutdownUnref();
}
@@ -256,45 +317,11 @@ void EndpointWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
return;
}
eeep->wrapper->Ref();
EventEngine::Endpoint::WriteArgs write_args = {arg, max_frame_size};
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
size_t i;
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s)", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str());
if (gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
for (i = 0; i < slices->count; i++) {
char* dump =
grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE DATA: %s", dump);
gpr_free(dump);
}
}
if (eeep->wrapper->Write(cb, slices, &write_args)) {
// Write succeeded immediately. Run the callback inline.
eeep->wrapper->FinishPendingWrite(absl::OkStatus());
}
// TODO(vigneshbabu): Use SliceBufferCast<> here.
SliceBuffer* write_buffer = new (&eeep->write_buffer)
SliceBuffer(SliceBuffer::TakeCSliceBuffer(*slices));
eeep->wrapper->Write(
[eeep, cb](absl::Status status) {
auto* write_buffer =
reinterpret_cast<SliceBuffer*>(&eeep->write_buffer);
write_buffer->~SliceBuffer();
if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace)) {
gpr_log(GPR_INFO, "TCP: %p WRITE (peer=%s) error=%s", eeep->wrapper,
std::string(eeep->wrapper->PeerAddress()).c_str(),
status.ToString().c_str());
}
{
grpc_core::ApplicationCallbackExecCtx app_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, status);
}
// For the ref taken in EndpointWrite
eeep->wrapper->Unref();
},
write_buffer, &write_args);
eeep->wrapper->ShutdownUnref();
}

@@ -129,17 +129,23 @@ absl::Status SendValidatePayload(absl::string_view data,
args.read_hint_bytes -= read_slice_buf.Length();
read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
read_store_buf);
receive_endpoint->Read(read_cb, &read_slice_buf, &args);
if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
read_cb(absl::OkStatus());
}
};
// Start asynchronous reading at the receive_endpoint.
receive_endpoint->Read(read_cb, &read_slice_buf, &args);
if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
read_cb(absl::OkStatus());
}
// Start asynchronous writing at the send_endpoint.
send_endpoint->Write(
[&write_signal](absl::Status status) {
GPR_ASSERT(status.ok());
write_signal.Notify();
},
&write_slice_buf, nullptr);
if (send_endpoint->Write(
[&write_signal](absl::Status status) {
GPR_ASSERT(status.ok());
write_signal.Notify();
},
&write_slice_buf, nullptr)) {
write_signal.Notify();
}
write_signal.WaitForNotification();
read_signal.WaitForNotification();
// Check if data written == data read

@@ -231,7 +231,7 @@ PosixOracleEndpoint::~PosixOracleEndpoint() {
close(socket_fd_);
}
void PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
bool PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
SliceBuffer* buffer, const ReadArgs* args) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(buffer != nullptr);
@@ -241,15 +241,17 @@ void PosixOracleEndpoint::Read(absl::AnyInvocable<void(absl::Status)> on_read,
read_ops_channel_ =
ReadOperation(read_hint_bytes, buffer, std::move(on_read));
read_op_signal_->Notify();
return false;
}
void PosixOracleEndpoint::Write(
bool PosixOracleEndpoint::Write(
absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
const WriteArgs* /*args*/) {
grpc_core::MutexLock lock(&mu_);
GPR_ASSERT(data != nullptr);
write_ops_channel_ = WriteOperation(data, std::move(on_writable));
write_op_signal_->Notify();
return false;
}
void PosixOracleEndpoint::ProcessReadOperations() {

@@ -44,9 +44,9 @@ class PosixOracleEndpoint : public EventEngine::Endpoint {
explicit PosixOracleEndpoint(int socket_fd);
static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd);
~PosixOracleEndpoint() override;
void Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
const ReadArgs* args) override;
void Write(absl::AnyInvocable<void(absl::Status)> on_writable,
bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
SliceBuffer* data, const WriteArgs* args) override;
void Shutdown();
EventEngine::ResolvedAddress& GetPeerAddress() const override {

@@ -44,6 +44,7 @@ int main(int argc, char** argv) {
grpc_event_engine::experimental::InitServerTests();
// TODO(vigneshbabu): remove when the experiment is over
grpc_core::ForceEnableExperiment("event_engine_client", true);
grpc_core::ForceEnableExperiment("event_engine_listener", true);
// TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
// until we clear out the iomgr shutdown code.
grpc_init();

@@ -61,19 +61,20 @@ TEST_F(WindowsEndpointTest, BasicCommunication) {
std::string message = "0xDEADBEEF";
grpc_core::Notification read_done;
SliceBuffer read_buffer;
server.Read(
EXPECT_FALSE(server.Read(
[&read_done, &message, &read_buffer](absl::Status) {
ASSERT_EQ(read_buffer.Count(), 1u);
auto slice = read_buffer.TakeFirst();
EXPECT_EQ(slice.as_string_view(), message);
read_done.Notify();
},
&read_buffer, nullptr);
&read_buffer, nullptr));
grpc_core::Notification write_done;
SliceBuffer write_buffer;
write_buffer.Append(Slice::FromCopiedString(message));
client.Write([&write_done](absl::Status) { write_done.Notify(); },
&write_buffer, nullptr);
EXPECT_FALSE(
client.Write([&write_done](absl::Status) { write_done.Notify(); },
&write_buffer, nullptr));
iocp.Work(5s, []() {});
// Cleanup
write_done.WaitForNotification();
@@ -121,10 +122,11 @@ TEST_F(WindowsEndpointTest, Conversation) {
void WriteAndQueueReader(WindowsEndpoint* writer, WindowsEndpoint* reader) {
write_buffer.Clear();
write_buffer.Append(Slice::FromCopiedString(messages[exchange]));
writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr);
EXPECT_FALSE(
writer->Write([](absl::Status) {}, &write_buffer, /*args=*/nullptr));
auto cb = [this](absl::Status status) { ReadCB(status); };
read_buffer.Clear();
reader->Read(cb, &read_buffer, /*args=*/nullptr);
EXPECT_FALSE(reader->Read(cb, &read_buffer, /*args=*/nullptr));
}
// Asserts that the received string matches, then queues the next Write/Read
