Use LogicalThread in client channel code

reviewable/pr21361/r1
Yash Tibrewal 5 years ago
parent 5817f6287d
commit e05417db32
  1. 109
      src/core/ext/filters/client_channel/client_channel.cc
  2. 9
      src/core/ext/filters/client_channel/lb_policy.cc
  3. 11
      src/core/ext/filters/client_channel/lb_policy.h
  4. 68
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  5. 32
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  6. 6
      src/core/ext/filters/client_channel/resolver.cc
  7. 10
      src/core/ext/filters/client_channel/resolver.h
  8. 16
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  9. 47
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
  10. 15
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  11. 35
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_libuv.cc
  12. 8
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  13. 122
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  14. 30
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  15. 3
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  16. 5
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
  17. 17
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  18. 46
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  19. 6
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  20. 2
      src/core/ext/filters/client_channel/resolver_factory.h
  21. 4
      src/core/ext/filters/client_channel/resolver_registry.cc
  22. 2
      src/core/ext/filters/client_channel/resolver_registry.h
  23. 75
      src/core/ext/filters/client_channel/xds/xds_client.cc
  24. 11
      src/core/ext/filters/client_channel/xds/xds_client.h
  25. 6
      src/core/lib/iomgr/closure.h
  26. 18
      src/core/lib/iomgr/logical_thread.cc
  27. 28
      src/core/lib/iomgr/logical_thread.h
  28. 9
      src/core/lib/transport/connectivity_state.cc
  29. 8
      src/core/lib/transport/connectivity_state.h
  30. 21
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  31. 40
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  32. 43
      test/core/client_channel/resolvers/dns_resolver_test.cc
  33. 10
      test/core/client_channel/resolvers/fake_resolver_test.cc
  34. 53
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc
  35. 3
      test/core/end2end/fuzzers/api_fuzzer.cc
  36. 11
      test/core/end2end/goaway_server_test.cc
  37. 29
      test/core/iomgr/logical_thread_test.cc
  38. 7
      test/cpp/naming/cancel_ares_query_test.cc
  39. 18
      test/cpp/naming/resolver_component_test.cc

@ -56,8 +56,8 @@
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -209,8 +209,8 @@ class ChannelData {
void Cancel();
private:
static void AddWatcherLocked(void* arg, grpc_error* ignored);
static void RemoveWatcherLocked(void* arg, grpc_error* ignored);
static void AddWatcherLocked(ExternalConnectivityWatcher* arg);
static void RemoveWatcherLocked(ExternalConnectivityWatcher* arg);
ChannelData* chand_;
grpc_polling_entity pollent_;
@ -218,8 +218,6 @@ class ChannelData {
grpc_connectivity_state* state_;
grpc_closure* on_complete_;
grpc_closure* watcher_timer_init_;
grpc_closure add_closure_;
grpc_closure remove_closure_;
Atomic<bool> done_{false};
};
@ -247,7 +245,7 @@ class ChannelData {
static void StartTransportOpLocked(void* arg, grpc_error* ignored);
static void TryToConnectLocked(void* arg, grpc_error* error_ignored);
static void TryToConnectLocked(ChannelData* arg);
void ProcessLbPolicy(
const Resolver::Result& resolver_result,
@ -283,7 +281,7 @@ class ChannelData {
//
// Fields used in the control plane. Guarded by combiner.
//
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
grpc_pollset_set* interested_parties_;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
OrphanablePtr<ResolvingLoadBalancingPolicy> resolving_lb_policy_;
@ -1049,15 +1047,21 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
: parent_(std::move(parent)),
state_(new_state),
connected_subchannel_(std::move(connected_subchannel)) {
parent_->parent_->chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_, ApplyUpdateInControlPlaneCombiner,
this, nullptr),
ExecCtx::Run(
DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
Updater* self = static_cast<Updater*>(arg);
self->parent_->parent_->chand_->combiner_->Run(
[self]() { ApplyUpdateInControlPlaneCombiner(self); },
DEBUG_LOCATION);
},
this, nullptr),
GRPC_ERROR_NONE);
}
private:
static void ApplyUpdateInControlPlaneCombiner(void* arg,
grpc_error* /*error*/) {
static void ApplyUpdateInControlPlaneCombiner(void* arg) {
Updater* self = static_cast<Updater*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
gpr_log(GPR_INFO,
@ -1083,7 +1087,6 @@ class ChannelData::SubchannelWrapper : public SubchannelInterface {
RefCountedPtr<WatcherWrapper> parent_;
grpc_connectivity_state state_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_closure closure_;
};
std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
@ -1146,9 +1149,16 @@ ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
grpc_polling_entity_add_to_pollset_set(&pollent_,
chand_->interested_parties_);
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&add_closure_, AddWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
auto* self =
static_cast<ExternalConnectivityWatcher*>(arg);
self->chand_->combiner_->Run(
[self]() { AddWatcherLocked(self); }, DEBUG_LOCATION);
},
this, nullptr),
GRPC_ERROR_NONE);
}
ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
@ -1174,9 +1184,8 @@ void ChannelData::ExternalConnectivityWatcher::Notify(
// Not needed in state SHUTDOWN, because the tracker will
// automatically remove all watchers in that case.
if (state != GRPC_CHANNEL_SHUTDOWN) {
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); },
DEBUG_LOCATION);
}
}
@ -1188,15 +1197,12 @@ void ChannelData::ExternalConnectivityWatcher::Cancel() {
}
ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
// Hop back into the combiner to clean up.
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&remove_closure_, RemoveWatcherLocked, this, nullptr),
GRPC_ERROR_NONE);
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); },
DEBUG_LOCATION);
}
void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
ExternalConnectivityWatcher* self) {
Closure::Run(DEBUG_LOCATION, self->watcher_timer_init_, GRPC_ERROR_NONE);
// Add new watcher.
self->chand_->state_tracker_.AddWatcher(
@ -1205,9 +1211,7 @@ void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked(
}
void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked(
void* arg, grpc_error* /*ignored*/) {
ExternalConnectivityWatcher* self =
static_cast<ExternalConnectivityWatcher*>(arg);
ExternalConnectivityWatcher* self) {
self->chand_->state_tracker_.RemoveWatcher(self);
}
@ -1224,17 +1228,12 @@ class ChannelData::ConnectivityWatcherAdder {
initial_state_(initial_state),
watcher_(std::move(watcher)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherAdder::AddWatcherLocked, this,
nullptr),
GRPC_ERROR_NONE);
chand_->combiner_->Run([this]() { AddWatcherLocked(this); },
DEBUG_LOCATION);
}
private:
static void AddWatcherLocked(void* arg, grpc_error* /*error*/) {
ConnectivityWatcherAdder* self =
static_cast<ConnectivityWatcherAdder*>(arg);
static void AddWatcherLocked(ConnectivityWatcherAdder* self) {
self->chand_->state_tracker_.AddWatcher(self->initial_state_,
std::move(self->watcher_));
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
@ -1245,7 +1244,6 @@ class ChannelData::ConnectivityWatcherAdder {
ChannelData* chand_;
grpc_connectivity_state initial_state_;
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
grpc_closure closure_;
};
//
@ -1258,17 +1256,12 @@ class ChannelData::ConnectivityWatcherRemover {
AsyncConnectivityStateWatcherInterface* watcher)
: chand_(chand), watcher_(watcher) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
chand_->combiner_->Run(
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherRemover::RemoveWatcherLocked,
this, nullptr),
GRPC_ERROR_NONE);
chand_->combiner_->Run([this]() { RemoveWatcherLocked(this); },
DEBUG_LOCATION);
}
private:
static void RemoveWatcherLocked(void* arg, grpc_error* /*error*/) {
ConnectivityWatcherRemover* self =
static_cast<ConnectivityWatcherRemover*>(arg);
static void RemoveWatcherLocked(ConnectivityWatcherRemover* self) {
self->chand_->state_tracker_.RemoveWatcher(self->watcher_);
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityWatcherRemover");
@ -1277,7 +1270,6 @@ class ChannelData::ConnectivityWatcherRemover {
ChannelData* chand_;
AsyncConnectivityStateWatcherInterface* watcher_;
grpc_closure closure_;
};
//
@ -1418,7 +1410,7 @@ ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
client_channel_factory_(
ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
channelz_node_(GetChannelzNode(args->channel_args)),
combiner_(grpc_combiner_create()),
combiner_(MakeRefCounted<LogicalThread>()),
interested_parties_(grpc_pollset_set_create()),
subchannel_pool_(GetSubchannelPool(args->channel_args)),
state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
@ -1489,7 +1481,6 @@ ChannelData::~ChannelData() {
// Stop backup polling.
grpc_client_channel_stop_backup_polling(interested_parties_);
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "client_channel");
GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
gpr_mu_destroy(&info_mu_);
}
@ -1890,9 +1881,11 @@ void ChannelData::StartTransportOp(grpc_channel_element* elem,
op->handler_private.extra_arg = elem;
GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
chand->combiner_->Run(
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op, nullptr),
GRPC_ERROR_NONE);
Closure::ToFunction(
GRPC_CLOSURE_INIT(&op->handler_private.closure,
ChannelData::StartTransportOpLocked, op, nullptr),
GRPC_ERROR_NONE),
DEBUG_LOCATION);
}
void ChannelData::GetChannelInfo(grpc_channel_element* elem,
@ -1943,7 +1936,7 @@ ChannelData::GetConnectedSubchannelInDataPlane(
return connected_subchannel->Ref();
}
void ChannelData::TryToConnectLocked(void* arg, grpc_error* /*error_ignored*/) {
void ChannelData::TryToConnectLocked(ChannelData* arg) {
auto* chand = static_cast<ChannelData*>(arg);
if (chand->resolving_lb_policy_ != nullptr) {
chand->resolving_lb_policy_->ExitIdleLocked();
@ -1958,8 +1951,16 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
grpc_connectivity_state out = state_tracker_.state();
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
combiner_->Run(GRPC_CLOSURE_CREATE(TryToConnectLocked, this, nullptr),
GRPC_ERROR_NONE);
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
auto* chand = static_cast<ChannelData*>(arg);
chand->combiner_->Run(
[chand]() { TryToConnectLocked(chand); },
DEBUG_LOCATION);
},
this, nullptr),
GRPC_ERROR_NONE);
}
return out;
}

@ -33,13 +33,12 @@ DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(false, "lb_policy_refcount");
LoadBalancingPolicy::LoadBalancingPolicy(Args args, intptr_t initial_refcount)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount, initial_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
combiner_(std::move(args.combiner)),
interested_parties_(grpc_pollset_set_create()),
channel_control_helper_(std::move(args.channel_control_helper)) {}
LoadBalancingPolicy::~LoadBalancingPolicy() {
grpc_pollset_set_destroy(interested_parties_);
GRPC_COMBINER_UNREF(combiner_, "lb_policy");
}
void LoadBalancingPolicy::Orphan() {
@ -106,8 +105,10 @@ LoadBalancingPolicy::PickResult LoadBalancingPolicy::QueuePicker::Pick(
exit_idle_called_ = true;
parent_->Ref().release(); // ref held by closure.
parent_->combiner()->Run(
GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr),
GRPC_ERROR_NONE);
Closure::ToFunction(
GRPC_CLOSURE_CREATE(&CallExitIdle, parent_.get(), nullptr),
GRPC_ERROR_NONE),
DEBUG_LOCATION);
}
PickResult result;
result.type = PickResult::PICK_QUEUE;

@ -31,7 +31,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/string_view.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
@ -311,10 +311,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
struct Args {
/// The combiner under which all LB policy calls will be run.
/// Policy does NOT take ownership of the reference to the combiner.
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a smart pointer that does pass ownership
// of a reference.
Combiner* combiner = nullptr;
RefCountedPtr<LogicalThread> combiner;
/// Channel control helper.
/// Note: LB policies MUST NOT call any method on the helper from
/// their constructor.
@ -387,7 +384,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
};
protected:
Combiner* combiner() const { return combiner_; }
RefCountedPtr<LogicalThread> combiner() const { return combiner_; }
// Note: LB policies MUST NOT call any method on the helper from their
// constructor.
@ -400,7 +397,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
private:
/// Combiner under which LB policy actions take place.
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
/// Owned pointer to interested parties in load balancing decisions.
grpc_pollset_set* interested_parties_;
/// Channel control helper.

@ -912,9 +912,11 @@ void GrpcLb::BalancerCallState::MaybeSendClientLoadReport(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
MaybeSendClientLoadReportLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
MaybeSendClientLoadReportLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
@ -997,9 +999,11 @@ void GrpcLb::BalancerCallState::ClientLoadReportDone(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
ClientLoadReportDoneLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->client_load_report_closure_,
ClientLoadReportDoneLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
@ -1019,9 +1023,11 @@ void GrpcLb::BalancerCallState::OnInitialRequestSent(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_initial_request_sent_,
OnInitialRequestSentLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_initial_request_sent_,
OnInitialRequestSentLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(
@ -1043,9 +1049,11 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceived(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_,
OnBalancerMessageReceivedLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_message_received_,
OnBalancerMessageReceivedLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
@ -1199,9 +1207,11 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceived(void* arg,
grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
lb_calld->grpclb_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_status_received_,
OnBalancerStatusReceivedLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lb_calld->lb_on_balancer_status_received_,
OnBalancerStatusReceivedLocked, lb_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
@ -1539,10 +1549,12 @@ void GrpcLb::OnBalancerChannelConnectivityChanged(void* arg,
grpc_error* error) {
GrpcLb* self = static_cast<GrpcLb*>(arg);
self->combiner()->Run(
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked,
self, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->lb_channel_on_connectivity_changed_,
&GrpcLb::OnBalancerChannelConnectivityChangedLocked,
self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
@ -1636,10 +1648,12 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_,
&GrpcLb::OnBalancerCallRetryTimerLocked, grpclb_policy,
nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_call_retry_,
&GrpcLb::OnBalancerCallRetryTimerLocked,
grpclb_policy, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
@ -1681,9 +1695,11 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
void GrpcLb::OnFallbackTimer(void* arg, grpc_error* error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_,
&GrpcLb::OnFallbackTimerLocked, grpclb_policy, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(GRPC_CLOSURE_INIT(&grpclb_policy->lb_on_fallback_,
&GrpcLb::OnFallbackTimerLocked,
grpclb_policy, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {

@ -867,9 +867,11 @@ void XdsLb::MaybeCancelFallbackAtStartupChecks() {
void XdsLb::OnFallbackTimer(void* arg, grpc_error* error) {
XdsLb* xdslb_policy = static_cast<XdsLb*>(arg);
xdslb_policy->combiner()->Run(
GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_,
&XdsLb::OnFallbackTimerLocked, xdslb_policy, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(GRPC_CLOSURE_INIT(&xdslb_policy->lb_on_fallback_,
&XdsLb::OnFallbackTimerLocked,
xdslb_policy, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
@ -1404,9 +1406,11 @@ void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimer(
void* arg, grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsLb::PriorityList::LocalityMap::OnDelayedRemovalTimerLocked(
@ -1443,9 +1447,11 @@ void XdsLb::PriorityList::LocalityMap::OnFailoverTimer(void* arg,
grpc_error* error) {
LocalityMap* self = static_cast<LocalityMap*>(arg);
self->xds_policy_->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked, self,
nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_failover_timer_, OnFailoverTimerLocked,
self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsLb::PriorityList::LocalityMap::OnFailoverTimerLocked(
@ -1699,9 +1705,11 @@ void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimer(
void* arg, grpc_error* error) {
Locality* self = static_cast<Locality*>(arg);
self->xds_policy()->combiner()->Run(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_,
OnDelayedRemovalTimerLocked, self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsLb::PriorityList::LocalityMap::Locality::OnDelayedRemovalTimerLocked(

@ -30,13 +30,11 @@ namespace grpc_core {
// Resolver
//
Resolver::Resolver(Combiner* combiner,
Resolver::Resolver(RefCountedPtr<LogicalThread> combiner,
std::unique_ptr<ResultHandler> result_handler)
: InternallyRefCounted(&grpc_trace_resolver_refcount),
result_handler_(std::move(result_handler)),
combiner_(GRPC_COMBINER_REF(combiner, "resolver")) {}
Resolver::~Resolver() { GRPC_COMBINER_UNREF(combiner_, "resolver"); }
combiner_(std::move(combiner)) {}
//
// Resolver::Result

@ -27,8 +27,8 @@
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/logical_thread.h"
extern grpc_core::DebugOnlyTraceFlag grpc_trace_resolver_refcount;
@ -87,7 +87,7 @@ class Resolver : public InternallyRefCounted<Resolver> {
// Not copyable nor movable.
Resolver(const Resolver&) = delete;
Resolver& operator=(const Resolver&) = delete;
virtual ~Resolver();
virtual ~Resolver() {}
/// Starts resolving.
virtual void StartLocked() = 0;
@ -126,19 +126,19 @@ class Resolver : public InternallyRefCounted<Resolver> {
// TODO(roth): Once we have a C++-like interface for combiners, this
// API should change to take a RefCountedPtr<>, so that we always take
// ownership of a new ref.
explicit Resolver(Combiner* combiner,
explicit Resolver(RefCountedPtr<LogicalThread> combiner,
std::unique_ptr<ResultHandler> result_handler);
/// Shuts down the resolver.
virtual void ShutdownLocked() = 0;
Combiner* combiner() const { return combiner_; }
RefCountedPtr<LogicalThread> combiner() const { return combiner_; }
ResultHandler* result_handler() const { return result_handler_.get(); }
private:
std::unique_ptr<ResultHandler> result_handler_;
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
};
} // namespace grpc_core

@ -40,9 +40,9 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/gethostname.h"
#include "src/core/lib/iomgr/iomgr_custom.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
@ -201,9 +201,11 @@ void AresDnsResolver::ShutdownLocked() {
void AresDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
r->combiner()->Run(GRPC_CLOSURE_INIT(&r->on_next_resolution_,
OnNextResolutionLocked, r, nullptr),
GRPC_ERROR_REF(error));
r->combiner()->Run(
Closure::ToFunction(GRPC_CLOSURE_INIT(&r->on_next_resolution_,
OnNextResolutionLocked, r, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void AresDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
@ -326,8 +328,10 @@ char* ChooseServiceConfig(char* service_config_choice_json,
void AresDnsResolver::OnResolved(void* arg, grpc_error* error) {
AresDnsResolver* r = static_cast<AresDnsResolver*>(arg);
r->combiner()->Run(
GRPC_CLOSURE_INIT(&r->on_resolved_, OnResolvedLocked, r, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_resolved_, OnResolvedLocked, r, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {

@ -31,7 +31,6 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
@ -67,7 +66,7 @@ struct grpc_ares_ev_driver {
gpr_refcount refs;
/** combiner to synchronize c-ares and I/O callbacks on */
grpc_core::Combiner* combiner;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner;
/** a list of grpc_fd that this event driver is currently using. */
fd_node* fds;
/** is this event driver currently working? */
@ -107,7 +106,6 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
GRPC_CARES_TRACE_LOG("request:%p destroy ev_driver %p", ev_driver->request,
ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr);
GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel);
grpc_ares_complete_request_locked(ev_driver->request);
delete ev_driver;
@ -143,11 +141,11 @@ static void noop_inject_channel_config(ares_channel /*channel*/) {}
void (*grpc_ares_test_only_inject_config)(ares_channel channel) =
noop_inject_channel_config;
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_core::Combiner* combiner,
grpc_ares_request* request) {
grpc_error* grpc_ares_ev_driver_create_locked(
grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner,
grpc_ares_request* request) {
*ev_driver = new grpc_ares_ev_driver();
ares_options opts;
memset(&opts, 0, sizeof(opts));
@ -164,7 +162,7 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
gpr_free(*ev_driver);
return err;
}
(*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
(*ev_driver)->combiner = std::move(combiner);
gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr;
@ -234,9 +232,12 @@ static grpc_millis calculate_next_ares_backup_poll_alarm_ms(
static void on_timeout(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
driver->combiner->Run(GRPC_CLOSURE_INIT(&driver->on_timeout_locked,
on_timeout_locked, driver, nullptr),
GRPC_ERROR_REF(error));
driver->combiner->Run(
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&driver->on_timeout_locked, on_timeout_locked,
driver, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void on_timeout_locked(void* arg, grpc_error* error) {
@ -254,9 +255,11 @@ static void on_timeout_locked(void* arg, grpc_error* error) {
static void on_ares_backup_poll_alarm(void* arg, grpc_error* error) {
grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
driver->combiner->Run(
GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked,
on_ares_backup_poll_alarm_locked, driver, nullptr),
GRPC_ERROR_REF(error));
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&driver->on_ares_backup_poll_alarm_locked,
on_ares_backup_poll_alarm_locked, driver, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
/* In case of non-responsive DNS servers, dropped packets, etc., c-ares has
@ -331,8 +334,11 @@ static void on_readable_locked(void* arg, grpc_error* error) {
static void on_readable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->combiner->Run(
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn, nullptr),
GRPC_ERROR_REF(error));
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void on_writable_locked(void* arg, grpc_error* error) {
@ -361,8 +367,11 @@ static void on_writable_locked(void* arg, grpc_error* error) {
static void on_writable(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
fdn->ev_driver->combiner->Run(
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn, nullptr),
GRPC_ERROR_REF(error));
grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
ares_channel* grpc_ares_ev_driver_get_channel_locked(

@ -40,11 +40,11 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked(
/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
created successfully. */
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_core::Combiner* combiner,
grpc_ares_request* request);
grpc_error* grpc_ares_ev_driver_create_locked(
grpc_ares_ev_driver** ev_driver, grpc_pollset_set* pollset_set,
int query_timeout_ms,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner,
grpc_ares_request* request);
/* Called back when all DNS lookups have completed. */
void grpc_ares_ev_driver_on_queries_complete_locked(
@ -90,12 +90,13 @@ class GrpcPolledFdFactory {
/* Creates a new wrapped fd for the current platform */
virtual GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
Combiner* combiner) = 0;
RefCountedPtr<LogicalThread> combiner) = 0;
/* Optionally configures the ares channel after creation */
virtual void ConfigureAresChannelLocked(ares_channel channel) = 0;
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(Combiner* combiner);
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
RefCountedPtr<LogicalThread> combiner);
} // namespace grpc_core

@ -31,7 +31,7 @@
#include <grpc/support/time.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
namespace grpc_core {
@ -41,19 +41,15 @@ void ares_uv_poll_close_cb(uv_handle_t* handle) { delete handle; }
class GrpcPolledFdLibuv : public GrpcPolledFd {
public:
GrpcPolledFdLibuv(ares_socket_t as, Combiner* combiner)
: as_(as), combiner_(combiner) {
GrpcPolledFdLibuv(ares_socket_t as, RefCountedPtr<LogicalThread> combiner)
: as_(as), combiner_(std::move(combiner)) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, (intptr_t)as);
handle_ = new uv_poll_t();
uv_poll_init_socket(uv_default_loop(), handle_, as);
handle_->data = this;
GRPC_COMBINER_REF(combiner_, "libuv ares event driver");
}
~GrpcPolledFdLibuv() {
gpr_free(name_);
GRPC_COMBINER_UNREF(combiner_, "libuv ares event driver");
}
~GrpcPolledFdLibuv() { gpr_free(name_); }
void RegisterForOnReadableLocked(grpc_closure* read_closure) override {
GPR_ASSERT(read_closure_ == nullptr);
@ -77,7 +73,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
return false;
}
void ShutdownInternalLocked(grpc_error* error) {
void ShutdownInternalLocked(grpc_error* /*error*/) {
uv_poll_stop(handle_);
uv_close(reinterpret_cast<uv_handle_t*>(handle_), ares_uv_poll_close_cb);
if (read_closure_ != nullptr) {
@ -109,7 +105,7 @@ class GrpcPolledFdLibuv : public GrpcPolledFd {
grpc_closure* read_closure_ = nullptr;
grpc_closure* write_closure_ = nullptr;
int poll_events_ = 0;
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
};
struct AresUvPollCbArg {
@ -121,14 +117,14 @@ struct AresUvPollCbArg {
int events;
};
static void ares_uv_poll_cb_locked(void* arg, grpc_error* error) {
std::unique_ptr<AresUvPollCbArg> arg_struct(
reinterpret_cast<AresUvPollCbArg*>(arg));
static void ares_uv_poll_cb_locked(AresUvPollCbArg* arg) {
std::unique_ptr<AresUvPollCbArg> arg_struct(arg);
uv_poll_t* handle = arg_struct->handle;
int status = arg_struct->status;
int events = arg_struct->events;
GrpcPolledFdLibuv* polled_fd =
reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
grpc_error* error = GRPC_ERROR_NONE;
if (status < 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("cares polling error");
error =
@ -155,16 +151,15 @@ void ares_uv_poll_cb(uv_poll_t* handle, int status, int events) {
GrpcPolledFdLibuv* polled_fd =
reinterpret_cast<GrpcPolledFdLibuv*>(handle->data);
AresUvPollCbArg* arg = new AresUvPollCbArg(handle, status, events);
polled_fd->combiner_->Run(
GRPC_CLOSURE_CREATE(ares_uv_poll_cb_locked, arg, nullptr),
GRPC_ERROR_NONE);
polled_fd->combiner_->Run([arg]() { ares_uv_poll_cb_locked(arg); },
DEBUG_LOCATION);
}
class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
public:
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
Combiner* combiner) override {
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
RefCountedPtr<LogicalThread> combiner) override {
return new GrpcPolledFdLibuv(as, combiner);
}
@ -172,7 +167,7 @@ class GrpcPolledFdFactoryLibuv : public GrpcPolledFdFactory {
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
Combiner* combiner) {
RefCountedPtr<LogicalThread> /*combiner*/) {
return MakeUnique<GrpcPolledFdFactoryLibuv>();
}

@ -88,9 +88,9 @@ class GrpcPolledFdPosix : public GrpcPolledFd {
class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
public:
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
Combiner* /*combiner*/) override {
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
RefCountedPtr<LogicalThread> /*combiner*/) override {
return new GrpcPolledFdPosix(as, driver_pollset_set);
}
@ -98,7 +98,7 @@ class GrpcPolledFdFactoryPosix : public GrpcPolledFdFactory {
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
Combiner* /*combiner*/) {
RefCountedPtr<LogicalThread> /*combiner*/) {
return MakeUnique<GrpcPolledFdFactoryPosix>();
}

@ -97,8 +97,8 @@ class GrpcPolledFdWindows {
WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY,
};
GrpcPolledFdWindows(ares_socket_t as, Combiner* combiner, int address_family,
int socket_type)
GrpcPolledFdWindows(ares_socket_t as, RefCountedPtr<LogicalThread> combiner,
int address_family, int socket_type)
: read_buf_(grpc_empty_slice()),
write_buf_(grpc_empty_slice()),
tcp_write_state_(WRITE_IDLE),
@ -107,18 +107,10 @@ class GrpcPolledFdWindows {
socket_type_(socket_type) {
gpr_asprintf(&name_, "c-ares socket: %" PRIdPTR, as);
winsocket_ = grpc_winsocket_create(as, name_);
combiner_ = GRPC_COMBINER_REF(combiner, name_);
GRPC_CLOSURE_INIT(&continue_register_for_on_readable_locked_,
&GrpcPolledFdWindows::ContinueRegisterForOnReadableLocked,
this, nullptr);
GRPC_CLOSURE_INIT(
&continue_register_for_on_writeable_locked_,
&GrpcPolledFdWindows::ContinueRegisterForOnWriteableLocked, this,
nullptr);
combiner_ = std::move(combiner);
}
~GrpcPolledFdWindows() {
GRPC_COMBINER_UNREF(combiner_, name_);
grpc_slice_unref_internal(read_buf_);
grpc_slice_unref_internal(write_buf_);
GPR_ASSERT(read_closure_ == nullptr);
@ -145,19 +137,16 @@ class GrpcPolledFdWindows {
GPR_ASSERT(!read_buf_has_data_);
read_buf_ = GRPC_SLICE_MALLOC(4192);
if (connect_done_) {
combiner_->Run(&continue_register_for_on_readable_locked_,
GRPC_ERROR_NONE);
combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); },
DEBUG_LOCATION);
} else {
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == nullptr);
pending_continue_register_for_on_readable_locked_ =
&continue_register_for_on_readable_locked_;
GPR_ASSERT(pending_continue_register_for_on_readable_locked_ == false);
pending_continue_register_for_on_readable_locked_ = true;
}
}
static void ContinueRegisterForOnReadableLocked(void* arg,
grpc_error* unused_error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
static void ContinueRegisterForOnReadableLocked(
GrpcPolledFdWindows* grpc_polled_fd) {
grpc_polled_fd->InnerContinueRegisterForOnReadableLocked(GRPC_ERROR_NONE);
}
@ -213,19 +202,16 @@ class GrpcPolledFdWindows {
GPR_ASSERT(write_closure_ == nullptr);
write_closure_ = write_closure;
if (connect_done_) {
combiner_->Run(&continue_register_for_on_writeable_locked_,
GRPC_ERROR_NONE);
combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); },
DEBUG_LOCATION);
} else {
GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == nullptr);
pending_continue_register_for_on_writeable_locked_ =
&continue_register_for_on_writeable_locked_;
GPR_ASSERT(pending_continue_register_for_on_writeable_locked_ == false);
pending_continue_register_for_on_writeable_locked_ = true;
}
}
static void ContinueRegisterForOnWriteableLocked(void* arg,
grpc_error* unused_error) {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
static void ContinueRegisterForOnWriteableLocked(
GrpcPolledFdWindows* grpc_polled_fd) {
grpc_polled_fd->InnerContinueRegisterForOnWriteableLocked(GRPC_ERROR_NONE);
}
@ -441,10 +427,12 @@ class GrpcPolledFdWindows {
GrpcPolledFdWindows* grpc_polled_fd =
static_cast<GrpcPolledFdWindows*>(arg);
grpc_polled_fd->combiner_->Run(
GRPC_CLOSURE_INIT(&grpc_polled_fd->on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnectLocked,
grpc_polled_fd, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&grpc_polled_fd->on_tcp_connect_locked_,
&GrpcPolledFdWindows::OnTcpConnectLocked,
grpc_polled_fd, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void OnTcpConnectLocked(void* arg, grpc_error* error) {
@ -456,8 +444,8 @@ class GrpcPolledFdWindows {
void InnerOnTcpConnectLocked(grpc_error* error) {
GRPC_CARES_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked error:|%s| "
"pending_register_for_readable:%" PRIdPTR
" pending_register_for_writeable:%" PRIdPTR,
"pending_register_for_readable:%d"
" pending_register_for_writeable:%d",
GetName(), grpc_error_string(error),
pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
@ -486,13 +474,13 @@ class GrpcPolledFdWindows {
// this fd to abort.
wsa_connect_error_ = WSA_OPERATION_ABORTED;
}
if (pending_continue_register_for_on_readable_locked_ != nullptr) {
combiner_->Run(pending_continue_register_for_on_readable_locked_,
GRPC_ERROR_NONE);
if (pending_continue_register_for_on_readable_locked_) {
combiner_->Run([this]() { ContinueRegisterForOnReadableLocked(this); },
DEBUG_LOCATION);
}
if (pending_continue_register_for_on_writeable_locked_ != nullptr) {
combiner_->Run(pending_continue_register_for_on_writeable_locked_,
GRPC_ERROR_NONE);
if (pending_continue_register_for_on_writeable_locked_) {
combiner_->Run([this]() { ContinueRegisterForOnWriteableLocked(this); },
DEBUG_LOCATION);
}
}
@ -603,10 +591,12 @@ class GrpcPolledFdWindows {
static void OnIocpReadable(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
polled_fd->combiner_->Run(
GRPC_CLOSURE_INIT(&polled_fd->outer_read_closure_,
&GrpcPolledFdWindows::OnIocpReadableLocked, polled_fd,
nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&polled_fd->outer_read_closure_,
&GrpcPolledFdWindows::OnIocpReadableLocked,
polled_fd, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void OnIocpReadableLocked(void* arg, grpc_error* error) {
@ -655,10 +645,12 @@ class GrpcPolledFdWindows {
static void OnIocpWriteable(void* arg, grpc_error* error) {
GrpcPolledFdWindows* polled_fd = static_cast<GrpcPolledFdWindows*>(arg);
polled_fd->combiner_->Run(
GRPC_CLOSURE_INIT(&polled_fd->outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteableLocked,
polled_fd, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&polled_fd->outer_write_closure_,
&GrpcPolledFdWindows::OnIocpWriteableLocked,
polled_fd, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void OnIocpWriteableLocked(void* arg, grpc_error* error) {
@ -698,7 +690,7 @@ class GrpcPolledFdWindows {
bool gotten_into_driver_list() const { return gotten_into_driver_list_; }
void set_gotten_into_driver_list() { gotten_into_driver_list_ = true; }
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
char recv_from_source_addr_[200];
ares_socklen_t recv_from_source_addr_len_;
grpc_slice read_buf_;
@ -721,10 +713,8 @@ class GrpcPolledFdWindows {
// We don't run register_for_{readable,writeable} logic until
// a socket is connected. In the interim, we queue readable/writeable
// registrations with the following state.
grpc_closure continue_register_for_on_readable_locked_;
grpc_closure continue_register_for_on_writeable_locked_;
grpc_closure* pending_continue_register_for_on_readable_locked_ = nullptr;
grpc_closure* pending_continue_register_for_on_writeable_locked_ = nullptr;
bool pending_continue_register_for_on_readable_locked_ = false;
bool pending_continue_register_for_on_writeable_locked_ = false;
};
struct SockToPolledFdEntry {
@ -742,14 +732,10 @@ struct SockToPolledFdEntry {
* with a GrpcPolledFdWindows factory and event driver */
class SockToPolledFdMap {
public:
SockToPolledFdMap(Combiner* combiner) {
combiner_ = GRPC_COMBINER_REF(combiner, "sock to polled fd map");
}
SockToPolledFdMap(RefCountedPtr<LogicalThread> combiner)
: combiner_(std::move(combiner)) {}
~SockToPolledFdMap() {
GPR_ASSERT(head_ == nullptr);
GRPC_COMBINER_UNREF(combiner_, "sock to polled fd map");
}
~SockToPolledFdMap() { GPR_ASSERT(head_ == nullptr); }
void AddNewSocket(SOCKET s, GrpcPolledFdWindows* polled_fd) {
SockToPolledFdEntry* new_node = new SockToPolledFdEntry(s, polled_fd);
@ -861,7 +847,7 @@ class SockToPolledFdMap {
private:
SockToPolledFdEntry* head_ = nullptr;
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
};
const struct ares_socket_functions custom_ares_sock_funcs = {
@ -910,12 +896,12 @@ class GrpcPolledFdWindowsWrapper : public GrpcPolledFd {
class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
public:
GrpcPolledFdFactoryWindows(Combiner* combiner)
GrpcPolledFdFactoryWindows(RefCountedPtr<LogicalThread> combiner)
: sock_to_polled_fd_map_(combiner) {}
GrpcPolledFd* NewGrpcPolledFdLocked(ares_socket_t as,
grpc_pollset_set* driver_pollset_set,
Combiner* combiner) override {
GrpcPolledFd* NewGrpcPolledFdLocked(
ares_socket_t as, grpc_pollset_set* driver_pollset_set,
RefCountedPtr<LogicalThread> combiner) override {
GrpcPolledFdWindows* polled_fd = sock_to_polled_fd_map_.LookupPolledFd(as);
// Set a flag so that the virtual socket "close" method knows it
// doesn't need to call ShutdownLocked, since now the driver will.
@ -933,8 +919,8 @@ class GrpcPolledFdFactoryWindows : public GrpcPolledFdFactory {
};
std::unique_ptr<GrpcPolledFdFactory> NewGrpcPolledFdFactory(
Combiner* combiner) {
return MakeUnique<GrpcPolledFdFactoryWindows>(combiner);
RefCountedPtr<LogicalThread> combiner) {
return MakeUnique<GrpcPolledFdFactoryWindows>(std::move(combiner));
}
} // namespace grpc_core

@ -37,7 +37,6 @@
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
@ -350,7 +349,8 @@ done:
void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
grpc_ares_request* r, const char* dns_server, const char* name,
const char* default_port, grpc_pollset_set* interested_parties,
bool check_grpclb, int query_timeout_ms, grpc_core::Combiner* combiner) {
bool check_grpclb, int query_timeout_ms,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
ares_channel* channel = nullptr;
@ -590,7 +590,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) {
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
grpc_ares_request* r =
static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
r->ev_driver = nullptr;
@ -633,7 +633,8 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) =
grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
GPR_ASSERT(r != nullptr);
@ -674,7 +675,7 @@ void grpc_ares_cleanup(void) {}
typedef struct grpc_resolve_address_ares_request {
/* combiner that queries and related callbacks run under */
grpc_core::Combiner* combiner;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner;
/** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out;
/** currently resolving addresses */
@ -716,20 +717,20 @@ static void on_dns_lookup_done_locked(void* arg, grpc_error* error) {
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_resolve_address_done,
GRPC_ERROR_REF(error));
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
delete r;
}
static void on_dns_lookup_done(void* arg, grpc_error* error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
r->combiner->Run(GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked,
on_dns_lookup_done_locked, r, nullptr),
GRPC_ERROR_REF(error));
r->combiner->Run(grpc_core::Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked,
on_dns_lookup_done_locked, r, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
void* arg, grpc_error* /*unused_error*/) {
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(void* arg) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done_locked, on_dns_lookup_done, r,
@ -748,16 +749,15 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_resolved_addresses** addrs) {
grpc_resolve_address_ares_request* r =
new grpc_resolve_address_ares_request();
r->combiner = grpc_combiner_create();
r->combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
r->addrs_out = addrs;
r->on_resolve_address_done = on_done;
r->name = name;
r->default_port = default_port;
r->interested_parties = interested_parties;
r->combiner->Run(
GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r,
nullptr),
GRPC_ERROR_NONE);
[r]() { grpc_resolve_address_invoke_dns_lookup_ares_locked(r); },
DEBUG_LOCATION);
}
void (*grpc_resolve_address_ares)(

@ -23,6 +23,7 @@
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
@ -65,7 +66,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner);
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner);
/* Cancel the pending grpc_ares_request \a request */
extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);

@ -31,7 +31,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) {
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
return NULL;
}
@ -40,7 +40,8 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addrs, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) =
grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}

@ -150,9 +150,12 @@ void NativeDnsResolver::ShutdownLocked() {
void NativeDnsResolver::OnNextResolution(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
r->combiner()->Run(
GRPC_CLOSURE_INIT(&r->on_next_resolution_,
NativeDnsResolver::OnNextResolutionLocked, r, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_next_resolution_,
NativeDnsResolver::OnNextResolutionLocked, r,
nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
@ -167,9 +170,11 @@ void NativeDnsResolver::OnNextResolutionLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::OnResolved(void* arg, grpc_error* error) {
NativeDnsResolver* r = static_cast<NativeDnsResolver*>(arg);
r->combiner()->Run(
GRPC_CLOSURE_INIT(&r->on_resolved_, NativeDnsResolver::OnResolvedLocked,
r, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&r->on_resolved_,
NativeDnsResolver::OnResolvedLocked, r, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {

@ -70,7 +70,7 @@ class FakeResolver : public Resolver {
void MaybeSendResultLocked();
static void ReturnReresolutionResult(void* arg, grpc_error* error);
static void ReturnReresolutionResult(void* arg);
// passed-in parameters
grpc_channel_args* channel_args_ = nullptr;
@ -90,7 +90,6 @@ class FakeResolver : public Resolver {
// if true, return failure
bool return_failure_ = false;
// pending re-resolution
grpc_closure reresolution_closure_;
bool reresolution_closure_pending_ = false;
};
@ -127,9 +126,8 @@ void FakeResolver::RequestReresolutionLocked() {
if (!reresolution_closure_pending_) {
reresolution_closure_pending_ = true;
Ref().release(); // ref held by closure
GRPC_CLOSURE_INIT(&reresolution_closure_, ReturnReresolutionResult, this,
nullptr);
combiner()->Run(&reresolution_closure_, GRPC_ERROR_NONE);
combiner()->Run([this]() { ReturnReresolutionResult(this); },
DEBUG_LOCATION);
}
}
}
@ -159,7 +157,7 @@ void FakeResolver::MaybeSendResultLocked() {
}
}
void FakeResolver::ReturnReresolutionResult(void* arg, grpc_error* /*error*/) {
void FakeResolver::ReturnReresolutionResult(void* arg) {
FakeResolver* self = static_cast<FakeResolver*>(arg);
self->reresolution_closure_pending_ = false;
self->MaybeSendResultLocked();
@ -175,15 +173,13 @@ FakeResolverResponseGenerator::FakeResolverResponseGenerator() {}
FakeResolverResponseGenerator::~FakeResolverResponseGenerator() {}
struct SetResponseClosureArg {
grpc_closure set_response_closure;
RefCountedPtr<FakeResolver> resolver;
Resolver::Result result;
bool has_result = false;
bool immediate = true;
};
void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
grpc_error* /*error*/) {
void FakeResolverResponseGenerator::SetResponseLocked(void* arg) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
auto& resolver = closure_arg->resolver;
if (!resolver->shutdown_) {
@ -209,13 +205,10 @@ void FakeResolverResponseGenerator::SetResponse(Resolver::Result result) {
closure_arg->resolver = std::move(resolver);
closure_arg->result = std::move(result);
closure_arg->resolver->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetReresolutionResponseLocked(
void* arg, grpc_error* /*error*/) {
void FakeResolverResponseGenerator::SetReresolutionResponseLocked(void* arg) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
auto& resolver = closure_arg->resolver;
if (!resolver->shutdown_) {
@ -238,9 +231,8 @@ void FakeResolverResponseGenerator::SetReresolutionResponse(
closure_arg->result = std::move(result);
closure_arg->has_result = true;
closure_arg->resolver->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
SetReresolutionResponseLocked, closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetReresolutionResponseLocked(closure_arg); },
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
@ -253,13 +245,11 @@ void FakeResolverResponseGenerator::UnsetReresolutionResponse() {
SetResponseClosureArg* closure_arg = new SetResponseClosureArg();
closure_arg->resolver = std::move(resolver);
closure_arg->resolver->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
SetReresolutionResponseLocked, closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetReresolutionResponseLocked(closure_arg); },
DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetFailureLocked(void* arg,
grpc_error* /*error*/) {
void FakeResolverResponseGenerator::SetFailureLocked(void* arg) {
SetResponseClosureArg* closure_arg = static_cast<SetResponseClosureArg*>(arg);
auto& resolver = closure_arg->resolver;
if (!resolver->shutdown_) {
@ -279,9 +269,7 @@ void FakeResolverResponseGenerator::SetFailure() {
SetResponseClosureArg* closure_arg = new SetResponseClosureArg();
closure_arg->resolver = std::move(resolver);
closure_arg->resolver->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetFailureOnReresolution() {
@ -295,9 +283,7 @@ void FakeResolverResponseGenerator::SetFailureOnReresolution() {
closure_arg->resolver = std::move(resolver);
closure_arg->immediate = false;
closure_arg->resolver->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetFailureLocked,
closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetFailureLocked(closure_arg); }, DEBUG_LOCATION);
}
void FakeResolverResponseGenerator::SetFakeResolver(
@ -310,9 +296,7 @@ void FakeResolverResponseGenerator::SetFakeResolver(
closure_arg->resolver = resolver_->Ref();
closure_arg->result = std::move(result_);
resolver_->combiner()->Run(
GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, SetResponseLocked,
closure_arg, nullptr),
GRPC_ERROR_NONE);
[closure_arg]() { SetResponseLocked(closure_arg); }, DEBUG_LOCATION);
has_result_ = false;
}
}

@ -80,9 +80,9 @@ class FakeResolverResponseGenerator
// Set the corresponding FakeResolver to this generator.
void SetFakeResolver(RefCountedPtr<FakeResolver> resolver);
static void SetResponseLocked(void* arg, grpc_error* error);
static void SetReresolutionResponseLocked(void* arg, grpc_error* error);
static void SetFailureLocked(void* arg, grpc_error* error);
static void SetResponseLocked(void* arg);
static void SetReresolutionResponseLocked(void* arg);
static void SetFailureLocked(void* arg);
// Mutex protecting the members below.
Mutex mu_;

@ -39,7 +39,7 @@ struct ResolverArgs {
/// Used to drive I/O in the name resolution process.
grpc_pollset_set* pollset_set = nullptr;
/// The combiner under which all resolver calls will be run.
Combiner* combiner = nullptr;
RefCountedPtr<LogicalThread> combiner = nullptr;
/// The result handler to be used by the resolver.
std::unique_ptr<Resolver::ResultHandler> result_handler;
};

@ -145,7 +145,7 @@ bool ResolverRegistry::IsValidTarget(const char* target) {
OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, Combiner* combiner,
grpc_pollset_set* pollset_set, RefCountedPtr<LogicalThread> combiner,
std::unique_ptr<Resolver::ResultHandler> result_handler) {
GPR_ASSERT(g_state != nullptr);
grpc_uri* uri = nullptr;
@ -156,7 +156,7 @@ OrphanablePtr<Resolver> ResolverRegistry::CreateResolver(
resolver_args.uri = uri;
resolver_args.args = args;
resolver_args.pollset_set = pollset_set;
resolver_args.combiner = combiner;
resolver_args.combiner = std::move(combiner);
resolver_args.result_handler = std::move(result_handler);
OrphanablePtr<Resolver> resolver =
factory == nullptr ? nullptr

@ -69,7 +69,7 @@ class ResolverRegistry {
/// \a result_handler is used to return results from the resolver.
static OrphanablePtr<Resolver> CreateResolver(
const char* target, const grpc_channel_args* args,
grpc_pollset_set* pollset_set, Combiner* combiner,
grpc_pollset_set* pollset_set, RefCountedPtr<LogicalThread> combiner,
std::unique_ptr<Resolver::ResultHandler> result_handler);
/// Returns the default authority to pass from a client for \a target.

@ -488,9 +488,10 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
void* arg, grpc_error* error) {
RetryableCall* calld = static_cast<RetryableCall*>(arg);
calld->chand_->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&calld->on_retry_timer_, OnRetryTimerLocked, calld,
nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(GRPC_CLOSURE_INIT(&calld->on_retry_timer_,
OnRetryTimerLocked, calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
template <typename T>
@ -633,9 +634,11 @@ void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ads_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_,
OnResponseReceivedLocked, ads_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&ads_calld->on_response_received_,
OnResponseReceivedLocked, ads_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked(
@ -790,9 +793,11 @@ void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
ads_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&ads_calld->on_status_received_, OnStatusReceivedLocked,
ads_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&ads_calld->on_status_received_,
OnStatusReceivedLocked, ads_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
@ -851,9 +856,11 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&self->on_next_report_timer_, OnNextReportTimerLocked,
self, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&self->on_next_report_timer_,
OnNextReportTimerLocked, self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
@ -911,9 +918,10 @@ void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
void* arg, grpc_error* error) {
Reporter* self = static_cast<Reporter*>(arg);
self->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&self->on_report_done_, OnReportDoneLocked, self,
nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(GRPC_CLOSURE_INIT(&self->on_report_done_,
OnReportDoneLocked, self, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
@ -1080,9 +1088,11 @@ void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_,
OnInitialRequestSentLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_initial_request_sent_,
OnInitialRequestSentLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked(
@ -1099,9 +1109,11 @@ void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_,
OnResponseReceivedLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_response_received_,
OnResponseReceivedLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked(
@ -1198,9 +1210,11 @@ void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
void* arg, grpc_error* error) {
LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
lrs_calld->xds_client()->combiner_->Run(
GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_, OnStatusReceivedLocked,
lrs_calld, nullptr),
GRPC_ERROR_REF(error));
Closure::ToFunction(
GRPC_CLOSURE_INIT(&lrs_calld->on_status_received_,
OnStatusReceivedLocked, lrs_calld, nullptr),
GRPC_ERROR_REF(error)),
DEBUG_LOCATION);
}
void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
@ -1249,12 +1263,13 @@ grpc_core::UniquePtr<char> GenerateBuildVersionString() {
} // namespace
XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
XdsClient::XdsClient(RefCountedPtr<LogicalThread> combiner,
grpc_pollset_set* interested_parties,
StringView server_name,
std::unique_ptr<ServiceConfigWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error)
: build_version_(GenerateBuildVersionString()),
combiner_(GRPC_COMBINER_REF(combiner, "xds_client")),
combiner_(std::move(combiner)),
interested_parties_(interested_parties),
bootstrap_(XdsBootstrap::ReadFromFile(error)),
server_name_(StringViewToCString(server_name)),
@ -1275,14 +1290,12 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
if (service_config_watcher_ != nullptr) {
// TODO(juanlishen): Start LDS call and do not return service config
// until we get the first LDS response.
GRPC_CLOSURE_INIT(&service_config_notify_, NotifyOnServiceConfig,
Ref().release(), nullptr);
combiner_->Run(&service_config_notify_, GRPC_ERROR_NONE);
XdsClient* self = Ref().release();
combiner_->Run([self]() { NotifyOnServiceConfig(self, GRPC_ERROR_NONE); },
DEBUG_LOCATION);
}
}
XdsClient::~XdsClient() { GRPC_COMBINER_UNREF(combiner_, "xds_client"); }
void XdsClient::Orphan() {
shutting_down_ = true;
chand_.reset();

@ -72,11 +72,10 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
// If *error is not GRPC_ERROR_NONE after construction, then there was
// an error initializing the client.
XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
StringView server_name,
XdsClient(RefCountedPtr<LogicalThread> combiner,
grpc_pollset_set* interested_parties, StringView server_name,
std::unique_ptr<ServiceConfigWatcherInterface> watcher,
const grpc_channel_args& channel_args, grpc_error** error);
~XdsClient();
void Orphan() override;
@ -196,14 +195,14 @@ class XdsClient : public InternallyRefCounted<XdsClient> {
static const grpc_arg_pointer_vtable kXdsClientVtable;
grpc_core::UniquePtr<char> build_version_;
UniquePtr<char> build_version_;
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
grpc_pollset_set* interested_parties_;
std::unique_ptr<XdsBootstrap> bootstrap_;
grpc_core::UniquePtr<char> server_name_;
UniquePtr<char> server_name_;
std::unique_ptr<ServiceConfigWatcherInterface> service_config_watcher_;
// TODO(juanlishen): Once we implement LDS support, this will no
// longer be needed.

@ -23,6 +23,7 @@
#include <assert.h>
#include <stdbool.h>
#include <functional>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@ -250,6 +251,11 @@ class Closure {
#endif
GRPC_ERROR_UNREF(error);
}
static std::function<void()> ToFunction(grpc_closure* closure,
grpc_error* error) {
return [closure, error] { Run(DEBUG_LOCATION, closure, error); };
}
};
} // namespace grpc_core

@ -33,8 +33,8 @@ struct CallbackWrapper {
const DebugLocation location;
};
void LogicalThread::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
void LogicalThreadImpl::Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::Run() %p Scheduling callback [%s:%d]",
this, location.file(), location.line());
@ -61,11 +61,23 @@ void LogicalThread::Run(std::function<void()> callback,
}
}
void LogicalThreadImpl::Orphan() {
ExecCtx::Run(DEBUG_LOCATION,
GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error* /*error*/) {
LogicalThreadImpl* self =
static_cast<LogicalThreadImpl*>(arg);
delete self;
},
this, nullptr),
GRPC_ERROR_NONE);
}
// The thread that calls this loans itself to the logical thread so as to
// execute all the scheduled callback. This is called from within
// LogicalThread::Run() after executing a callback immediately, and hence size_
// is atleast 1.
void LogicalThread::DrainQueue() {
void LogicalThreadImpl::DrainQueue() {
while (true) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_logical_thread_trace)) {
gpr_log(GPR_INFO, "LogicalThread::DrainQueue() %p", this);

@ -24,7 +24,9 @@
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#ifndef GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
#define GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H
@ -32,21 +34,37 @@
namespace grpc_core {
extern DebugOnlyTraceFlag grpc_logical_thread_trace;
// LogicalThread is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a LogicalThread instance will be executed serially
// in a borrowed thread. The API provides a FIFO guarantee to the execution of
// callbacks scheduled on the thread.
class LogicalThread : public RefCounted<LogicalThread> {
class LogicalThreadImpl : public Orphanable {
public:
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location);
void Orphan() override;
private:
void DrainQueue();
Atomic<size_t> size_{0};
MultiProducerSingleConsumerQueue queue_;
};
// LogicalThread is a mechanism to schedule callbacks in a synchronized manner.
// All callbacks scheduled on a LogicalThread instance will be executed serially
// in a borrowed thread. The API provides a FIFO guarantee to the execution of
// callbacks scheduled on the thread.
class LogicalThread : public RefCounted<LogicalThread> {
public:
LogicalThread() { impl_ = MakeOrphanable<LogicalThreadImpl>(); }
void Run(std::function<void()> callback,
const grpc_core::DebugLocation& location) {
impl_->Run(callback, location);
}
private:
OrphanablePtr<LogicalThreadImpl> impl_;
};
} /* namespace grpc_core */
#endif /* GRPC_CORE_LIB_IOMGR_LOGICAL_THREAD_H */

@ -58,12 +58,15 @@ const char* ConnectivityStateName(grpc_connectivity_state state) {
class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state, Combiner* combiner)
grpc_connectivity_state state,
const RefCountedPtr<LogicalThread>& combiner)
: watcher_(std::move(watcher)), state_(state) {
if (combiner != nullptr) {
combiner->Run(
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr),
GRPC_ERROR_NONE);
Closure::ToFunction(
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, nullptr),
GRPC_ERROR_NONE),
DEBUG_LOCATION);
} else {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);

@ -29,6 +29,7 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/logical_thread.h"
namespace grpc_core {
@ -70,14 +71,15 @@ class AsyncConnectivityStateWatcherInterface
// If \a combiner is nullptr, then the notification will be scheduled on the
// ExecCtx.
explicit AsyncConnectivityStateWatcherInterface(Combiner* combiner = nullptr)
: combiner_(combiner) {}
explicit AsyncConnectivityStateWatcherInterface(
RefCountedPtr<LogicalThread> combiner = nullptr)
: combiner_(std::move(combiner)) {}
// Invoked asynchronously when Notify() is called.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;
private:
Combiner* combiner_;
RefCountedPtr<LogicalThread> combiner_;
};
// Tracks connectivity state. Maintains a list of watchers that are

@ -26,14 +26,14 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/timer.h"
#include "test/core/util/test_config.h"
static gpr_mu g_mu;
static bool g_fail_resolution = true;
static grpc_core::Combiner* g_combiner;
static grpc_core::RefCountedPtr<grpc_core::LogicalThread>* g_combiner;
static void my_resolve_address(const char* addr, const char* /*default_port*/,
grpc_pollset_set* /*interested_parties*/,
@ -65,7 +65,8 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
bool /*check_grpclb*/, char** /*service_config_json*/,
int /*query_timeout_ms*/, grpc_core::Combiner* /*combiner*/) {
int /*query_timeout_ms*/,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> /*combiner*/) {
gpr_mu_lock(&g_mu);
GPR_ASSERT(0 == strcmp("test", addr));
grpc_error* error = GRPC_ERROR_NONE;
@ -98,7 +99,7 @@ static grpc_core::OrphanablePtr<grpc_core::Resolver> create_resolver(
GPR_ASSERT(uri);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = std::move(result_handler);
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -160,13 +161,13 @@ int main(int argc, char** argv) {
grpc_init();
gpr_mu_init(&g_mu);
g_combiner = grpc_combiner_create();
grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
{
grpc_core::ExecCtx exec_ctx;
auto combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_combiner = &combiner;
grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_cancel_ares_request_locked = my_cancel_ares_request_locked;
ResultHandler* result_handler = new ResultHandler();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver = create_resolver(
"dns:test",
@ -185,8 +186,6 @@ int main(int argc, char** argv) {
GPR_ASSERT(wait_loop(30, &output2.ev));
GPR_ASSERT(!output2.result.addresses.empty());
GPR_ASSERT(output2.error == GRPC_ERROR_NONE);
GRPC_COMBINER_UNREF(g_combiner, "test");
}
grpc_shutdown();

@ -26,7 +26,7 @@
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "test/core/util/test_config.h"
@ -37,14 +37,14 @@ constexpr int kMinResolutionPeriodForCheckMs = 900;
extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
static grpc_address_resolver_vtable* default_resolve_address;
static grpc_core::Combiner* g_combiner;
static grpc_core::RefCountedPtr<grpc_core::LogicalThread>* g_combiner;
static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner);
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner);
// Counter incremented by test_resolve_address_impl indicating the number of
// times a system-level resolution has happened.
@ -95,10 +95,11 @@ static grpc_ares_request* test_dns_lookup_ares_locked(
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) {
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
grpc_ares_request* result = g_default_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done,
addresses, check_grpclb, service_config_json, query_timeout_ms, combiner);
addresses, check_grpclb, service_config_json, query_timeout_ms,
std::move(combiner));
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
grpc_millis now =
@ -271,7 +272,7 @@ static void on_first_resolution(OnResolutionCallbackArg* cb_arg) {
gpr_mu_unlock(g_iomgr_args.mu);
}
static void start_test_under_combiner(void* arg, grpc_error* /*error*/) {
static void start_test_under_combiner(void* arg) {
OnResolutionCallbackArg* res_cb_arg =
static_cast<OnResolutionCallbackArg*>(arg);
res_cb_arg->result_handler = new ResultHandler();
@ -283,7 +284,7 @@ static void start_test_under_combiner(void* arg, grpc_error* /*error*/) {
GPR_ASSERT(uri != nullptr);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = std::unique_ptr<grpc_core::Resolver::ResultHandler>(
res_cb_arg->result_handler);
g_resolution_count = 0;
@ -307,9 +308,9 @@ static void test_cooldown() {
OnResolutionCallbackArg* res_cb_arg = new OnResolutionCallbackArg();
res_cb_arg->uri_str = "dns:127.0.0.1";
g_combiner->Run(
GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, nullptr),
GRPC_ERROR_NONE);
(*g_combiner)
->Run([res_cb_arg]() { start_test_under_combiner(res_cb_arg); },
DEBUG_LOCATION);
grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&g_iomgr_args);
iomgr_args_finish(&g_iomgr_args);
@ -319,17 +320,18 @@ int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
g_combiner = grpc_combiner_create();
g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked;
default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver);
test_cooldown();
{
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
auto combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_combiner = &combiner;
g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked;
default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver);
test_cooldown();
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown_blocking();
GPR_ASSERT(g_all_callbacks_invoked);

@ -28,7 +28,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "test/core/util/test_config.h"
static grpc_core::Combiner* g_combiner;
static grpc_core::RefCountedPtr<grpc_core::LogicalThread>* g_combiner;
class TestResultHandler : public grpc_core::Resolver::ResultHandler {
void ReturnResult(grpc_core::Resolver::Result /*result*/) override {}
@ -44,7 +44,7 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
GPR_ASSERT(uri);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = grpc_core::MakeUnique<TestResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -61,7 +61,7 @@ static void test_fails(grpc_core::ResolverFactory* factory,
GPR_ASSERT(uri);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = grpc_core::MakeUnique<TestResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -72,26 +72,27 @@ static void test_fails(grpc_core::ResolverFactory* factory,
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
g_combiner = grpc_combiner_create();
grpc_core::ResolverFactory* dns =
grpc_core::ResolverRegistry::LookupResolverFactory("dns");
test_succeeds(dns, "dns:10.2.1.1");
test_succeeds(dns, "dns:10.2.1.1:1234");
test_succeeds(dns, "dns:www.google.com");
test_succeeds(dns, "dns:///www.google.com");
grpc_core::UniquePtr<char> resolver =
GPR_GLOBAL_CONFIG_GET(grpc_dns_resolver);
if (gpr_stricmp(resolver.get(), "native") == 0) {
test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888");
} else {
test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");
}
{
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
{
auto combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_combiner = &combiner;
grpc_core::ResolverFactory* dns =
grpc_core::ResolverRegistry::LookupResolverFactory("dns");
test_succeeds(dns, "dns:10.2.1.1");
test_succeeds(dns, "dns:10.2.1.1:1234");
test_succeeds(dns, "dns:www.google.com");
test_succeeds(dns, "dns:///www.google.com");
grpc_core::UniquePtr<char> resolver =
GPR_GLOBAL_CONFIG_GET(grpc_dns_resolver);
if (gpr_stricmp(resolver.get(), "native") == 0) {
test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888");
} else {
test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");
}
}
}
grpc_shutdown();

@ -28,7 +28,7 @@
#include "src/core/ext/filters/client_channel/server_address.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "test/core/util/test_config.h"
@ -63,7 +63,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
};
static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
grpc_core::Combiner* combiner,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner,
grpc_core::FakeResolverResponseGenerator* response_generator,
std::unique_ptr<grpc_core::Resolver::ResultHandler> result_handler) {
grpc_core::ResolverFactory* factory =
@ -74,7 +74,7 @@ static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
grpc_channel_args channel_args = {1, &generator_arg};
grpc_core::ResolverArgs args;
args.args = &channel_args;
args.combiner = combiner;
args.combiner = std::move(combiner);
args.result_handler = std::move(result_handler);
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -118,7 +118,8 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
static void test_fake_resolver() {
grpc_core::ExecCtx exec_ctx;
grpc_core::Combiner* combiner = grpc_combiner_create();
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner =
grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
// Create resolver.
ResultHandler* result_handler = new ResultHandler();
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>
@ -206,7 +207,6 @@ static void test_fake_resolver() {
nullptr);
// Clean up.
resolver.reset();
GRPC_COMBINER_UNREF(combiner, "test_fake_resolver");
}
int main(int argc, char** argv) {

@ -24,11 +24,11 @@
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "test/core/util/test_config.h"
static grpc_core::Combiner* g_combiner;
static grpc_core::RefCountedPtr<grpc_core::LogicalThread>* g_combiner;
class ResultHandler : public grpc_core::Resolver::ResultHandler {
public:
@ -46,7 +46,7 @@ static void test_succeeds(grpc_core::ResolverFactory* factory,
GPR_ASSERT(uri);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = grpc_core::MakeUnique<ResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -67,7 +67,7 @@ static void test_fails(grpc_core::ResolverFactory* factory,
GPR_ASSERT(uri);
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
args.combiner = *g_combiner;
args.result_handler = grpc_core::MakeUnique<ResultHandler>();
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver =
factory->CreateResolver(std::move(args));
@ -78,31 +78,30 @@ static void test_fails(grpc_core::ResolverFactory* factory,
int main(int argc, char** argv) {
grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
g_combiner = grpc_combiner_create();
grpc_core::ResolverFactory* ipv4 =
grpc_core::ResolverRegistry::LookupResolverFactory("ipv4");
grpc_core::ResolverFactory* ipv6 =
grpc_core::ResolverRegistry::LookupResolverFactory("ipv6");
test_fails(ipv4, "ipv4:10.2.1.1");
test_succeeds(ipv4, "ipv4:10.2.1.1:1234");
test_succeeds(ipv4, "ipv4:10.2.1.1:1234,127.0.0.1:4321");
test_fails(ipv4, "ipv4:10.2.1.1:123456");
test_fails(ipv4, "ipv4:www.google.com");
test_fails(ipv4, "ipv4:[");
test_fails(ipv4, "ipv4://8.8.8.8/8.8.8.8:8888");
test_fails(ipv6, "ipv6:[");
test_fails(ipv6, "ipv6:[::]");
test_succeeds(ipv6, "ipv6:[::]:1234");
test_fails(ipv6, "ipv6:[::]:123456");
test_fails(ipv6, "ipv6:www.google.com");
{
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
auto combiner = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
g_combiner = &combiner;
grpc_core::ResolverFactory* ipv4 =
grpc_core::ResolverRegistry::LookupResolverFactory("ipv4");
grpc_core::ResolverFactory* ipv6 =
grpc_core::ResolverRegistry::LookupResolverFactory("ipv6");
test_fails(ipv4, "ipv4:10.2.1.1");
test_succeeds(ipv4, "ipv4:10.2.1.1:1234");
test_succeeds(ipv4, "ipv4:10.2.1.1:1234,127.0.0.1:4321");
test_fails(ipv4, "ipv4:10.2.1.1:123456");
test_fails(ipv4, "ipv4:www.google.com");
test_fails(ipv4, "ipv4:[");
test_fails(ipv4, "ipv4://8.8.8.8/8.8.8.8:8888");
test_fails(ipv6, "ipv6:[");
test_fails(ipv6, "ipv6:[::]");
test_succeeds(ipv6, "ipv6:[::]:1234");
test_fails(ipv6, "ipv6:[::]:123456");
test_fails(ipv6, "ipv6:www.google.com");
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown();

@ -379,7 +379,8 @@ grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses,
bool /*check_grpclb*/, char** /*service_config_json*/,
int /*query_timeout*/, grpc_core::Combiner* /*combiner*/) {
int /*query_timeout*/,
grpc_core::RefCountedPtr<grpc_core::LogicalThread> /*combiner*/) {
addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r)));
r->addr = gpr_strdup(addr);
r->on_done = on_done;

@ -49,7 +49,7 @@ static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner);
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner);
static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request);
@ -106,11 +106,12 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* interested_parties, grpc_closure* on_done,
std::unique_ptr<grpc_core::ServerAddressList>* addresses, bool check_grpclb,
char** service_config_json, int query_timeout_ms,
grpc_core::Combiner* combiner) {
grpc_core::RefCountedPtr<grpc_core::LogicalThread> combiner) {
if (0 != strcmp(addr, "test")) {
return iomgr_dns_lookup_ares_locked(
dns_server, addr, default_port, interested_parties, on_done, addresses,
check_grpclb, service_config_json, query_timeout_ms, combiner);
return iomgr_dns_lookup_ares_locked(dns_server, addr, default_port,
interested_parties, on_done, addresses,
check_grpclb, service_config_json,
query_timeout_ms, std::move(combiner));
}
grpc_error* error = GRPC_ERROR_NONE;

@ -31,16 +31,20 @@
namespace {
TEST(LogicalThreadTest, NoOp) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
grpc_core::ExecCtx exec_ctx;
{ auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>(); }
}
TEST(LogicalThreadTest, ExecuteOne) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_event done;
gpr_event_init(&done);
lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
grpc_core::ExecCtx exec_ctx;
{
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_event done;
gpr_event_init(&done);
lock->Run([&done]() { gpr_event_set(&done, (void*)1); }, DEBUG_LOCATION);
EXPECT_TRUE(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr);
}
}
class TestThread {
@ -93,11 +97,14 @@ class TestThread {
};
TEST(LogicalThreadTest, ExecuteMany) {
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
grpc_core::ExecCtx exec_ctx;
{
std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < 100; ++i) {
threads.push_back(std::unique_ptr<TestThread>(new TestThread(lock)));
auto lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
{
std::vector<std::unique_ptr<TestThread>> threads;
for (size_t i = 0; i < 100; ++i) {
threads.push_back(std::unique_ptr<TestThread>(new TestThread(lock)));
}
}
}
}

@ -36,7 +36,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "test/core/end2end/cq_verifier.h"
@ -81,7 +81,7 @@ struct ArgsStruct {
gpr_mu* mu;
grpc_pollset* pollset;
grpc_pollset_set* pollset_set;
grpc_core::Combiner* lock;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock;
grpc_channel_args* channel_args;
};
@ -90,7 +90,7 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = grpc_combiner_create();
args->lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_atm_rel_store(&args->done_atm, 0);
args->channel_args = nullptr;
}
@ -109,7 +109,6 @@ void ArgsFinish(ArgsStruct* args) {
grpc_core::ExecCtx::Get()->Flush();
grpc_pollset_destroy(args->pollset);
gpr_free(args->pollset);
GRPC_COMBINER_UNREF(args->lock, nullptr);
}
void PollPollsetUntilRequestDone(ArgsStruct* args) {

@ -49,9 +49,9 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/logical_thread.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_utils.h"
@ -192,7 +192,7 @@ struct ArgsStruct {
gpr_mu* mu;
grpc_pollset* pollset;
grpc_pollset_set* pollset_set;
grpc_core::Combiner* lock;
grpc_core::RefCountedPtr<grpc_core::LogicalThread> lock;
grpc_channel_args* channel_args;
vector<GrpcLBAddress> expected_addrs;
std::string expected_service_config_string;
@ -206,7 +206,7 @@ void ArgsInit(ArgsStruct* args) {
grpc_pollset_init(args->pollset, &args->mu);
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = grpc_combiner_create();
args->lock = grpc_core::MakeRefCounted<grpc_core::LogicalThread>();
gpr_atm_rel_store(&args->done_atm, 0);
args->channel_args = nullptr;
}
@ -226,7 +226,6 @@ void ArgsFinish(ArgsStruct* args) {
grpc_core::ExecCtx::Get()->Flush();
grpc_pollset_destroy(args->pollset);
gpr_free(args->pollset);
GRPC_COMBINER_UNREF(args->lock, nullptr);
}
gpr_timespec NSecondDeadline(int seconds) {
@ -534,10 +533,7 @@ void InjectBrokenNameServerList(ares_channel channel) {
GPR_ASSERT(ares_set_servers_ports(channel, dns_server_addrs) == ARES_SUCCESS);
}
void StartResolvingLocked(void* arg, grpc_error* /*unused*/) {
grpc_core::Resolver* r = static_cast<grpc_core::Resolver*>(arg);
r->StartLocked();
}
void StartResolvingLocked(grpc_core::Resolver* r) { r->StartLocked(); }
void RunResolvesRelevantRecordsTest(
std::unique_ptr<grpc_core::Resolver::ResultHandler> (*CreateResultHandler)(
@ -616,9 +612,9 @@ void RunResolvesRelevantRecordsTest(
CreateResultHandler(&args));
grpc_channel_args_destroy(resolver_args);
gpr_free(whole_uri);
args.lock->Run(
GRPC_CLOSURE_CREATE(StartResolvingLocked, resolver.get(), nullptr),
GRPC_ERROR_NONE);
auto* resolver_ptr = resolver.get();
args.lock->Run([resolver_ptr]() { StartResolvingLocked(resolver_ptr); },
DEBUG_LOCATION);
grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args);
ArgsFinish(&args);

Loading…
Cancel
Save