diff --git a/src/core/ext/filters/client_channel/xds/xds_client.cc b/src/core/ext/filters/client_channel/xds/xds_client.cc index f7712ce1cf1..f3692235040 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.cc +++ b/src/core/ext/filters/client_channel/xds/xds_client.cc @@ -515,7 +515,7 @@ template void XdsClient::ChannelState::RetryableCall::OnRetryTimer( void* arg, grpc_error* error) { RetryableCall* calld = static_cast(arg); - calld->chand_->xds_client()->combiner()->Run( + calld->chand_->xds_client()->combiner_->Run( GRPC_CLOSURE_INIT(&calld->on_retry_timer_, OnRetryTimerLocked, calld, nullptr), GRPC_ERROR_REF(error)); @@ -628,7 +628,6 @@ XdsClient::ChannelState::AdsCallState::AdsCallState( // This callback signals the end of the call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. - GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, grpc_schedule_on_exec_ctx); call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops), @@ -875,7 +874,7 @@ void XdsClient::ChannelState::LrsCallState::Reporter:: void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer( void* arg, grpc_error* error) { Reporter* self = static_cast(arg); - self->xds_client()->combiner()->Run( + self->xds_client()->combiner_->Run( GRPC_CLOSURE_INIT(&self->on_next_report_timer_, OnNextReportTimerLocked, self, nullptr), GRPC_ERROR_REF(error)); diff --git a/src/core/ext/filters/client_channel/xds/xds_client.h b/src/core/ext/filters/client_channel/xds/xds_client.h index 40bc2992036..1eac57a8527 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client.h +++ b/src/core/ext/filters/client_channel/xds/xds_client.h @@ -108,8 +108,6 @@ class XdsClient : public InternallyRefCounted { static RefCountedPtr GetFromChannelArgs( const grpc_channel_args& args); - Combiner* combiner() { return combiner_; } - private: class ChannelState; diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 7716cb11b59..53e52c1bbba 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -28,14 +28,17 @@ #include "src/core/lib/iomgr/exec_ctx.h" namespace grpc_core { +// TODO(yashkt) : Remove this class and replace it with a class that does not +// use ExecCtx class Combiner { public: void Run(grpc_closure* closure, grpc_error* error); + // TODO(yashkt) : Remove this method void FinallyRun(grpc_closure* closure, grpc_error* error); Combiner* next_combiner_on_this_exec_ctx = nullptr; grpc_closure_scheduler scheduler; grpc_closure_scheduler finally_scheduler; - grpc_core::MultiProducerSingleConsumerQueue queue; + MultiProducerSingleConsumerQueue queue; // either: // a pointer to the initiating exec ctx if that is the only exec_ctx that has // ever queued to this combiner, or NULL. If this is non-null, it's not diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 41a6ed30477..c553ba240ca 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -58,7 +58,7 @@ const char* ConnectivityStateName(grpc_connectivity_state state) { class AsyncConnectivityStateWatcherInterface::Notifier { public: Notifier(RefCountedPtr watcher, - grpc_connectivity_state state, grpc_core::Combiner* combiner) + grpc_connectivity_state state, Combiner* combiner) : watcher_(std::move(watcher)), state_(state) { if (combiner != nullptr) { combiner->Run( diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index 9f7ae21418d..68120e142df 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -68,6 +68,8 @@ class AsyncConnectivityStateWatcherInterface protected: class Notifier; + // If \a combiner is nullptr, then the notification will be scheduled on the + // ExecCtx. explicit AsyncConnectivityStateWatcherInterface(Combiner* combiner = nullptr) : combiner_(combiner) {} diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index ed57a71bf19..98fc8077bdd 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -89,14 +89,6 @@ typedef struct proxy_connection { grpc_pollset_set* pollset_set; - grpc_closure on_read_request_done_hopper; - grpc_closure on_server_connect_done_hopper; - grpc_closure on_write_response_done_hopper; - grpc_closure on_client_read_done_hopper; - grpc_closure on_client_write_done_hopper; - grpc_closure on_server_read_done_hopper; - grpc_closure on_server_write_done_hopper; - // NOTE: All the closures execute under proxy->combiner lock. Which means // there will not be any data-races between the closures grpc_closure on_read_request_done; @@ -202,6 +194,8 @@ static void proxy_connection_failed(proxy_connection* conn, static void on_client_write_done_hopper(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done_locked, + conn, nullptr); conn->proxy->combiner->Run(&conn->on_client_write_done, GRPC_ERROR_REF(error)); } @@ -223,22 +217,26 @@ static void on_client_write_done(void* arg, grpc_error* error) { grpc_slice_buffer_move_into(&conn->client_deferred_write_buffer, &conn->client_write_buffer); conn->client_is_writing = true; + GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done_hopper, nullptr); + &conn->on_client_write_done, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "write_done"); } } -static void on_server_write_done_hopper(void* arg, grpc_error* error) { +static void on_server_write_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done_locked, + conn, nullptr); conn->proxy->combiner->Run(&conn->on_server_write_done, GRPC_ERROR_REF(error)); } // Callback for writing proxy data to the backend server. -static void on_server_write_done(void* arg, grpc_error* error) { +static void on_server_write_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { @@ -254,22 +252,26 @@ static void on_server_write_done(void* arg, grpc_error* error) { grpc_slice_buffer_move_into(&conn->server_deferred_write_buffer, &conn->server_write_buffer); conn->server_is_writing = true; + GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done_hopper, nullptr); + &conn->on_server_write_done, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "server_write"); } } -static void on_client_read_done_hopper(void* arg, grpc_error* error) { +static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done_locked, + conn, nullptr); conn->proxy->combiner->Run(&conn->on_client_read_done, GRPC_ERROR_REF(error)); } // Callback for reading data from the client, which will be proxied to // the backend server. -static void on_client_read_done(void* arg, grpc_error* error) { +static void on_client_read_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); if (error != GRPC_ERROR_NONE) { proxy_connection_failed(conn, CLIENT_READ_FAILED, "HTTP proxy client read", @@ -290,22 +292,28 @@ static void on_client_read_done(void* arg, grpc_error* error) { &conn->server_write_buffer); proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; + GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done_hopper, nullptr); + &conn->on_server_write_done, nullptr); } // Read more data. + GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done_hopper, /*urgent=*/false); + &conn->on_client_read_done, /*urgent=*/false); } -static void on_server_read_done_hopper(void* arg, grpc_error* error) { +static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done_locked, + conn, nullptr); conn->proxy->combiner->Run(&conn->on_server_read_done, GRPC_ERROR_REF(error)); } // Callback for reading data from the backend server, which will be // proxied to the client. -static void on_server_read_done(void* arg, grpc_error* error) { +static void on_server_read_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); if (error != GRPC_ERROR_NONE) { proxy_connection_failed(conn, SERVER_READ_FAILED, "HTTP proxy server read", @@ -326,22 +334,28 @@ static void on_server_read_done(void* arg, grpc_error* error) { &conn->client_write_buffer); proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; + GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done_hopper, nullptr); + &conn->on_client_write_done, nullptr); } // Read more data. + GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done_hopper, /*urgent=*/false); + &conn->on_server_read_done, /*urgent=*/false); } -static void on_write_response_done_hopper(void* arg, grpc_error* error) { +static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_write_response_done, + on_write_response_done_locked, conn, nullptr); conn->proxy->combiner->Run(&conn->on_write_response_done, GRPC_ERROR_REF(error)); } // Callback to write the HTTP response for the CONNECT request. -static void on_write_response_done(void* arg, grpc_error* error) { +static void on_write_response_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { @@ -357,21 +371,27 @@ static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "client_read"); proxy_connection_ref(conn, "server_read"); proxy_connection_unref(conn, "write_response"); + GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done_hopper, /*urgent=*/false); + &conn->on_client_read_done, /*urgent=*/false); + GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done_hopper, /*urgent=*/false); + &conn->on_server_read_done, /*urgent=*/false); } -static void on_server_connect_done_hopper(void* arg, grpc_error* error) { +static void on_server_connect_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_server_connect_done, + on_server_connect_done_locked, conn, nullptr); conn->proxy->combiner->Run(&conn->on_server_connect_done, GRPC_ERROR_REF(error)); } // Callback to connect to the backend server specified by the HTTP // CONNECT request. -static void on_server_connect_done(void* arg, grpc_error* error) { +static void on_server_connect_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); if (error != GRPC_ERROR_NONE) { // TODO(roth): Technically, in this case, we should handle the error @@ -390,8 +410,10 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_from_copied_string("HTTP/1.0 200 connected\r\n\r\n"); grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; + GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_write_response_done_hopper, nullptr); + &conn->on_write_response_done, nullptr); } /** @@ -414,8 +436,10 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val, return header_matches; } -static void on_read_request_done_hopper(void* arg, grpc_error* error) { +static void on_read_request_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); + GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done_locked, + conn, nullptr); conn->proxy->combiner->Run(&conn->on_read_request_done, GRPC_ERROR_REF(error)); } @@ -425,7 +449,7 @@ static void on_read_request_done_hopper(void* arg, grpc_error* error) { // the client indicating that the request failed. However, for the purposes // of this test code, it's fine to pretend this is a client-side error, // which will cause the client connection to be dropped. -static void on_read_request_done(void* arg, grpc_error* error) { +static void on_read_request_done_locked(void* arg, grpc_error* error) { proxy_connection* conn = static_cast(arg); gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); @@ -450,8 +474,10 @@ static void on_read_request_done(void* arg, grpc_error* error) { grpc_slice_buffer_reset_and_unref(&conn->client_read_buffer); // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { + GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done_hopper, /*urgent=*/false); + &conn->on_read_request_done, /*urgent=*/false); return; } // Make sure we got a CONNECT request. @@ -503,8 +529,10 @@ static void on_read_request_done(void* arg, grpc_error* error) { // The connection callback inherits our reference to conn. const grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC; - grpc_tcp_client_connect(&conn->on_server_connect_done_hopper, - &conn->server_endpoint, conn->pollset_set, nullptr, + GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, + grpc_schedule_on_exec_ctx); + grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint, + conn->pollset_set, nullptr, &resolved_addresses->addrs[0], deadline); grpc_resolved_addresses_destroy(resolved_addresses); } @@ -524,41 +552,6 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, conn->pollset_set = grpc_pollset_set_create(); grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset); grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set); - GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn, - nullptr); - GRPC_CLOSURE_INIT(&conn->on_read_request_done_hopper, - on_read_request_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_server_connect_done_hopper, - on_server_connect_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_write_response_done_hopper, - on_write_response_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_client_read_done_hopper, - on_client_read_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_client_write_done_hopper, - on_client_write_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_server_read_done_hopper, - on_server_read_done_hopper, conn, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&conn->on_server_write_done_hopper, - on_server_write_done_hopper, conn, - grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer); conn->client_is_writing = false; @@ -569,8 +562,10 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_slice_buffer_init(&conn->server_write_buffer); grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); + GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, + grpc_schedule_on_exec_ctx); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done_hopper, /*urgent=*/false); + &conn->on_read_request_done, /*urgent=*/false); } //