|
|
|
@ -22,6 +22,7 @@ |
|
|
|
|
#include <grpc/support/port_platform.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/gprpp/crash.h" |
|
|
|
|
#include "src/core/lib/gprpp/sync.h" |
|
|
|
|
#include "src/core/lib/gprpp/thd.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_event_engine { |
|
|
|
@ -39,20 +40,45 @@ ThreadyEventEngine::CreateListener( |
|
|
|
|
absl::AnyInvocable<void(absl::Status)> on_shutdown, |
|
|
|
|
const EndpointConfig& config, |
|
|
|
|
std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) { |
|
|
|
|
struct AcceptState { |
|
|
|
|
grpc_core::Mutex mu_; |
|
|
|
|
grpc_core::CondVar cv_; |
|
|
|
|
int pending_accepts_ ABSL_GUARDED_BY(mu_) = 0; |
|
|
|
|
}; |
|
|
|
|
auto accept_state = std::make_shared<AcceptState>(); |
|
|
|
|
return impl_->CreateListener( |
|
|
|
|
[this, on_accept = std::make_shared<Listener::AcceptCallback>( |
|
|
|
|
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint, |
|
|
|
|
MemoryAllocator memory_allocator) { |
|
|
|
|
[this, accept_state, |
|
|
|
|
on_accept = std::make_shared<Listener::AcceptCallback>( |
|
|
|
|
std::move(on_accept))](std::unique_ptr<Endpoint> endpoint, |
|
|
|
|
MemoryAllocator memory_allocator) { |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&accept_state->mu_); |
|
|
|
|
++accept_state->pending_accepts_; |
|
|
|
|
} |
|
|
|
|
Asynchronously( |
|
|
|
|
[on_accept, endpoint = std::move(endpoint), |
|
|
|
|
[on_accept, accept_state, endpoint = std::move(endpoint), |
|
|
|
|
memory_allocator = std::move(memory_allocator)]() mutable { |
|
|
|
|
(*on_accept)(std::move(endpoint), std::move(memory_allocator)); |
|
|
|
|
{ |
|
|
|
|
grpc_core::MutexLock lock(&accept_state->mu_); |
|
|
|
|
--accept_state->pending_accepts_; |
|
|
|
|
if (accept_state->pending_accepts_ == 0) { |
|
|
|
|
accept_state->cv_.Signal(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
}, |
|
|
|
|
[this, |
|
|
|
|
[this, accept_state, |
|
|
|
|
on_shutdown = std::move(on_shutdown)](absl::Status status) mutable { |
|
|
|
|
Asynchronously([on_shutdown = std::move(on_shutdown), |
|
|
|
|
Asynchronously([accept_state, on_shutdown = std::move(on_shutdown), |
|
|
|
|
status = std::move(status)]() mutable { |
|
|
|
|
while (true) { |
|
|
|
|
grpc_core::MutexLock lock(&accept_state->mu_); |
|
|
|
|
if (accept_state->pending_accepts_ == 0) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
accept_state->cv_.Wait(&accept_state->mu_); |
|
|
|
|
} |
|
|
|
|
on_shutdown(std::move(status)); |
|
|
|
|
}); |
|
|
|
|
}, |
|
|
|
|