diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 81385a1a271..f3087e23db8 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -57,6 +57,8 @@ class HttpConnectHandshaker : public Handshaker { void HandshakeFailedLocked(grpc_error* error); static void OnWriteDone(void* arg, grpc_error* error); static void OnReadDone(void* arg, grpc_error* error); + static void OnWriteDoneScheduler(void* arg, grpc_error* error); + static void OnReadDoneScheduler(void* arg, grpc_error* error); Mutex mu_; @@ -127,6 +129,18 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error* error) { ExecCtx::Run(DEBUG_LOCATION, on_handshake_done_, error); } +// This callback can be invoked inline while already holding onto the mutex. To +// avoid deadlocks, schedule OnWriteDone on ExecCtx. +void HttpConnectHandshaker::OnWriteDoneScheduler(void* arg, grpc_error* error) { + auto* handshaker = static_cast(arg); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_INIT(&handshaker->request_done_closure_, + &HttpConnectHandshaker::OnWriteDone, handshaker, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_REF(error)); +} + // Callback invoked when finished writing HTTP CONNECT request. void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) { auto* handshaker = static_cast(arg); @@ -140,13 +154,27 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) { } else { // Otherwise, read the response. // The read callback inherits our ref to the handshaker. - lock.Unlock(); - grpc_endpoint_read(handshaker->args_->endpoint, - handshaker->args_->read_buffer, - &handshaker->response_read_closure_, /*urgent=*/true); + grpc_endpoint_read( + handshaker->args_->endpoint, handshaker->args_->read_buffer, + GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, + &HttpConnectHandshaker::OnReadDoneScheduler, + handshaker, grpc_schedule_on_exec_ctx), + /*urgent=*/true); } } +// This callback can be invoked inline while already holding onto the mutex. To +// avoid deadlocks, schedule OnReadDone on ExecCtx. +void HttpConnectHandshaker::OnReadDoneScheduler(void* arg, grpc_error* error) { + auto* handshaker = static_cast(arg); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, + &HttpConnectHandshaker::OnReadDone, handshaker, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_REF(error)); +} + // Callback invoked for reading HTTP CONNECT response. void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) { auto* handshaker = static_cast(arg); @@ -202,10 +230,12 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) { // at the Content-Length: header). if (handshaker->http_parser_.state != GRPC_HTTP_BODY) { grpc_slice_buffer_reset_and_unref_internal(handshaker->args_->read_buffer); - lock.Unlock(); - grpc_endpoint_read(handshaker->args_->endpoint, - handshaker->args_->read_buffer, - &handshaker->response_read_closure_, /*urgent=*/true); + grpc_endpoint_read( + handshaker->args_->endpoint, handshaker->args_->read_buffer, + GRPC_CLOSURE_INIT(&handshaker->response_read_closure_, + &HttpConnectHandshaker::OnReadDoneScheduler, + handshaker, grpc_schedule_on_exec_ctx), + /*urgent=*/true); return; } // Make sure we got a 2xx response. @@ -290,7 +320,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, } } // Save state in the handshaker object. - ReleasableMutexLock lock(&mu_); + MutexLock lock(&mu_); args_ = args; on_handshake_done_ = on_handshake_done; // Log connection via proxy. @@ -320,17 +350,16 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* /*acceptor*/, gpr_free(header_strings); // Take a new ref to be held by the write callback. Ref().release(); - lock.Unlock(); - grpc_endpoint_write(args->endpoint, &write_buffer_, &request_done_closure_, - nullptr); + grpc_endpoint_write( + args->endpoint, &write_buffer_, + GRPC_CLOSURE_INIT(&request_done_closure_, + &HttpConnectHandshaker::OnWriteDoneScheduler, this, + grpc_schedule_on_exec_ctx), + nullptr); } HttpConnectHandshaker::HttpConnectHandshaker() { grpc_slice_buffer_init(&write_buffer_); - GRPC_CLOSURE_INIT(&request_done_closure_, &HttpConnectHandshaker::OnWriteDone, - this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&response_read_closure_, &HttpConnectHandshaker::OnReadDone, - this, grpc_schedule_on_exec_ctx); grpc_http_parser_init(&http_parser_, GRPC_HTTP_RESPONSE, &http_response_); } diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index bbe86376caa..1ffdd7a3bf9 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -66,10 +66,12 @@ class SecurityHandshaker : public Handshaker { void HandshakeFailedLocked(grpc_error* error); void CleanupArgsForFailureLocked(); - static void ScheduleRead(void* arg, grpc_error* /* error */); - static void ScheduleWrite(void* arg, grpc_error* /* error */); static void OnHandshakeDataReceivedFromPeerFn(void* arg, grpc_error* error); static void OnHandshakeDataSentToPeerFn(void* arg, grpc_error* error); + static void OnHandshakeDataReceivedFromPeerFnScheduler(void* arg, + grpc_error* error); + static void OnHandshakeDataSentToPeerFnScheduler(void* arg, + grpc_error* error); static void OnHandshakeNextDoneGrpcWrapper( tsi_result result, void* user_data, const unsigned char* bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result* handshaker_result); @@ -96,8 +98,6 @@ class SecurityHandshaker : public Handshaker { size_t handshake_buffer_size_; unsigned char* handshake_buffer_; grpc_slice_buffer outgoing_; - grpc_closure schedule_read_closure_; - grpc_closure schedule_write_closure_; grpc_closure on_handshake_data_sent_to_peer_; grpc_closure on_handshake_data_received_from_peer_; grpc_closure on_peer_checked_; @@ -122,17 +122,6 @@ SecurityHandshaker::SecurityHandshaker(tsi_handshaker* handshaker, } gpr_mu_init(&mu_); grpc_slice_buffer_init(&outgoing_); - GRPC_CLOSURE_INIT(&schedule_read_closure_, &SecurityHandshaker::ScheduleRead, - this, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&schedule_write_closure_, - &SecurityHandshaker::ScheduleWrite, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_handshake_data_sent_to_peer_, - &SecurityHandshaker::OnHandshakeDataSentToPeerFn, this, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&on_handshake_data_received_from_peer_, - &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFn, - this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_peer_checked_, &SecurityHandshaker::OnPeerCheckedFn, this, grpc_schedule_on_exec_ctx); } @@ -293,19 +282,6 @@ grpc_error* SecurityHandshaker::CheckPeerLocked() { return GRPC_ERROR_NONE; } -void SecurityHandshaker::ScheduleRead(void* arg, grpc_error* /* error */) { - SecurityHandshaker* h = static_cast(arg); - grpc_endpoint_read(h->args_->endpoint, h->args_->read_buffer, - &h->on_handshake_data_received_from_peer_, - /*urgent=*/true); -} - -void SecurityHandshaker::ScheduleWrite(void* arg, grpc_error* /* error */) { - SecurityHandshaker* h = static_cast(arg); - grpc_endpoint_write(h->args_->endpoint, &h->outgoing_, - &h->on_handshake_data_sent_to_peer_, nullptr); -} - grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( tsi_result result, const unsigned char* bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result* handshaker_result) { @@ -317,7 +293,13 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( // Read more if we need to. if (result == TSI_INCOMPLETE_DATA) { GPR_ASSERT(bytes_to_send_size == 0); - ExecCtx::Run(DEBUG_LOCATION, &schedule_read_closure_, GRPC_ERROR_NONE); + grpc_endpoint_read( + args_->endpoint, args_->read_buffer, + GRPC_CLOSURE_INIT( + &on_handshake_data_received_from_peer_, + &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, + this, grpc_schedule_on_exec_ctx), + /*urgent=*/true); return error; } if (result != TSI_OK) { @@ -335,10 +317,22 @@ grpc_error* SecurityHandshaker::OnHandshakeNextDoneLocked( reinterpret_cast(bytes_to_send), bytes_to_send_size); grpc_slice_buffer_reset_and_unref_internal(&outgoing_); grpc_slice_buffer_add(&outgoing_, to_send); - ExecCtx::Run(DEBUG_LOCATION, &schedule_write_closure_, GRPC_ERROR_NONE); + grpc_endpoint_write( + args_->endpoint, &outgoing_, + GRPC_CLOSURE_INIT( + &on_handshake_data_sent_to_peer_, + &SecurityHandshaker::OnHandshakeDataSentToPeerFnScheduler, this, + grpc_schedule_on_exec_ctx), + nullptr); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. - ExecCtx::Run(DEBUG_LOCATION, &schedule_read_closure_, GRPC_ERROR_NONE); + grpc_endpoint_read( + args_->endpoint, args_->read_buffer, + GRPC_CLOSURE_INIT( + &on_handshake_data_received_from_peer_, + &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, + this, grpc_schedule_on_exec_ctx), + /*urgent=*/true); } else { // Handshake has finished, check peer and so on. error = CheckPeerLocked(); @@ -381,6 +375,19 @@ grpc_error* SecurityHandshaker::DoHandshakerNextLocked( hs_result); } +// This callback might be run inline while we are still holding on to the mutex, +// so schedule OnHandshakeDataReceivedFromPeerFn on ExecCtx to avoid a deadlock. +void SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler( + void* arg, grpc_error* error) { + SecurityHandshaker* h = static_cast(arg); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_INIT(&h->on_handshake_data_received_from_peer_, + &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFn, + h, grpc_schedule_on_exec_ctx), + GRPC_ERROR_REF(error)); +} + void SecurityHandshaker::OnHandshakeDataReceivedFromPeerFn(void* arg, grpc_error* error) { RefCountedPtr h(static_cast(arg)); @@ -402,6 +409,19 @@ void SecurityHandshaker::OnHandshakeDataReceivedFromPeerFn(void* arg, } } +// This callback might be run inline while we are still holding on to the mutex, +// so schedule OnHandshakeDataSentToPeerFn on ExecCtx to avoid a deadlock. +void SecurityHandshaker::OnHandshakeDataSentToPeerFnScheduler( + void* arg, grpc_error* error) { + SecurityHandshaker* h = static_cast(arg); + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, + GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer_, + &SecurityHandshaker::OnHandshakeDataSentToPeerFn, h, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_REF(error)); +} + void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg, grpc_error* error) { RefCountedPtr h(static_cast(arg)); @@ -413,7 +433,13 @@ void SecurityHandshaker::OnHandshakeDataSentToPeerFn(void* arg, } // We may be done. if (h->handshaker_result_ == nullptr) { - ExecCtx::Run(DEBUG_LOCATION, &h->schedule_read_closure_, GRPC_ERROR_NONE); + grpc_endpoint_read( + h->args_->endpoint, h->args_->read_buffer, + GRPC_CLOSURE_INIT( + &h->on_handshake_data_received_from_peer_, + &SecurityHandshaker::OnHandshakeDataReceivedFromPeerFnScheduler, + h.get(), grpc_schedule_on_exec_ctx), + /*urgent=*/true); } else { error = h->CheckPeerLocked(); if (error != GRPC_ERROR_NONE) {