Reviewer comments

reviewable/pr21361/r5
Yash Tibrewal 5 years ago
parent 4c47f6543c
commit b338d84aec
  1. 2
      src/core/ext/filters/client_channel/lb_policy.cc
  2. 2
      src/core/ext/filters/client_channel/lb_policy.h
  3. 4
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  4. 103
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  5. 48
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  6. 73
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  7. 13
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  8. 80
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  9. 137
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  10. 60
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  11. 4
      src/core/ext/filters/client_channel/xds/xds_api.h
  12. 4
      src/core/ext/filters/client_channel/xds/xds_client_stats.h
  13. 8
      src/core/lib/transport/connectivity_state.cc
  14. 6
      src/core/lib/transport/connectivity_state.h

@ -98,7 +98,7 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
// the time this function returns, the pick will already have
// been processed, and we'll be trying to re-process the same
// pick again, leading to a crash.
// 2. We are currently running in the data plane logical_thread, but we
// 2. We are currently running in the data plane mutex, but we
// need to bounce into the control plane logical_thread to call
// ExitIdleLocked().
if (!exit_idle_called_) {

@ -242,7 +242,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// live in the LB policy object itself.
///
/// Currently, pickers are always accessed from within the
/// client_channel data plane logical_thread, so they do not have to be
/// client_channel data plane mutex, so they do not have to be
/// thread-safe.
class SubchannelPicker {
public:

@ -246,7 +246,7 @@ class GrpcLb : public LoadBalancingPolicy {
// should not be dropped.
//
// Note: This is called from the picker, so it will be invoked in
// the channel's data plane logical_thread, NOT the control plane
// the channel's data plane mutex, NOT the control plane
// logical_thread. It should not be accessed by any other part of the LB
// policy.
const char* ShouldDrop();
@ -254,7 +254,7 @@ class GrpcLb : public LoadBalancingPolicy {
private:
std::vector<GrpcLbServer> serverlist_;
// Guarded by the channel's data plane logical_thread, NOT the control
// Guarded by the channel's data plane mutex, NOT the control
// plane logical_thread. It should not be accessed by anything but the
// picker via the ShouldDrop() method.
size_t drop_index_ = 0;

@ -78,8 +78,8 @@ class AresDnsResolver : public Resolver {
static void OnNextResolution(void* arg, grpc_error* error);
static void OnResolved(void* arg, grpc_error* error);
static void OnNextResolutionLocked(void* arg, grpc_error* error);
static void OnResolvedLocked(void* arg, grpc_error* error);
void OnNextResolutionLocked(grpc_error* error);
void OnResolvedLocked(grpc_error* error);
/// DNS server to use (if not system default)
char* dns_server_;
@ -128,6 +128,10 @@ AresDnsResolver::AresDnsResolver(ResolverArgs args)
.set_multiplier(GRPC_DNS_RECONNECT_BACKOFF_MULTIPLIER)
.set_jitter(GRPC_DNS_RECONNECT_JITTER)
.set_max_backoff(GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
// Closure Initialization
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx);
// Get name to resolve from URI path.
const char* path = args.uri->path;
if (path[0] == '/') ++path;
@ -201,28 +205,26 @@ void AresDnsResolver::ShutdownLocked() {
void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
r->logical_thread()->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&r->on_next_resolution_,
OnNextResolutionLocked, r, nullptr),
GRPC_ERROR_REF(error)),
GRPC_ERROR_REF(error); // ref owned by lambda
r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); },
DEBUG_LOCATION);
}
void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
void AresDnsResolver::OnNextResolutionLocked(grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"resolver:%p re-resolution timer fired. error: %s. shutdown_initiated_: "
"%d",
r, grpc_error_string(error), r->shutdown_initiated_);
r->have_next_resolution_timer_ = false;
if (error == GRPC_ERROR_NONE && !r->shutdown_initiated_) {
if (!r->resolving_) {
this, grpc_error_string(error), shutdown_initiated_);
have_next_resolution_timer_ = false;
if (error == GRPC_ERROR_NONE && !shutdown_initiated_) {
if (!resolving_) {
GRPC_CARES_TRACE_LOG(
"resolver:%p start resolving due to re-resolution timer", r);
r->StartResolvingLocked();
"resolver:%p start resolving due to re-resolution timer", this);
StartResolvingLocked();
}
}
r->Unref(DEBUG_LOCATION, "next_resolution_timer");
Unref(DEBUG_LOCATION, "next_resolution_timer");
GRPC_ERROR_UNREF(error);
}
bool ValueInJsonArray(grpc_json* array, const char* value) {
@ -327,75 +329,71 @@ char* ChooseServiceConfig(char* service_config_choice_json,
void AresDnsResolver::OnResolved(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
r->logical_thread()->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_resolved_, OnResolvedLocked, r, nullptr),
GRPC_ERROR_REF(error)),
GRPC_ERROR_REF(error); // ref owned by lambda
r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); },
DEBUG_LOCATION);
}
void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
GPR_ASSERT(r->resolving_);
r->resolving_ = false;
gpr_free(r->pending_request_);
r->pending_request_ = nullptr;
if (r->shutdown_initiated_) {
r->Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown");
void AresDnsResolver::OnResolvedLocked(grpc_error* error) {
GPR_ASSERT(resolving_);
resolving_ = false;
gpr_free(pending_request_);
pending_request_ = nullptr;
if (shutdown_initiated_) {
Unref(DEBUG_LOCATION, "OnResolvedLocked() shutdown");
GRPC_ERROR_UNREF(error);
return;
}
if (r->addresses_ != nullptr) {
if (addresses_ != nullptr) {
Result result;
result.addresses = std::move(*r->addresses_);
if (r->service_config_json_ != nullptr) {
result.addresses = std::move(*addresses_);
if (service_config_json_ != nullptr) {
char* service_config_string = ChooseServiceConfig(
r->service_config_json_, &result.service_config_error);
gpr_free(r->service_config_json_);
service_config_json_, &result.service_config_error);
gpr_free(service_config_json_);
if (result.service_config_error == GRPC_ERROR_NONE &&
service_config_string != nullptr) {
GRPC_CARES_TRACE_LOG("resolver:%p selected service config choice: %s",
r, service_config_string);
this, service_config_string);
result.service_config = ServiceConfig::Create(
service_config_string, &result.service_config_error);
}
gpr_free(service_config_string);
}
result.args = grpc_channel_args_copy(r->channel_args_);
r->result_handler()->ReturnResult(std::move(result));
r->addresses_.reset();
result.args = grpc_channel_args_copy(channel_args_);
result_handler()->ReturnResult(std::move(result));
addresses_.reset();
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
backoff_.Reset();
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", r,
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this,
grpc_error_string(error));
r->result_handler()->ReturnError(grpc_error_set_int(
result_handler()->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"DNS resolution failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
// Set retry timer.
grpc_millis next_try = r->backoff_.NextAttemptTime();
grpc_millis next_try = backoff_.NextAttemptTime();
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed (will retry): %s",
r, grpc_error_string(error));
GPR_ASSERT(!r->have_next_resolution_timer_);
r->have_next_resolution_timer_ = true;
this, grpc_error_string(error));
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
r->Ref(DEBUG_LOCATION, "retry-timer").release();
Ref(DEBUG_LOCATION, "retry-timer").release();
if (timeout > 0) {
GRPC_CARES_TRACE_LOG("resolver:%p retrying in %" PRId64 " milliseconds",
r, timeout);
this, timeout);
} else {
GRPC_CARES_TRACE_LOG("resolver:%p retrying immediately", r);
GRPC_CARES_TRACE_LOG("resolver:%p retrying immediately", this);
}
GRPC_CLOSURE_INIT(&r->on_next_resolution_, OnNextResolution, r,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&r->next_resolution_timer_, next_try,
&r->on_next_resolution_);
grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
}
r->Unref(DEBUG_LOCATION, "dns-resolving");
Unref(DEBUG_LOCATION, "dns-resolving");
GRPC_ERROR_UNREF(error);
}
void AresDnsResolver::MaybeStartResolvingLocked() {
@ -419,8 +417,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
Ref(DEBUG_LOCATION, "next_resolution_timer_cooldown").release();
GRPC_CLOSURE_INIT(&on_next_resolution_, OnNextResolution, this,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&next_resolution_timer_,
ExecCtx::Get()->Now() + ms_until_next_resolution,
&on_next_resolution_);
@ -438,7 +434,6 @@ void AresDnsResolver::StartResolvingLocked() {
GPR_ASSERT(!resolving_);
resolving_ = true;
service_config_json_ = nullptr;
GRPC_CLOSURE_INIT(&on_resolved_, OnResolved, this, grpc_schedule_on_exec_ctx);
pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
&on_resolved_, &addresses_, enable_srv_queries_ /* check_grpclb */,

@ -131,10 +131,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
}
static void on_timeout(void* arg, grpc_error* error);
static void on_timeout_locked(void* arg, grpc_error* error);
static void on_timeout_locked(grpc_ares_ev_driver* arg, grpc_error* error);
static void on_ares_backup_poll_alarm(void* arg, grpc_error* error);
static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error);
static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* arg,
grpc_error* error);
static void noop_inject_channel_config(ares_channel /*channel*/) {}
@ -232,16 +233,12 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
static void on_timeout(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
driver->logical_thread->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&driver->on_timeout_locked, on_timeout_locked,
driver, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[driver, error]() { on_timeout_locked(driver, error); }, DEBUG_LOCATION);
}
static void on_timeout_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
static void on_timeout_locked(grpc_ares_ev_driver* driver, grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_timeout_locked. driver->shutting_down=%d. "
"err=%s",
@ -250,15 +247,14 @@ static void on_timeout_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver_shutdown_locked(driver);
}
grpc_ares_ev_driver_unref(driver);
GRPC_ERROR_UNREF(error);
}
static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
GRPC_ERROR_REF(error);
driver->logical_thread->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked,
on_ares_backup_poll_alarm_locked, driver, nullptr),
GRPC_ERROR_REF(error)),
[driver, error]() { on_ares_backup_poll_alarm_locked(driver, error); },
DEBUG_LOCATION);
}
@ -270,8 +266,8 @@ static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) {
* b) when some time has passed without fd events having happened
* For the latter, we use this backup poller. Also see
* https://github.com/grpc/grpc/pull/17688 description for more details. */
static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
static void on_ares_backup_poll_alarm_locked(grpc_ares_ev_driver* driver,
grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"request:%p ev_driver=%p on_ares_backup_poll_alarm_locked. "
"driver->shutting_down=%d. "
@ -304,10 +300,10 @@ static void on_ares_backup_poll_alarm_locked(void* arg, grpc_error* error) {
grpc_ares_notify_on_event_locked(driver);
}
grpc_ares_ev_driver_unref(driver);
GRPC_ERROR_UNREF(error);
}
static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
static void on_readable_locked(fd_node* fdn, grpc_error* error) {
GPR_ASSERT(fdn->readable_registered);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
@ -329,20 +325,16 @@ static void on_readable_locked(void* arg, grpc_error* error) {
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
GRPC_ERROR_UNREF(error);
}
static void on_readable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->logical_thread->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[fdn, error]() { on_readable_locked(fdn, error); }, DEBUG_LOCATION);
}
static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
static void on_writable_locked(fd_node* fdn, grpc_error* error) {
GPR_ASSERT(fdn->writable_registered);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
const ares_socket_t as = fdn->grpc_polled_fd->GetWrappedAresSocketLocked();
@ -362,16 +354,14 @@ static void on_writable_locked(void* arg, grpc_error* error) {
}
grpc_ares_notify_on_event_locked(ev_driver);
grpc_ares_ev_driver_unref(ev_driver);
GRPC_ERROR_UNREF(error);
}
static void on_writable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
GRPC_ERROR_REF(error);
fdn->ev_driver->logical_thread->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
[fdn, error]() { on_writable_locked(fdn, error); }, DEBUG_LOCATION);
}
ares_channel* grpc_ares_ev_driver_get_channel_locked(

@ -108,6 +108,16 @@ class GrpcPolledFdWindows {
socket_type_(socket_type),
logical_thread_(std::move(logical_thread)) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
// Closure Initialization
GRPC_CLOSURE_INIT(&outer_read_closure_,
&GrpcPolledFdWindows::OnIocpReadable, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteable, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnect, this,
grpc_schedule_on_exec_ctx);
winsocket_ = grpc_winsocket_create(as, name_);
}
@ -179,10 +189,7 @@ class GrpcPolledFdWindows {
return;
}
}
grpc_socket_notify_on_read(
winsocket_, GRPC_CLOSURE_INIT(&outer_read_closure_,
&GrpcPolledFdWindows::OnIocpReadable,
this, grpc_schedule_on_exec_ctx));
grpc_socket_notify_on_read(winsocket_, &outer_read_closure_);
}
void RegisterForOnWriteableLocked(grpc_closure* write_closure) {
@ -233,11 +240,7 @@ class GrpcPolledFdWindows {
ScheduleAndNullWriteClosure(
GRPC_WSA_ERROR(wsa_error_code, "WSASend (overlapped)"));
} else {
grpc_socket_notify_on_write(
winsocket_,
GRPC_CLOSURE_INIT(&outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteable, this,
grpc_schedule_on_exec_ctx));
grpc_socket_notify_on_write(winsocket_, &outer_write_closure_);
}
break;
case WRITE_PENDING:
@ -417,22 +420,15 @@ class GrpcPolledFdWindows {
static void OnTcpConnect(void* arg, grpc_error* error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
grpc_polled_fd->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&grpc_polled_fd->on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnectLocked,
grpc_polled_fd, nullptr),
GRPC_ERROR_REF(error)),
[grpc_polled_fd, error]() {
grpc_polled_fd->OnTcpConnectLocked(error);
},
DEBUG_LOCATION);
}
static void OnTcpConnectLocked(void* arg, grpc_error* error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
grpc_polled_fd->InnerOnTcpConnectLocked(error);
}
void InnerOnTcpConnectLocked(grpc_error* error) {
void OnTcpConnectLocked(grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked error:|%s| "
"pending_register_for_readable:%d"
@ -473,6 +469,7 @@ class GrpcPolledFdWindows {
logical_thread_->Run([this]() { ContinueRegisterForOnWriteableLocked(); },
DEBUG_LOCATION);
}
GRPC_ERROR_UNREF(error);
}
int Connect(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
@ -572,27 +569,15 @@ class GrpcPolledFdWindows {
return -1;
}
}
GRPC_CLOSURE_INIT(&on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnect, this,
grpc_schedule_on_exec_ctx);
grpc_socket_notify_on_write(winsocket_, &on_tcp_connect_locked_);
return out;
}
static void OnIocpReadable(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
GRPC_ERROR_REF(error); // ref owned by lambda
polled_fd->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&polled_fd->outer_read_closure_,
&GrpcPolledFdWindows::OnIocpReadableLocked,
polled_fd, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void OnIocpReadableLocked(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
polled_fd->OnIocpReadableInner(error);
[polled_fd, error]() { OnIocpReadableLocked(error); }, DEBUG_LOCATION);
}
// TODO(apolcyn): improve this error handling to be less conservative.
@ -600,7 +585,7 @@ class GrpcPolledFdWindows {
// c-ares reads from this socket later, but it shouldn't necessarily cancel
// the entire resolution attempt. Doing so will allow the "inject broken
// nameserver list" test to pass on Windows.
void OnIocpReadableInner(grpc_error* error) {
void OnIocpReadableLocked(grpc_error* error) {
if (error == GRPC_ERROR_NONE) {
if (winsocket_->read_info.wsa_error != 0) {
/* WSAEMSGSIZE would be due to receiving more data
@ -631,25 +616,18 @@ class GrpcPolledFdWindows {
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_));
ScheduleAndNullReadClosure(error);
GRPC_ERROR_UNREF(error);
}
static void OnIocpWriteable(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
GRPC_ERROR_REF(error); // error owned by lambda
polled_fd->logical_thread_->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&polled_fd->outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteableLocked,
polled_fd, nullptr),
GRPC_ERROR_REF(error)),
[polled_fd, error]() { polled_fd->OnIocpWriteableLocked(error); },
DEBUG_LOCATION);
}
static void OnIocpWriteableLocked(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
polled_fd->OnIocpWriteableInner(error);
}
void OnIocpWriteableInner(grpc_error* error) {
void OnIocpWriteableLocked(grpc_error* error) {
GRPC_CARES_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GPR_ASSERT(socket_type_ == SOCK_STREAM);
if (error == GRPC_ERROR_NONE) {
@ -676,6 +654,7 @@ class GrpcPolledFdWindows {
write_buf_ = grpc_empty_slice();
}
ScheduleAndNullWriteClosure(error);
GRPC_ERROR_UNREF(error);
}
bool gotten_into_driver_list() const { return gotten_into_driver_list_; }

@ -695,9 +695,8 @@ typedef struct grpc_resolve_address_ares_request {
grpc_ares_request* ares_request = nullptr;
} grpc_resolve_address_ares_request;
static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
static void on_dns_lookup_done_locked(grpc_resolve_address_ares_request* r,
grpc_error* error) {
gpr_free(r->ares_request);
grpc_resolved_addresses** resolved_addresses = r->addrs_out;
if (r->addresses == nullptr || r->addresses->empty()) {
@ -718,16 +717,14 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done,
GRPC_ERROR_REF(error));
delete r;
GRPC_ERROR_UNREF(error);
}
static void on_dns_lookup_done(void* arg, grpc_error* error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
r->logical_thread->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked,
on_dns_lookup_done_locked, r, nullptr),
GRPC_ERROR_REF(error)),
GRPC_ERROR_REF(error); // ref owned by lambda
r->logical_thread->Run([r, error]() { on_dns_lookup_done_locked(r, error); },
DEBUG_LOCATION);
}

@ -67,9 +67,9 @@ class NativeDnsResolver : public Resolver {
void StartResolvingLocked();
static void OnNextResolution(void* arg, grpc_error* error);
static void OnNextResolutionLocked(void* arg, grpc_error* error);
void OnNextResolutionLocked(grpc_error* error);
static void OnResolved(void* arg, grpc_error* error);
static void OnResolvedLocked(void* arg, grpc_error* error);
void OnResolvedLocked(grpc_error* error);
/// name to resolve
char* name_to_resolve_ = nullptr;
@ -149,84 +149,76 @@ void NativeDnsResolver::ShutdownLocked() {
void NativeDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
r->logical_thread()->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_next_resolution_,
NativeDnsResolver::OnNextResolutionLocked, r,
nullptr),
GRPC_ERROR_REF(error)),
GRPC_ERROR_REF(error); // ref owned by lambda
r->logical_thread()->Run([r, error]() { r->OnNextResolutionLocked(error); },
DEBUG_LOCATION);
}
void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
r->have_next_resolution_timer_ = false;
if (error == GRPC_ERROR_NONE && !r->resolving_) {
r->StartResolvingLocked();
void NativeDnsResolver::OnNextResolutionLocked(grpc_error* error) {
have_next_resolution_timer_ = false;
if (error == GRPC_ERROR_NONE && !resolving_) {
StartResolvingLocked();
}
r->Unref(DEBUG_LOCATION, "retry-timer");
Unref(DEBUG_LOCATION, "retry-timer");
GRPC_ERROR_UNREF(error);
}
void NativeDnsResolver::OnResolved(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
r->logical_thread()->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_resolved_,
NativeDnsResolver::OnResolvedLocked, r, nullptr),
GRPC_ERROR_REF(error)),
GRPC_ERROR_REF(error); // owned by lambda
r->logical_thread()->Run([r, error]() { r->OnResolvedLocked(error); },
DEBUG_LOCATION);
}
void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
GPR_ASSERT(r->resolving_);
r->resolving_ = false;
if (r->shutdown_) {
r->Unref(DEBUG_LOCATION, "dns-resolving");
void NativeDnsResolver::OnResolvedLocked(grpc_error* error) {
GPR_ASSERT(resolving_);
resolving_ = false;
if (shutdown_) {
Unref(DEBUG_LOCATION, "dns-resolving");
GRPC_ERROR_UNREF(error);
return;
}
if (r->addresses_ != nullptr) {
if (addresses_ != nullptr) {
Result result;
for (size_t i = 0; i < r->addresses_->naddrs; ++i) {
result.addresses.emplace_back(&r->addresses_->addrs[i].addr,
r->addresses_->addrs[i].len,
for (size_t i = 0; i < addresses_->naddrs; ++i) {
result.addresses.emplace_back(&addresses_->addrs[i].addr,
addresses_->addrs[i].len,
nullptr /* args */);
}
grpc_resolved_addresses_destroy(r->addresses_);
result.args = grpc_channel_args_copy(r->channel_args_);
r->result_handler()->ReturnResult(std::move(result));
grpc_resolved_addresses_destroy(addresses_);
result.args = grpc_channel_args_copy(channel_args_);
result_handler()->ReturnResult(std::move(result));
// Reset backoff state so that we start from the beginning when the
// next request gets triggered.
r->backoff_.Reset();
backoff_.Reset();
} else {
gpr_log(GPR_INFO, "dns resolution failed (will retry): %s",
grpc_error_string(error));
// Return transient error.
r->result_handler()->ReturnError(grpc_error_set_int(
result_handler()->ReturnError(grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"DNS resolution failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
// Set up for retry.
grpc_millis next_try = r->backoff_.NextAttemptTime();
grpc_millis next_try = backoff_.NextAttemptTime();
grpc_millis timeout = next_try - ExecCtx::Get()->Now();
GPR_ASSERT(!r->have_next_resolution_timer_);
r->have_next_resolution_timer_ = true;
GPR_ASSERT(!have_next_resolution_timer_);
have_next_resolution_timer_ = true;
// TODO(roth): We currently deal with this ref manually. Once the
// new closure API is done, find a way to track this ref with the timer
// callback as part of the type system.
r->Ref(DEBUG_LOCATION, "next_resolution_timer").release();
Ref(DEBUG_LOCATION, "next_resolution_timer").release();
if (timeout > 0) {
gpr_log(GPR_DEBUG, "retrying in %" PRId64 " milliseconds", timeout);
} else {
gpr_log(GPR_DEBUG, "retrying immediately");
}
GRPC_CLOSURE_INIT(&r->on_next_resolution_,
NativeDnsResolver::OnNextResolution, r,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&r->next_resolution_timer_, next_try,
&r->on_next_resolution_);
GRPC_CLOSURE_INIT(&on_next_resolution_, NativeDnsResolver::OnNextResolution,
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&next_resolution_timer_, next_try, &on_next_resolution_);
}
r->Unref(DEBUG_LOCATION, "dns-resolving");
Unref(DEBUG_LOCATION, "dns-resolving");
GRPC_ERROR_UNREF(error);
}
void NativeDnsResolver::MaybeStartResolvingLocked() {

@ -45,6 +45,47 @@
namespace grpc_core {
// Resolver implementation whose results are injected by a
// FakeResolverResponseGenerator (see friend declarations below) instead of
// being obtained from any real name-resolution mechanism; used for tests.
class FakeResolver : public Resolver {
public:
explicit FakeResolver(ResolverArgs args);
void StartLocked() override;
void RequestReresolutionLocked() override;
private:
// The generator and its setter helper write the result/flag fields below
// directly, so they need access to this class's private state.
friend class FakeResolverResponseGenerator;
friend class FakeResolverResponseSetter;
virtual ~FakeResolver();
void ShutdownLocked() override;
// Delivers a pending result or failure to the result handler, if one is
// queued. NOTE(review): body not visible in this excerpt — confirm the
// exact delivery conditions against the implementation.
void MaybeSendResultLocked();
// Returns the canned re-resolution result set via the generator.
void ReturnReresolutionResult();
// passed-in parameters
grpc_channel_args* channel_args_ = nullptr;
RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
// If has_next_result_ is true, next_result_ is the next resolution result
// to be returned.
bool has_next_result_ = false;
Result next_result_;
// Result to use for the pretended re-resolution in
// RequestReresolutionLocked().
bool has_reresolution_result_ = false;
Result reresolution_result_;
// True after the call to StartLocked().
bool started_ = false;
// True after the call to ShutdownLocked().
bool shutdown_ = false;
// if true, return failure
bool return_failure_ = false;
// pending re-resolution
bool reresolution_closure_pending_ = false;
};
FakeResolver::FakeResolver(ResolverArgs args)
: Resolver(args.logical_thread, std::move(args.result_handler)),
response_generator_(
@ -123,15 +164,35 @@ void FakeResolver::ReturnReresolutionResult() {
Unref();
}
//
// FakeResolverResponseGenerator
//
FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
// One-shot helper that carries a result from a FakeResolverResponseGenerator
// call onto the resolver's logical thread, where one of the Set*Locked()
// methods applies it to the FakeResolver.
//
// `has_result` is forwarded to resolver_->has_reresolution_result_ by
// SetReresolutionResponseLocked(); `immediate` controls whether
// SetFailureLocked() delivers the failure right away.
class FakeResolverResponseSetter {
 public:
  explicit FakeResolverResponseSetter(RefCountedPtr<FakeResolver> resolver,
                                      Resolver::Result result,
                                      bool has_result = false,
                                      bool immediate = true)
      : resolver_(std::move(resolver)),
        // Move, don't copy: `result` is a by-value sink parameter, and
        // copying it here silently defeated callers' std::move(result)
        // (clang-tidy modernize-pass-by-value).
        result_(std::move(result)),
        has_result_(has_result),
        immediate_(immediate) {}
  // Each of these must run on the resolver's logical thread.
  void SetResponseLocked();
  void SetReresolutionResponseLocked();
  void SetFailureLocked();

 private:
  RefCountedPtr<FakeResolver> resolver_;
  Resolver::Result result_;
  bool has_result_;
  bool immediate_;
};
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
// Installs the stored result as the resolver's canned re-resolution response
// and records whether a re-resolution result is now available. No-op once the
// resolver has been shut down.
void FakeResolverResponseSetter::SetReresolutionResponseLocked() {
  if (resolver_->shutdown_) return;
  resolver_->reresolution_result_ = std::move(result_);
  resolver_->has_reresolution_result_ = has_result_;
}
void FakeResolverResponseGenerator::ResponseSetter::SetResponseLocked() {
void FakeResolverResponseSetter::SetResponseLocked() {
if (!resolver_->shutdown_) {
resolver_->next_result_ = std::move(result_);
resolver_->has_next_result_ = true;
@ -139,6 +200,21 @@ void FakeResolverResponseGenerator::ResponseSetter::SetResponseLocked() {
}
}
// Marks the resolver to return a failure on its next result; when immediate_
// is set, pushes that failure out right away via MaybeSendResultLocked().
// No-op once the resolver has been shut down.
void FakeResolverResponseSetter::SetFailureLocked() {
  if (resolver_->shutdown_) return;
  resolver_->return_failure_ = true;
  if (immediate_) resolver_->MaybeSendResultLocked();
}
//
// FakeResolverResponseGenerator
//
// Nothing to do beyond member defaults: let the compiler generate these
// special members (clang-tidy modernize-use-equals-default).
FakeResolverResponseGenerator::FakeResolverResponseGenerator() = default;
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() = default;
void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
RefCountedPtr<FakeResolver> resolver;
{
@ -150,9 +226,8 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
}
resolver = resolver_->Ref();
}
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver,
std::move(result));
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, std::move(result));
resolver->logical_thread()->Run(
[arg]() {
arg->SetResponseLocked();
@ -161,14 +236,6 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::ResponseSetter::
SetReresolutionResponseLocked() {
if (!resolver_->shutdown_) {
resolver_->reresolution_result_ = std::move(result_);
resolver_->has_reresolution_result_ = has_result_;
}
}
void FakeResolverResponseGenerator::SetReresolutionResponse(
Resolver::Result result) {
RefCountedPtr<FakeResolver> resolver;
@ -177,10 +244,8 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver,
std::move(result));
arg->set_has_result();
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, std::move(result), true);
resolver->logical_thread()->Run(
[arg]() {
arg->SetReresolutionResponseLocked();
@ -196,9 +261,8 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver,
Resolver::Result());
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, Resolver::Result());
resolver->logical_thread()->Run(
[arg]() {
arg->SetReresolutionResponseLocked();
@ -207,13 +271,6 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::ResponseSetter::SetFailureLocked() {
if (!resolver_->shutdown_) {
resolver_->return_failure_ = true;
if (immediate_) resolver_->MaybeSendResultLocked();
}
}
void FakeResolverResponseGenerator::SetFailure() {
RefCountedPtr<FakeResolver> resolver;
{
@ -221,9 +278,8 @@ void FakeResolverResponseGenerator::SetFailure() {
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver,
Resolver::Result());
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver, Resolver::Result());
resolver->logical_thread()->Run(
[arg]() {
arg->SetFailureLocked();
@ -239,10 +295,8 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() {
GPR_ASSERT(resolver_ != nullptr);
resolver = resolver_->Ref();
}
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver,
Resolver::Result());
arg->reset_immediate();
FakeResolverResponseSetter* arg = new FakeResolverResponseSetter(
resolver, Resolver::Result(), false, false);
resolver->logical_thread()->Run(
[arg]() {
arg->SetFailureLocked();
@ -257,9 +311,8 @@ void FakeResolverResponseGenerator::SetFakeResolver(
resolver_ = std::move(resolver);
if (resolver_ == nullptr) return;
if (has_result_) {
FakeResolverResponseGenerator::ResponseSetter* arg =
new FakeResolverResponseGenerator::ResponseSetter(resolver_,
std::move(result_));
FakeResolverResponseSetter* arg =
new FakeResolverResponseSetter(resolver_, std::move(result_));
resolver_->logical_thread()->Run(
[arg]() {
arg->SetResponseLocked();

@ -30,47 +30,7 @@
namespace grpc_core {
class FakeResolverResponseGenerator;
// A fake resolver for tests: rather than performing real name
// resolution, it returns whatever results are injected into it via the
// associated FakeResolverResponseGenerator.
class FakeResolver : public Resolver {
 public:
  explicit FakeResolver(ResolverArgs args);
  void StartLocked() override;
  void RequestReresolutionLocked() override;
 private:
  friend class FakeResolverResponseGenerator;
  virtual ~FakeResolver();
  void ShutdownLocked() override;
  // Delivers a pending result or failure to the channel when
  // appropriate; invoked after new state is injected by the generator.
  void MaybeSendResultLocked();
  // Returns reresolution_result_ as the outcome of a pretended
  // re-resolution — TODO confirm exact behavior against the .cc file.
  void ReturnReresolutionResult();
  // passed-in parameters
  grpc_channel_args* channel_args_ = nullptr;
  RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
  // If has_next_result_ is true, next_result_ is the next resolution result
  // to be returned.
  bool has_next_result_ = false;
  Result next_result_;
  // Result to use for the pretended re-resolution in
  // RequestReresolutionLocked().
  bool has_reresolution_result_ = false;
  Result reresolution_result_;
  // True after the call to StartLocked().
  bool started_ = false;
  // True after the call to ShutdownLocked().
  bool shutdown_ = false;
  // if true, return failure
  bool return_failure_ = false;
  // pending re-resolution
  bool reresolution_closure_pending_ = false;
};
class FakeResolver;
/// A mechanism for generating responses for the fake resolver.
/// An instance of this class is passed to the fake resolver via a channel
@ -121,24 +81,6 @@ class FakeResolverResponseGenerator
// Set the corresponding FakeResolver to this generator.
void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
// Helper that carries an injected result (plus delivery flags) into the
// resolver's logical thread, where one of the Set*Locked() methods is
// then run.
class ResponseSetter {
 public:
  // `result` is a sink parameter: it is moved into result_ rather than
  // copied (a Resolver::Result can be expensive to copy).
  explicit ResponseSetter(RefCountedPtr<FakeResolver> resolver,
                          Resolver::Result result)
      : resolver_(std::move(resolver)), result_(std::move(result)) {}
  // Marks result_ as a real result to deliver (vs. the empty default).
  void set_has_result() { has_result_ = true; }
  // Defers delivery: state is recorded but not sent immediately.
  void reset_immediate() { immediate_ = false; }
  void SetResponseLocked();
  void SetReresolutionResponseLocked();
  void SetFailureLocked();

 private:
  RefCountedPtr<FakeResolver> resolver_;
  Resolver::Result result_;
  bool has_result_ = false;
  bool immediate_ = true;
};
// Mutex protecting the members below.
Mutex mu_;
RefCountedPtr<FakeResolver> resolver_;

@ -91,7 +91,7 @@ class XdsPriorityListUpdate {
// There are two phases of accessing this class's content:
// 1. to initialize in the control plane logical_thread;
// 2. to use in the data plane logical_thread.
// 2. to use in the data plane mutex.
// So no additional synchronization is needed.
class XdsDropConfig : public RefCounted<XdsDropConfig> {
public:
@ -113,7 +113,7 @@ class XdsDropConfig : public RefCounted<XdsDropConfig> {
DropCategory{std::move(name), parts_per_million});
}
// The only method invoked from the data plane logical_thread.
// The only method invoked from the data plane mutex.
bool ShouldDrop(const grpc_core::UniquePtr<char>** category_name) const;
const DropCategoryList& drop_category_list() const {

@ -140,7 +140,7 @@ class XdsClientStats {
// May only be called from the control plane logical_thread.
void RefByPicker() { picker_refcount_.FetchAdd(1, MemoryOrder::ACQ_REL); }
// Might be called from the control plane logical_thread or the data plane
// logical_thread.
// mutex.
// TODO(juanlishen): Once https://github.com/grpc/grpc/pull/19390 is merged,
// this method will also only be invoked in the control plane
// logical_thread. We may then be able to simplify the LocalityStats'
@ -220,7 +220,7 @@ class XdsClientStats {
Atomic<uint64_t> total_dropped_requests_{0};
// Protects dropped_requests_. A mutex is necessary because the length of
// dropped_requests_ can be accessed by both the picker (from data plane
// logical_thread) and the load reporting thread (from the control plane
// mutex) and the load reporting thread (from the control plane
// logical_thread).
Mutex dropped_requests_mu_;
DroppedRequestsMap dropped_requests_;

@ -59,10 +59,10 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state,
const RefCountedPtr<LogicalThread>& combiner)
const RefCountedPtr<LogicalThread>& logical_thread)
: watcher_(std::move(watcher)), state_(state) {
if (combiner != nullptr) {
combiner->Run(
if (logical_thread != nullptr) {
logical_thread->Run(
Closure::ToFunction(
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr),
GRPC_ERROR_NONE),
@ -92,7 +92,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {
void AsyncConnectivityStateWatcherInterface::Notify(
grpc_connectivity_state state) {
new Notifier(Ref(), state, combiner_); // Deletes itself when done.
new Notifier(Ref(), state, logical_thread_); // Deletes itself when done.
}
//

@ -72,14 +72,14 @@ class AsyncConnectivityStateWatcherInterface
// If \a logical_thread is nullptr, then the notification will be scheduled
// on the ExecCtx.
explicit AsyncConnectivityStateWatcherInterface(
RefCountedPtr<LogicalThread> combiner = nullptr)
: combiner_(std::move(combiner)) {}
RefCountedPtr<LogicalThread> logical_thread = nullptr)
: logical_thread_(std::move(logical_thread)) {}
// Invoked asynchronously when Notify() is called.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;
private:
RefCountedPtr<LogicalThread> combiner_;
RefCountedPtr<LogicalThread> logical_thread_;
};
// Tracks connectivity state. Maintains a list of watchers that are

Loading…
Cancel
Save