Convert some GRPC_CLOSURE_SCHED to GRPC_CLOSURE_RUN

pull/20748/head
Yash Tibrewal 5 years ago
parent f20b64ea38
commit db577b3360
  1. 60
      src/core/ext/filters/client_channel/http_connect_handshaker.cc
  2. 18
      src/core/lib/iomgr/tcp_posix.cc
  3. 10
      src/core/lib/surface/call.cc

@ -58,7 +58,7 @@ class HttpConnectHandshaker : public Handshaker {
static void OnWriteDone(void* arg, grpc_error* error);
static void OnReadDone(void* arg, grpc_error* error);
gpr_mu mu_;
Mutex mu_;
bool is_shutdown_ = false;
// Endpoint and read buffer to destroy after a shutdown.
@ -78,7 +78,6 @@ class HttpConnectHandshaker : public Handshaker {
};
HttpConnectHandshaker::~HttpConnectHandshaker() {
gpr_mu_destroy(&mu_);
if (endpoint_to_destroy_ != nullptr) {
grpc_endpoint_destroy(endpoint_to_destroy_);
}
@ -131,20 +130,21 @@ void HttpConnectHandshaker::HandshakeFailedLocked(grpc_error* error) {
// Callback invoked when finished writing HTTP CONNECT request.
void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
gpr_mu_lock(&handshaker->mu_);
ReleasableMutexLock lock(&handshaker->mu_);
if (error != GRPC_ERROR_NONE || handshaker->is_shutdown_) {
// If the write failed or we're shutting down, clean up and invoke the
// callback with the error.
handshaker->HandshakeFailedLocked(GRPC_ERROR_REF(error));
gpr_mu_unlock(&handshaker->mu_);
lock.Unlock();
handshaker->Unref();
} else {
// Otherwise, read the response.
// The read callback inherits our ref to the handshaker.
grpc_endpoint_read(handshaker->args_->endpoint,
handshaker->args_->read_buffer,
&handshaker->response_read_closure_, /*urgent=*/true);
gpr_mu_unlock(&handshaker->mu_);
grpc_endpoint* ep = handshaker->args_->endpoint;
grpc_slice_buffer* read_buffer = handshaker->args_->read_buffer;
grpc_closure* closure = &handshaker->response_read_closure_;
lock.Unlock();
grpc_endpoint_read(ep, read_buffer, closure, /*urgent=*/true);
}
}
@ -152,7 +152,7 @@ void HttpConnectHandshaker::OnWriteDone(void* arg, grpc_error* error) {
void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
auto* handshaker = static_cast<HttpConnectHandshaker*>(arg);
gpr_mu_lock(&handshaker->mu_);
ReleasableMutexLock lock(&handshaker->mu_);
if (error != GRPC_ERROR_NONE || handshaker->is_shutdown_) {
// If the read failed or we're shutting down, clean up and invoke the
// callback with the error.
@ -204,10 +204,11 @@ void HttpConnectHandshaker::OnReadDone(void* arg, grpc_error* error) {
// at the Content-Length: header).
if (handshaker->http_parser_.state != GRPC_HTTP_BODY) {
grpc_slice_buffer_reset_and_unref_internal(handshaker->args_->read_buffer);
grpc_endpoint_read(handshaker->args_->endpoint,
handshaker->args_->read_buffer,
&handshaker->response_read_closure_, /*urgent=*/true);
gpr_mu_unlock(&handshaker->mu_);
grpc_endpoint* ep = handshaker->args_->endpoint;
grpc_slice_buffer* read_buffer = handshaker->args_->read_buffer;
grpc_closure* closure = &handshaker->response_read_closure_;
lock.Unlock();
grpc_endpoint_read(ep, read_buffer, closure, /*urgent=*/true);
return;
}
// Make sure we got a 2xx response.
@ -227,7 +228,7 @@ done:
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
handshaker->is_shutdown_ = true;
gpr_mu_unlock(&handshaker->mu_);
lock.Unlock();
handshaker->Unref();
}
@ -236,13 +237,14 @@ done:
//
void HttpConnectHandshaker::Shutdown(grpc_error* why) {
gpr_mu_lock(&mu_);
if (!is_shutdown_) {
is_shutdown_ = true;
grpc_endpoint_shutdown(args_->endpoint, GRPC_ERROR_REF(why));
CleanupArgsForFailureLocked();
{
MutexLock lock(&mu_);
if (!is_shutdown_) {
is_shutdown_ = true;
grpc_endpoint_shutdown(args_->endpoint, GRPC_ERROR_REF(why));
CleanupArgsForFailureLocked();
}
}
gpr_mu_unlock(&mu_);
GRPC_ERROR_UNREF(why);
}
@ -257,9 +259,10 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* acceptor,
if (server_name == nullptr) {
// Set shutdown to true so that subsequent calls to
// http_connect_handshaker_shutdown() do nothing.
gpr_mu_lock(&mu_);
is_shutdown_ = true;
gpr_mu_unlock(&mu_);
{
MutexLock lock(&mu_);
is_shutdown_ = true;
}
GRPC_CLOSURE_SCHED(on_handshake_done, GRPC_ERROR_NONE);
return;
}
@ -290,7 +293,7 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* acceptor,
}
}
// Save state in the handshaker object.
MutexLock lock(&mu_);
ReleasableMutexLock lock(&mu_);
args_ = args;
on_handshake_done_ = on_handshake_done;
// Log connection via proxy.
@ -320,12 +323,15 @@ void HttpConnectHandshaker::DoHandshake(grpc_tcp_server_acceptor* acceptor,
gpr_free(header_strings);
// Take a new ref to be held by the write callback.
Ref().release();
grpc_endpoint_write(args->endpoint, &write_buffer_, &request_done_closure_,
nullptr);
grpc_endpoint* ep = args->endpoint;
grpc_slice_buffer* write_buffer = &write_buffer_;
grpc_closure* closure = &request_done_closure_;
lock.Unlock();
grpc_endpoint_write(ep, write_buffer, closure, nullptr);
}
HttpConnectHandshaker::HttpConnectHandshaker() {
gpr_mu_init(&mu_);
grpc_slice_buffer_init(&write_buffer_);
GRPC_CLOSURE_INIT(&request_done_closure_, &HttpConnectHandshaker::OnWriteDone,
this, grpc_schedule_on_exec_ctx);

@ -415,7 +415,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
GRPC_CLOSURE_SCHED(cb, error);
GRPC_CLOSURE_RUN(cb, error);
}
#define MAX_READ_IOVEC 4
@ -643,7 +643,7 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
* right thing (i.e calls tcp_do_read() which either reads the available
* bytes or calls notify_on_read() to be notified when new bytes become
* available */
GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE);
GRPC_CLOSURE_RUN(&tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
@ -1023,7 +1023,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_REF(error));
GRPC_CLOSURE_RUN(cb, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "write");
return;
}
@ -1072,11 +1072,11 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
tcp->outgoing_buffer_arg = arg;
if (buf->length == 0) {
GRPC_CLOSURE_SCHED(
cb, grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
GRPC_CLOSURE_RUN(cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
: GRPC_ERROR_NONE);
tcp_shutdown_buffer_list(tcp);
return;
}
@ -1098,7 +1098,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
GRPC_CLOSURE_SCHED(cb, error);
GRPC_CLOSURE_RUN(cb, error);
}
}

@ -1223,12 +1223,8 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs error */
bctl->call = nullptr;
/* This closure may be meant to be run within some combiner. Since we aren't
* running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
* of GRPC_CLOSURE_RUN.
*/
GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs error */
@ -1573,7 +1569,7 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
static_cast<grpc_cq_completion*>(
gpr_malloc(sizeof(grpc_cq_completion))));
} else {
GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
GRPC_CLOSURE_RUN((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
}
error = GRPC_CALL_OK;
goto done;

Loading…
Cancel
Save