From b338d84aec0052f34ca1c95c500822f78301afa3 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Tue, 14 Jan 2020 18:48:18 -0800 Subject: [PATCH] Reviewer comments --- .../ext/filters/client_channel/lb_policy.cc | 2 +- .../ext/filters/client_channel/lb_policy.h | 2 +- .../client_channel/lb_policy/grpclb/grpclb.cc | 4 +- .../resolver/dns/c_ares/dns_resolver_ares.cc | 107 +++++++------- .../dns/c_ares/grpc_ares_ev_driver.cc | 48 +++--- .../dns/c_ares/grpc_ares_ev_driver_windows.cc | 73 ++++------ .../resolver/dns/c_ares/grpc_ares_wrapper.cc | 15 +- .../resolver/dns/native/dns_resolver.cc | 84 +++++------ .../resolver/fake/fake_resolver.cc | 137 ++++++++++++------ .../resolver/fake/fake_resolver.h | 60 +------- .../ext/filters/client_channel/xds/xds_api.h | 4 +- .../client_channel/xds/xds_client_stats.h | 4 +- src/core/lib/transport/connectivity_state.cc | 8 +- src/core/lib/transport/connectivity_state.h | 6 +- 14 files changed, 251 insertions(+), 303 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy.cc b/src/core/ext/filters/client_channel/lb_policy.cc index 748dc1e80af..6c75842fa8b 100644 --- a/src/core/ext/filters/client_channel/lb_policy.cc +++ b/src/core/ext/filters/client_channel/lb_policy.cc @@ -98,7 +98,7 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick( // the time this function returns, the pick will already have // been processed, and we'll be trying to re-process the same // pick again, leading to a crash. - // 2. We are currently running in the data plane logical_thread, but we + // 2. We are currently running in the data plane mutex, but we // need to bounce into the control plane logical_thread to call // ExitIdleLocked(). if (!exit_idle_called_) { diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index fa6dcb7ffee..3779a17c5d5 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -242,7 +242,7 @@ class LoadBalancingPolicy : public InternallyRefCounted { /// live in the LB policy object itself. /// /// Currently, pickers are always accessed from within the - /// client_channel data plane logical_thread, so they do not have to be + /// client_channel data plane mutex, so they do not have to be /// thread-safe. class SubchannelPicker { public: diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 06144d8cfd9..9ceaaf66d47 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -246,7 +246,7 @@ class GrpcLb : public LoadBalancingPolicy { // should not be dropped. // // Note: This is called from the picker, so it will be invoked in - // the channel's data plane logical_thread, NOT the control plane + // the channel's data plane mutex, NOT the control plane // logical_thread. It should not be accessed by any other part of the LB // policy. const char* ShouldDrop(); @@ -254,7 +254,7 @@ class GrpcLb : public LoadBalancingPolicy { private: std::vector serverlist_; - // Guarded by the channel's data plane logical_thread, NOT the control + // Guarded by the channel's data plane mutex, NOT the control // plane logical_thread. It should not be accessed by anything but the // picker via the ShouldDrop() method. size_t drop_index_ = 0; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc index b0a3e948420..f3ed367931b 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc @@ -78,8 +78,8 @@ class AresDnsResolver : public Resolver { static void OnNextResolution(void* arg, grpc_error* error); static void OnResolved(void* arg, grpc_error* error); - static void OnNextResolutionLocked(void* arg, grpc_error* error); - static void OnResolvedLocked(void* arg, grpc_error* error); + void OnNextResolutionLocked(grpc_error* error); + void OnResolvedLocked(grpc_error* error); /// DNS server to use (if not system default) char* dns_server_; @@ -128,6 +128,10 @@ AresDnsResolver::AresDnsResolver(ResolverArgs args) .set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER) .set_jitter(GRPC_DNS_RECONNECT_JITTER) .set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) { + // Closure Initialization + GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx); // Get name to resolve from URI path. const char* path = args.uri->path; if (path[0] == '/') ++path; @@ -201,28 +205,26 @@ void AresDnsResolver::ShutdownLocked() { void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast(arg); - r->logical_thread()->Run( - Closure::ToFunction(GRPC_CLOSURE_INIT(&r->on_next_resolution_, - OnNextResolutionLocked, r, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + GRPC_ERROR_REF(error); // ref owned by lambda + r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); }, + DEBUG_LOCATION); } -void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { - AresDnsResolver* r = static_cast(arg); +void AresDnsResolver::OnNextResolutionLocked(grpc_error* error) { GRPC_CARES_TRACE_LOG( "resolver:%p re-resolution timer fired. error: %s. shutdown_initiated_: " "%d", - r, grpc_error_string(error), r->shutdown_initiated_); - r->have_next_resolution_timer_ = false; - if (error == GRPC_ERROR_NONE && !r->shutdown_initiated_) { - if (!r->resolving_) { + this, grpc_error_string(error), shutdown_initiated_); + have_next_resolution_timer_ = false; + if (error == GRPC_ERROR_NONE && !shutdown_initiated_) { + if (!resolving_) { GRPC_CARES_TRACE_LOG( - "resolver:%p start resolving due to re-resolution timer", r); - r->StartResolvingLocked(); + "resolver:%p start resolving due to re-resolution timer", this); + StartResolvingLocked(); } } - r->Unref(DEBUG_LOCATION, "next_resolution_timer"); + Unref(DEBUG_LOCATION, "next_resolution_timer"); + GRPC_ERROR_UNREF(error); } bool ValueInJsonArray(grpc_json* array, const char* value) { @@ -327,75 +329,71 @@ char* ChooseServiceConfig(char* service_config_choice_json, void AresDnsResolver::OnResolved(void* arg, grpc_error* error) { AresDnsResolver* r = static_cast(arg); - r->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&r->on_resolved_, OnResolvedLocked, r, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + GRPC_ERROR_REF(error); // ref owned by lambda + r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); }, + DEBUG_LOCATION); } -void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { - AresDnsResolver* r = static_cast(arg); - GPR_ASSERT(r->resolving_); - r->resolving_ = false; - gpr_free(r->pending_request_); - r->pending_request_ = nullptr; - if (r->shutdown_initiated_) { - r->Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown"); +void AresDnsResolver::OnResolvedLocked(grpc_error* error) { + GPR_ASSERT(resolving_); + resolving_ = false; + gpr_free(pending_request_); + pending_request_ = nullptr; + if (shutdown_initiated_) { + Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown"); + GRPC_ERROR_UNREF(error); return; } - if (r->addresses_ != nullptr) { + if (addresses_ != nullptr) { Result result; - result.addresses = std::move(*r->addresses_); - if (r->service_config_json_ != nullptr) { + result.addresses = std::move(*addresses_); + if (service_config_json_ != nullptr) { char* service_config_string = ChooseServiceConfig( - r->service_config_json_, &result.service_config_error); - gpr_free(r->service_config_json_); + service_config_json_, &result.service_config_error); + gpr_free(service_config_json_); if (result.service_config_error == GRPC_ERROR_NONE && service_config_string != nullptr) { GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s", - r, service_config_string); + this, service_config_string); result.service_config = ServiceConfig::Create( service_config_string, &result.service_config_error); } gpr_free(service_config_string); } - result.args = grpc_channel_args_copy(r->channel_args_); - r->result_handler()->ReturnResult(std::move(result)); - r->addresses_.reset(); + result.args = grpc_channel_args_copy(channel_args_); + result_handler()->ReturnResult(std::move(result)); + addresses_.reset(); // Reset backoff state so that we start from the beginning when the // next request gets triggered. - r->backoff_.Reset(); + backoff_.Reset(); } else { - GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r, + GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this, grpc_error_string(error)); - r->result_handler()->ReturnError(grpc_error_set_int( + result_handler()->ReturnError(grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "DNS resolution failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); // Set retry timer. - grpc_millis next_try = r->backoff_.NextAttemptTime(); + grpc_millis next_try = backoff_.NextAttemptTime(); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed (will retry): %s", - r, grpc_error_string(error)); - GPR_ASSERT(!r->have_next_resolution_timer_); - r->have_next_resolution_timer_ = true; + this, grpc_error_string(error)); + GPR_ASSERT(!have_next_resolution_timer_); + have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - r->Ref(DEBUG_LOCATION, "retry-timer").release(); + Ref(DEBUG_LOCATION, "retry-timer").release(); if (timeout > 0) { GRPC_CARES_TRACE_LOG("resolver:%p retrying in %" PRId64 " milliseconds", - r, timeout); + this, timeout); } else { - GRPC_CARES_TRACE_LOG("resolver:%p retrying immediately", r); + GRPC_CARES_TRACE_LOG("resolver:%p retrying immediately", this); } - GRPC_CLOSURE_INIT(&r->on_next_resolution_, OnNextResolution, r, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&r->next_resolution_timer_, next_try, - &r->on_next_resolution_); + grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_); } - r->Unref(DEBUG_LOCATION, "dns-resolving"); + Unref(DEBUG_LOCATION, "dns-resolving"); + GRPC_ERROR_UNREF(error); } void AresDnsResolver::MaybeStartResolvingLocked() { @@ -419,8 +417,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() { // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release(); - GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this, - grpc_schedule_on_exec_ctx); grpc_timer_init(&next_resolution_timer_, ExecCtx::Get()->Now() + ms_until_next_resolution, &on_next_resolution_); @@ -438,7 +434,6 @@ void AresDnsResolver::StartResolvingLocked() { GPR_ASSERT(!resolving_); resolving_ = true; service_config_json_ = nullptr; - GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx); pending_request_ = grpc_dns_lookup_ares_locked( dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, &on_resolved_, &addresses_, enable_srv_queries_ /* check_grpclb */, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc index aa9ab36644a..57d66934702 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc @@ -131,10 +131,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) { } static void on_timeout(void* arg, grpc_error* error); -static void on_timeout_locked(void* arg, grpc_error* error); +static void on_timeout_locked(grpc_ares_ev_driver* arg, grpc_error* error); static void on_ares_backup_poll_alarm(void* arg, grpc_error* error); -static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error); +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* arg, + grpc_error* error); static void noop_inject_channel_config(ares_channel /*channel*/) {} @@ -232,16 +233,12 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms( static void on_timeout(void* arg, grpc_error* error) { grpc_ares_ev_driver* driver = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda driver->logical_thread->Run( - grpc_core::Closure::ToFunction( - GRPC_CLOSURE_INIT(&driver->on_timeout_locked, on_timeout_locked, - driver, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION); } -static void on_timeout_locked(void* arg, grpc_error* error) { - grpc_ares_ev_driver* driver = static_cast(arg); +static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_error* error) { GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. " "err=%s", @@ -250,15 +247,14 @@ static void on_timeout_locked(void* arg, grpc_error* error) { grpc_ares_ev_driver_shutdown_locked(driver); } grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); } static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) { grpc_ares_ev_driver* driver = static_cast(arg); + GRPC_ERROR_REF(error); driver->logical_thread->Run( - grpc_core::Closure::ToFunction( - GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked, - on_ares_backup_poll_alarm_locked, driver, nullptr), - GRPC_ERROR_REF(error)), + [driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); }, DEBUG_LOCATION); } @@ -270,8 +266,8 @@ static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) { * b) when some time has passed without fd events having happened * For the latter, we use this backup poller. Also see * https://github.com/grpc/grpc/pull/17688 description for more details. */ -static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) { - grpc_ares_ev_driver* driver = static_cast(arg); +static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver, + grpc_error* error) { GRPC_CARES_TRACE_LOG( "request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. " "driver->shutting_down=%d. " @@ -304,10 +300,10 @@ static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) { grpc_ares_notify_on_event_locked(driver); } grpc_ares_ev_driver_unref(driver); + GRPC_ERROR_UNREF(error); } -static void on_readable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); +static void on_readable_locked(fd_node* fdn, grpc_error* error) { GPR_ASSERT(fdn->readable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -329,20 +325,16 @@ static void on_readable_locked(void* arg, grpc_error* error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); } static void on_readable(void* arg, grpc_error* error) { fd_node* fdn = static_cast(arg); fdn->ev_driver->logical_thread->Run( - grpc_core::Closure::ToFunction( - GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, - nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION); } -static void on_writable_locked(void* arg, grpc_error* error) { - fd_node* fdn = static_cast(arg); +static void on_writable_locked(fd_node* fdn, grpc_error* error) { GPR_ASSERT(fdn->writable_registered); grpc_ares_ev_driver* ev_driver = fdn->ev_driver; const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked(); @@ -362,16 +354,14 @@ static void on_writable_locked(void* arg, grpc_error* error) { } grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_ev_driver_unref(ev_driver); + GRPC_ERROR_UNREF(error); } static void on_writable(void* arg, grpc_error* error) { fd_node* fdn = static_cast(arg); + GRPC_ERROR_REF(error); fdn->ev_driver->logical_thread->Run( - grpc_core::Closure::ToFunction( - GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, - nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + [fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION); } ares_channel* grpc_ares_ev_driver_get_channel_locked( diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc index eef2e47ce4e..ae0d75b78fb 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc @@ -108,6 +108,16 @@ class GrpcPolledFdWindows { socket_type_(socket_type), logical_thread_(std::move(logical_thread)) { gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as); + // Closure Initialization + GRPC_CLOSURE_INIT(&outer_read_closure_, + &GrpcPolledFdWindows::OnIocpReadable, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&outer_write_closure_, + &GrpcPolledFdWindows::OnIocpWriteable, this, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_tcp_connect_locked_, + &GrpcPolledFdWindows::OnTcpConnect, this, + grpc_schedule_on_exec_ctx); winsocket_ = grpc_winsocket_create(as, name_); } @@ -179,10 +189,7 @@ class GrpcPolledFdWindows { return; } } - grpc_socket_notify_on_read( - winsocket_, GRPC_CLOSURE_INIT(&outer_read_closure_, - &GrpcPolledFdWindows::OnIocpReadable, - this, grpc_schedule_on_exec_ctx)); + grpc_socket_notify_on_read(winsocket_, &outer_read_closure_); } void RegisterForOnWriteableLocked(grpc_closure* write_closure) { @@ -233,11 +240,7 @@ class GrpcPolledFdWindows { ScheduleAndNullWriteClosure( GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)")); } else { - grpc_socket_notify_on_write( - winsocket_, - GRPC_CLOSURE_INIT(&outer_write_closure_, - &GrpcPolledFdWindows::OnIocpWriteable, this, - grpc_schedule_on_exec_ctx)); + grpc_socket_notify_on_write(winsocket_, &outer_write_closure_); } break; case WRITE_PENDING: @@ -417,22 +420,15 @@ class GrpcPolledFdWindows { static void OnTcpConnect(void* arg, grpc_error* error) { GrpcPolledFdWindows* grpc_polled_fd = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda grpc_polled_fd->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&grpc_polled_fd->on_tcp_connect_locked_, - &GrpcPolledFdWindows::OnTcpConnectLocked, - grpc_polled_fd, nullptr), - GRPC_ERROR_REF(error)), + [grpc_polled_fd, error]() { + grpc_polled_fd->OnTcpConnectLocked(error); + }, DEBUG_LOCATION); } - static void OnTcpConnectLocked(void* arg, grpc_error* error) { - GrpcPolledFdWindows* grpc_polled_fd = - static_cast(arg); - grpc_polled_fd->InnerOnTcpConnectLocked(error); - } - - void InnerOnTcpConnectLocked(grpc_error* error) { + void OnTcpConnectLocked(grpc_error* error) { GRPC_CARES_TRACE_LOG( "fd:%s InnerOnTcpConnectLocked error:|%s| " "pending_register_for_readable:%d" @@ -473,6 +469,7 @@ class GrpcPolledFdWindows { logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); }, DEBUG_LOCATION); } + GRPC_ERROR_UNREF(error); } int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target, @@ -572,27 +569,15 @@ class GrpcPolledFdWindows { return -1; } } - GRPC_CLOSURE_INIT(&on_tcp_connect_locked_, - &GrpcPolledFdWindows::OnTcpConnect, this, - grpc_schedule_on_exec_ctx); grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_); return out; } static void OnIocpReadable(void* arg, grpc_error* error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); + GRPC_ERROR_REF(error); // ref owned by lambda polled_fd->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&polled_fd->outer_read_closure_, - &GrpcPolledFdWindows::OnIocpReadableLocked, - polled_fd, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); - } - - static void OnIocpReadableLocked(void* arg, grpc_error* error) { - GrpcPolledFdWindows* polled_fd = static_cast(arg); - polled_fd->OnIocpReadableInner(error); + [polled_fd, error]() { OnIocpReadableLocked(error); }, DEBUG_LOCATION); } // TODO(apolcyn): improve this error handling to be less conversative. @@ -600,7 +585,7 @@ class GrpcPolledFdWindows { // c-ares reads from this socket later, but it shouldn't necessarily cancel // the entire resolution attempt. Doing so will allow the "inject broken // nameserver list" test to pass on Windows. - void OnIocpReadableInner(grpc_error* error) { + void OnIocpReadableLocked(grpc_error* error) { if (error == GRPC_ERROR_NONE) { if (winsocket_->read_info.wsa_error != 0) { /* WSAEMSGSIZE would be due to receiving more data @@ -631,25 +616,18 @@ class GrpcPolledFdWindows { "fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(), GRPC_SLICE_LENGTH(read_buf_)); ScheduleAndNullReadClosure(error); + GRPC_ERROR_UNREF(error); } static void OnIocpWriteable(void* arg, grpc_error* error) { GrpcPolledFdWindows* polled_fd = static_cast(arg); + GRPC_ERROR_REF(error); // error owned by lambda polled_fd->logical_thread_->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&polled_fd->outer_write_closure_, - &GrpcPolledFdWindows::OnIocpWriteableLocked, - polled_fd, nullptr), - GRPC_ERROR_REF(error)), + [polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); }, DEBUG_LOCATION); } - static void OnIocpWriteableLocked(void* arg, grpc_error* error) { - GrpcPolledFdWindows* polled_fd = static_cast(arg); - polled_fd->OnIocpWriteableInner(error); - } - - void OnIocpWriteableInner(grpc_error* error) { + void OnIocpWriteableLocked(grpc_error* error) { GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName()); GPR_ASSERT(socket_type_ == SOCK_STREAM); if (error == GRPC_ERROR_NONE) { @@ -676,6 +654,7 @@ class GrpcPolledFdWindows { write_buf_ = grpc_empty_slice(); } ScheduleAndNullWriteClosure(error); + GRPC_ERROR_UNREF(error); } bool gotten_into_driver_list() const { return gotten_into_driver_list_; } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc index 36f63ba7537..a562e2a4d85 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc @@ -695,9 +695,8 @@ typedef struct grpc_resolve_address_ares_request { grpc_ares_request* ares_request = nullptr; } grpc_resolve_address_ares_request; -static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { - grpc_resolve_address_ares_request* r = - static_cast(arg); +static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r, + grpc_error* error) { gpr_free(r->ares_request); grpc_resolved_addresses** resolved_addresses = r->addrs_out; if (r->addresses == nullptr || r->addresses->empty()) { @@ -718,17 +717,15 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done, GRPC_ERROR_REF(error)); delete r; + GRPC_ERROR_UNREF(error); } static void on_dns_lookup_done(void* arg, grpc_error* error) { grpc_resolve_address_ares_request* r = static_cast(arg); - r->logical_thread->Run( - grpc_core::Closure::ToFunction( - GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, - on_dns_lookup_done_locked, r, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + GRPC_ERROR_REF(error); // ref owned by lambda + r->logical_thread->Run([r, error]() { on_dns_lookup_done_locked(r, error); }, + DEBUG_LOCATION); } static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) { diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc index c8fa897fc8e..94e3ffa80cb 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc @@ -67,9 +67,9 @@ class NativeDnsResolver : public Resolver { void StartResolvingLocked(); static void OnNextResolution(void* arg, grpc_error* error); - static void OnNextResolutionLocked(void* arg, grpc_error* error); + void OnNextResolutionLocked(grpc_error* error); static void OnResolved(void* arg, grpc_error* error); - static void OnResolvedLocked(void* arg, grpc_error* error); + void OnResolvedLocked(grpc_error* error); /// name to resolve char* name_to_resolve_ = nullptr; @@ -149,84 +149,76 @@ void NativeDnsResolver::ShutdownLocked() { void NativeDnsResolver::OnNextResolution(void* arg, grpc_error* error) { NativeDnsResolver* r = static_cast(arg); - r->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&r->on_next_resolution_, - NativeDnsResolver::OnNextResolutionLocked, r, - nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + GRPC_ERROR_REF(error); // ref owned by lambda + r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); }, + DEBUG_LOCATION); } -void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) { - NativeDnsResolver* r = static_cast(arg); - r->have_next_resolution_timer_ = false; - if (error == GRPC_ERROR_NONE && !r->resolving_) { - r->StartResolvingLocked(); +void NativeDnsResolver::OnNextResolutionLocked(grpc_error* error) { + have_next_resolution_timer_ = false; + if (error == GRPC_ERROR_NONE && !resolving_) { + StartResolvingLocked(); } - r->Unref(DEBUG_LOCATION, "retry-timer"); + Unref(DEBUG_LOCATION, "retry-timer"); + GRPC_ERROR_UNREF(error); } void NativeDnsResolver::OnResolved(void* arg, grpc_error* error) { NativeDnsResolver* r = static_cast(arg); - r->logical_thread()->Run( - Closure::ToFunction( - GRPC_CLOSURE_INIT(&r->on_resolved_, - NativeDnsResolver::OnResolvedLocked, r, nullptr), - GRPC_ERROR_REF(error)), - DEBUG_LOCATION); + GRPC_ERROR_REF(error); // owned by lambda + r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); }, + DEBUG_LOCATION); } -void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) { - NativeDnsResolver* r = static_cast(arg); - GPR_ASSERT(r->resolving_); - r->resolving_ = false; - if (r->shutdown_) { - r->Unref(DEBUG_LOCATION, "dns-resolving"); +void NativeDnsResolver::OnResolvedLocked(grpc_error* error) { + GPR_ASSERT(resolving_); + resolving_ = false; + if (shutdown_) { + Unref(DEBUG_LOCATION, "dns-resolving"); + GRPC_ERROR_UNREF(error); return; } - if (r->addresses_ != nullptr) { + if (addresses_ != nullptr) { Result result; - for (size_t i = 0; i < r->addresses_->naddrs; ++i) { - result.addresses.emplace_back(&r->addresses_->addrs[i].addr, - r->addresses_->addrs[i].len, + for (size_t i = 0; i < addresses_->naddrs; ++i) { + result.addresses.emplace_back(&addresses_->addrs[i].addr, + addresses_->addrs[i].len, nullptr /* args */); } - grpc_resolved_addresses_destroy(r->addresses_); - result.args = grpc_channel_args_copy(r->channel_args_); - r->result_handler()->ReturnResult(std::move(result)); + grpc_resolved_addresses_destroy(addresses_); + result.args = grpc_channel_args_copy(channel_args_); + result_handler()->ReturnResult(std::move(result)); // Reset backoff state so that we start from the beginning when the // next request gets triggered. - r->backoff_.Reset(); + backoff_.Reset(); } else { gpr_log(GPR_INFO, "dns resolution failed (will retry): %s", grpc_error_string(error)); // Return transient error. - r->result_handler()->ReturnError(grpc_error_set_int( + result_handler()->ReturnError(grpc_error_set_int( GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "DNS resolution failed", &error, 1), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); // Set up for retry. - grpc_millis next_try = r->backoff_.NextAttemptTime(); + grpc_millis next_try = backoff_.NextAttemptTime(); grpc_millis timeout = next_try - ExecCtx::Get()->Now(); - GPR_ASSERT(!r->have_next_resolution_timer_); - r->have_next_resolution_timer_ = true; + GPR_ASSERT(!have_next_resolution_timer_); + have_next_resolution_timer_ = true; // TODO(roth): We currently deal with this ref manually. Once the // new closure API is done, find a way to track this ref with the timer // callback as part of the type system. - r->Ref(DEBUG_LOCATION, "next_resolution_timer").release(); + Ref(DEBUG_LOCATION, "next_resolution_timer").release(); if (timeout > 0) { gpr_log(GPR_DEBUG, "retrying in %" PRId64 " milliseconds", timeout); } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - GRPC_CLOSURE_INIT(&r->on_next_resolution_, - NativeDnsResolver::OnNextResolution, r, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&r->next_resolution_timer_, next_try, - &r->on_next_resolution_); + GRPC_CLOSURE_INIT(&on_next_resolution_, NativeDnsResolver::OnNextResolution, + this, grpc_schedule_on_exec_ctx); + grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_); } - r->Unref(DEBUG_LOCATION, "dns-resolving"); + Unref(DEBUG_LOCATION, "dns-resolving"); + GRPC_ERROR_UNREF(error); } void NativeDnsResolver::MaybeStartResolvingLocked() { diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc index db6593b179b..14fc0304705 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc @@ -45,6 +45,47 @@ namespace grpc_core { +class FakeResolver : public Resolver { + public: + explicit FakeResolver(ResolverArgs args); + + void StartLocked() override; + + void RequestReresolutionLocked() override; + + private: + friend class FakeResolverResponseGenerator; + friend class FakeResolverResponseSetter; + + virtual ~FakeResolver(); + + void ShutdownLocked() override; + + void MaybeSendResultLocked(); + + void ReturnReresolutionResult(); + + // passed-in parameters + grpc_channel_args* channel_args_ = nullptr; + RefCountedPtr response_generator_; + // If has_next_result_ is true, next_result_ is the next resolution result + // to be returned. + bool has_next_result_ = false; + Result next_result_; + // Result to use for the pretended re-resolution in + // RequestReresolutionLocked(). + bool has_reresolution_result_ = false; + Result reresolution_result_; + // True after the call to StartLocked(). + bool started_ = false; + // True after the call to ShutdownLocked(). + bool shutdown_ = false; + // if true, return failure + bool return_failure_ = false; + // pending re-resolution + bool reresolution_closure_pending_ = false; +}; + FakeResolver::FakeResolver(ResolverArgs args) : Resolver(args.logical_thread, std::move(args.result_handler)), response_generator_( @@ -123,15 +164,35 @@ void FakeResolver::ReturnReresolutionResult() { Unref(); } -// -// FakeResolverResponseGenerator -// - -FakeResolverResponseGenerator::FakeResolverResponseGenerator() {} +class FakeResolverResponseSetter { + public: + explicit FakeResolverResponseSetter(RefCountedPtr resolver, + Resolver::Result result, + bool has_result = false, + bool immediate = true) + : resolver_(std::move(resolver)), + result_(result), + has_result_(has_result), + immediate_(immediate) {} + void SetResponseLocked(); + void SetReresolutionResponseLocked(); + void SetFailureLocked(); + + private: + RefCountedPtr resolver_; + Resolver::Result result_; + bool has_result_; + bool immediate_; +}; -FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {} +void FakeResolverResponseSetter::SetReresolutionResponseLocked() { + if (!resolver_->shutdown_) { + resolver_->reresolution_result_ = std::move(result_); + resolver_->has_reresolution_result_ = has_result_; + } +} -void FakeResolverResponseGenerator::ResponseSetter::SetResponseLocked() { +void FakeResolverResponseSetter::SetResponseLocked() { if (!resolver_->shutdown_) { resolver_->next_result_ = std::move(result_); resolver_->has_next_result_ = true; @@ -139,6 +200,21 @@ void FakeResolverResponseGenerator::ResponseSetter::SetResponseLocked() { } } +void FakeResolverResponseSetter::SetFailureLocked() { + if (!resolver_->shutdown_) { + resolver_->return_failure_ = true; + if (immediate_) resolver_->MaybeSendResultLocked(); + } +} + +// +// FakeResolverResponseGenerator +// + +FakeResolverResponseGenerator::FakeResolverResponseGenerator() {} + +FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {} + void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { RefCountedPtr resolver; { @@ -150,9 +226,8 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { } resolver = resolver_->Ref(); } - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver, - std::move(result)); + FakeResolverResponseSetter* arg = + new FakeResolverResponseSetter(resolver, std::move(result)); resolver->logical_thread()->Run( [arg]() { arg->SetResponseLocked(); @@ -161,14 +236,6 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) { DEBUG_LOCATION); } -void FakeResolverResponseGenerator::ResponseSetter:: - SetReresolutionResponseLocked() { - if (!resolver_->shutdown_) { - resolver_->reresolution_result_ = std::move(result_); - resolver_->has_reresolution_result_ = has_result_; - } -} - void FakeResolverResponseGenerator::SetReresolutionResponse( Resolver::Result result) { RefCountedPtr resolver; @@ -177,10 +244,8 @@ void FakeResolverResponseGenerator::SetReresolutionResponse( GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver, - std::move(result)); - arg->set_has_result(); + FakeResolverResponseSetter* arg = + new FakeResolverResponseSetter(resolver, std::move(result), true); resolver->logical_thread()->Run( [arg]() { arg->SetReresolutionResponseLocked(); @@ -196,9 +261,8 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver, - Resolver::Result()); + FakeResolverResponseSetter* arg = + new FakeResolverResponseSetter(resolver, Resolver::Result()); resolver->logical_thread()->Run( [arg]() { arg->SetReresolutionResponseLocked(); @@ -207,13 +271,6 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() { DEBUG_LOCATION); } -void FakeResolverResponseGenerator::ResponseSetter::SetFailureLocked() { - if (!resolver_->shutdown_) { - resolver_->return_failure_ = true; - if (immediate_) resolver_->MaybeSendResultLocked(); - } -} - void FakeResolverResponseGenerator::SetFailure() { RefCountedPtr resolver; { @@ -221,9 +278,8 @@ void FakeResolverResponseGenerator::SetFailure() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver, - Resolver::Result()); + FakeResolverResponseSetter* arg = + new FakeResolverResponseSetter(resolver, Resolver::Result()); resolver->logical_thread()->Run( [arg]() { arg->SetFailureLocked(); @@ -239,10 +295,8 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() { GPR_ASSERT(resolver_ != nullptr); resolver = resolver_->Ref(); } - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver, - Resolver::Result()); - arg->reset_immediate(); + FakeResolverResponseSetter* arg = new FakeResolverResponseSetter( + resolver, Resolver::Result(), false, false); resolver->logical_thread()->Run( [arg]() { arg->SetFailureLocked(); @@ -257,9 +311,8 @@ void FakeResolverResponseGenerator::SetFakeResolver( resolver_ = std::move(resolver); if (resolver_ == nullptr) return; if (has_result_) { - FakeResolverResponseGenerator::ResponseSetter* arg = - new FakeResolverResponseGenerator::ResponseSetter(resolver_, - std::move(result_)); + FakeResolverResponseSetter* arg = + new FakeResolverResponseSetter(resolver_, std::move(result_)); resolver_->logical_thread()->Run( [arg]() { arg->SetResponseLocked(); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h index dbfafe86c40..5e837a82fd4 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h @@ -30,47 +30,7 @@ namespace grpc_core { -class FakeResolverResponseGenerator; - -class FakeResolver : public Resolver { - public: - explicit FakeResolver(ResolverArgs args); - - void StartLocked() override; - - void RequestReresolutionLocked() override; - - private: - friend class FakeResolverResponseGenerator; - - virtual ~FakeResolver(); - - void ShutdownLocked() override; - - void MaybeSendResultLocked(); - - void ReturnReresolutionResult(); - - // passed-in parameters - grpc_channel_args* channel_args_ = nullptr; - RefCountedPtr response_generator_; - // If has_next_result_ is true, next_result_ is the next resolution result - // to be returned. - bool has_next_result_ = false; - Result next_result_; - // Result to use for the pretended re-resolution in - // RequestReresolutionLocked(). - bool has_reresolution_result_ = false; - Result reresolution_result_; - // True after the call to StartLocked(). - bool started_ = false; - // True after the call to ShutdownLocked(). - bool shutdown_ = false; - // if true, return failure - bool return_failure_ = false; - // pending re-resolution - bool reresolution_closure_pending_ = false; -}; +class FakeResolver; /// A mechanism for generating responses for the fake resolver. /// An instance of this class is passed to the fake resolver via a channel @@ -121,24 +81,6 @@ class FakeResolverResponseGenerator // Set the corresponding FakeResolver to this generator. void SetFakeResolver(RefCountedPtr resolver); - class ResponseSetter { - public: - explicit ResponseSetter(RefCountedPtr resolver, - Resolver::Result result) - : resolver_(std::move(resolver)), result_(result) {} - void set_has_result() { has_result_ = true; } - void reset_immediate() { immediate_ = false; } - void SetResponseLocked(); - void SetReresolutionResponseLocked(); - void SetFailureLocked(); - - private: - RefCountedPtr resolver_; - Resolver::Result result_; - bool has_result_ = false; - bool immediate_ = true; - }; - // Mutex protecting the members below. Mutex mu_; RefCountedPtr resolver_; diff --git a/src/core/ext/filters/client_channel/xds/xds_api.h b/src/core/ext/filters/client_channel/xds/xds_api.h index 516cf9e1a29..fe685b40df3 100644 --- a/src/core/ext/filters/client_channel/xds/xds_api.h +++ b/src/core/ext/filters/client_channel/xds/xds_api.h @@ -91,7 +91,7 @@ class XdsPriorityListUpdate { // There are two phases of accessing this class's content: // 1. to initialize in the control plane logical_thread; -// 2. to use in the data plane logical_thread. +// 2. to use in the data plane mutex. // So no additional synchronization is needed. class XdsDropConfig : public RefCounted { public: @@ -113,7 +113,7 @@ class XdsDropConfig : public RefCounted { DropCategory{std::move(name), parts_per_million}); } - // The only method invoked from the data plane logical_thread. + // The only method invoked from the data plane mutex. bool ShouldDrop(const grpc_core::UniquePtr** category_name) const; const DropCategoryList& drop_category_list() const { diff --git a/src/core/ext/filters/client_channel/xds/xds_client_stats.h b/src/core/ext/filters/client_channel/xds/xds_client_stats.h index e492a296b21..a1d915e03a4 100644 --- a/src/core/ext/filters/client_channel/xds/xds_client_stats.h +++ b/src/core/ext/filters/client_channel/xds/xds_client_stats.h @@ -140,7 +140,7 @@ class XdsClientStats { // Only be called from the control plane logical_thread. void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); } // Might be called from the control plane logical_thread or the data plane - // logical_thread. + // mutex. // TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged, // this method will also only be invoked in the control plane // logical_thread. We may then be able to simplify the LocalityStats' @@ -220,7 +220,7 @@ class XdsClientStats { Atomic total_dropped_requests_{0}; // Protects dropped_requests_. A mutex is necessary because the length of // dropped_requests_ can be accessed by both the picker (from data plane - // logical_thread) and the load reporting thread (from the control plane + // mutex) and the load reporting thread (from the control plane // logical_thread). Mutex dropped_requests_mu_; DroppedRequestsMap dropped_requests_; diff --git a/src/core/lib/transport/connectivity_state.cc b/src/core/lib/transport/connectivity_state.cc index 7b10faa8d63..a370a645a55 100644 --- a/src/core/lib/transport/connectivity_state.cc +++ b/src/core/lib/transport/connectivity_state.cc @@ -59,10 +59,10 @@ class AsyncConnectivityStateWatcherInterface::Notifier { public: Notifier(RefCountedPtr watcher, grpc_connectivity_state state, - const RefCountedPtr& combiner) + const RefCountedPtr& logical_thread) : watcher_(std::move(watcher)), state_(state) { - if (combiner != nullptr) { - combiner->Run( + if (logical_thread != nullptr) { + logical_thread->Run( Closure::ToFunction( GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr), GRPC_ERROR_NONE), @@ -92,7 +92,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier { void AsyncConnectivityStateWatcherInterface::Notify( grpc_connectivity_state state) { - new Notifier(Ref(), state, combiner_); // Deletes itself when done. + new Notifier(Ref(), state, logical_thread_); // Deletes itself when done. } // diff --git a/src/core/lib/transport/connectivity_state.h b/src/core/lib/transport/connectivity_state.h index c4220b5bd55..4eb053eb27e 100644 --- a/src/core/lib/transport/connectivity_state.h +++ b/src/core/lib/transport/connectivity_state.h @@ -72,14 +72,14 @@ class AsyncConnectivityStateWatcherInterface // If \a combiner is nullptr, then the notification will be scheduled on the // ExecCtx. explicit AsyncConnectivityStateWatcherInterface( - RefCountedPtr combiner = nullptr) - : combiner_(std::move(combiner)) {} + RefCountedPtr logical_thread = nullptr) + : logical_thread_(std::move(logical_thread)) {} // Invoked asynchronously when Notify() is called. virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0; private: - RefCountedPtr combiner_; + RefCountedPtr logical_thread_; }; // Tracks connectivity state. Maintains a list of watchers that are