diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 35c3efab6aa..38525dbf97e 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -394,7 +394,7 @@ struct subchannel_batch_data { gpr_refcount refs; grpc_call_element* elem; - grpc_subchannel_call* subchannel_call; // Holds a ref. + grpc_core::RefCountedPtr subchannel_call; // The batch to use in the subchannel call. // Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; @@ -478,7 +478,7 @@ struct pending_batch { bool send_ops_cached; }; -/** Call data. Holds a pointer to grpc_subchannel_call and the +/** Call data. Holds a pointer to SubchannelCall and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting for initial metadata before trying to create a call object, @@ -504,10 +504,6 @@ struct call_data { last_attempt_got_server_pushback(false) {} ~call_data() { - if (GPR_LIKELY(subchannel_call != nullptr)) { - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, - "client_channel_destroy_call"); - } grpc_slice_unref_internal(path); GRPC_ERROR_UNREF(cancel_error); for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { @@ -536,7 +532,7 @@ struct call_data { grpc_core::RefCountedPtr retry_throttle_data; grpc_core::RefCountedPtr method_params; - grpc_subchannel_call* subchannel_call = nullptr; + grpc_core::RefCountedPtr subchannel_call; // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; @@ -807,8 +803,8 @@ static void pending_batches_add(grpc_call_element* elem, calld->subchannel_call == nullptr ? nullptr : static_cast( - grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)); + + calld->subchannel_call->GetParentData()); retry_commit(elem, retry_state); // If we are not going to retry and have not yet started, pretend // retries are disabled so that we don't bother with retry overhead. @@ -896,10 +892,10 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); - grpc_subchannel_call* subchannel_call = - static_cast(batch->handler_private.extra_arg); + grpc_core::SubchannelCall* subchannel_call = + static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. - grpc_subchannel_call_process_op(subchannel_call, batch); + subchannel_call->StartTransportStreamOpBatch(batch); } // This is called via the call combiner, so access to calld is synchronized. @@ -919,7 +915,7 @@ static void pending_batches_resume(grpc_call_element* elem) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " pending batches on subchannel_call=%p", - chand, calld, num_batches, calld->subchannel_call); + chand, calld, num_batches, calld->subchannel_call.get()); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { @@ -930,7 +926,7 @@ static void pending_batches_resume(grpc_call_element* elem) { maybe_inject_recv_trailing_metadata_ready_for_lb( *calld->request->pick(), batch); } - batch->handler_private.extra_arg = calld->subchannel_call; + batch->handler_private.extra_arg = calld->subchannel_call.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); @@ -1019,12 +1015,7 @@ static void do_retry(grpc_call_element* elem, const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); GPR_ASSERT(retry_policy != nullptr); - // Reset subchannel call and connected subchannel. - if (calld->subchannel_call != nullptr) { - GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, - "client_channel_call_retry"); - calld->subchannel_call = nullptr; - } + calld->subchannel_call.reset(); if (calld->have_request) { calld->have_request = false; calld->request.Destroy(); @@ -1078,8 +1069,7 @@ static bool maybe_retry(grpc_call_element* elem, subchannel_call_retry_state* retry_state = nullptr; if (batch_data != nullptr) { retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); if (retry_state->retry_dispatched) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, @@ -1180,13 +1170,10 @@ namespace { subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) - : elem(elem), - subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, - "batch_data_create")) { + : elem(elem), subchannel_call(calld->subchannel_call) { subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)); + calld->subchannel_call->GetParentData()); batch.payload = &retry_state->batch_payload; gpr_ref_init(&refs, refcount); if (set_on_complete) { @@ -1200,7 +1187,7 @@ subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, void subchannel_batch_data::destroy() { subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data(subchannel_call)); + subchannel_call->GetParentData()); if (batch.send_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } @@ -1213,7 +1200,7 @@ void subchannel_batch_data::destroy() { if (batch.recv_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); + subchannel_call.reset(); call_data* calld = static_cast(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } @@ -1260,8 +1247,7 @@ static void invoke_recv_initial_metadata_callback(void* arg, // Return metadata. subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); grpc_metadata_batch_move( &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); @@ -1293,8 +1279,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); retry_state->completed_recv_initial_metadata = true; // If a retry was already dispatched, then we're not going to use the // result of this recv_initial_metadata op, so do nothing. @@ -1355,8 +1340,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { // Return payload. subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); *pending->batch->payload->recv_message.recv_message = std::move(retry_state->recv_message); // Update bookkeeping. @@ -1384,8 +1368,7 @@ static void recv_message_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); ++retry_state->completed_recv_message_count; // If a retry was already dispatched, then we're not going to use the // result of this recv_message op, so do nothing. @@ -1473,8 +1456,7 @@ static void add_closure_for_recv_trailing_metadata_ready( // Return metadata. subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); grpc_metadata_batch_move( &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); @@ -1576,8 +1558,7 @@ static void run_closures_for_completed_call(subchannel_batch_data* batch_data, call_data* calld = static_cast(elem->call_data); subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. @@ -1611,8 +1592,7 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); retry_state->completed_recv_trailing_metadata = true; // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; @@ -1735,8 +1715,7 @@ static void on_complete(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - batch_data->subchannel_call)); + batch_data->subchannel_call->GetParentData()); // Update bookkeeping in retry_state. if (batch_data->batch.send_initial_metadata) { retry_state->completed_send_initial_metadata = true; @@ -1792,10 +1771,10 @@ static void on_complete(void* arg, grpc_error* error) { static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); - grpc_subchannel_call* subchannel_call = - static_cast(batch->handler_private.extra_arg); + grpc_core::SubchannelCall* subchannel_call = + static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. - grpc_subchannel_call_process_op(subchannel_call, batch); + subchannel_call->StartTransportStreamOpBatch(batch); } // Adds a closure to closures that will execute batch in the call combiner. @@ -1804,7 +1783,7 @@ static void add_closure_for_subchannel_batch( grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); - batch->handler_private.extra_arg = calld->subchannel_call; + batch->handler_private.extra_arg = calld->subchannel_call.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); @@ -1978,8 +1957,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)); + calld->subchannel_call->GetParentData()); // Create batch_data with 2 refs, since this batch will be unreffed twice: // once for the recv_trailing_metadata_ready callback when the subchannel // batch returns, and again when we actually get a recv_trailing_metadata @@ -1989,7 +1967,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); + calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch); } // If there are any cached send ops that need to be replayed on the @@ -2196,8 +2174,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { } subchannel_call_retry_state* retry_state = static_cast( - grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)); + calld->subchannel_call->GetParentData()); // Construct list of closures to execute, one for each pending batch. grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. @@ -2220,7 +2197,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, closures.size(), calld->subchannel_call); + chand, calld, closures.size(), calld->subchannel_call.get()); } // Note: This will yield the call combiner. closures.RunClosures(calld->call_combiner); @@ -2245,22 +2222,22 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { calld->call_combiner, // call_combiner parent_data_size // parent_data_size }; - grpc_error* new_error = - calld->request->pick()->connected_subchannel->CreateCall( - call_args, &calld->subchannel_call); + grpc_error* new_error = GRPC_ERROR_NONE; + calld->subchannel_call = + calld->request->pick()->connected_subchannel->CreateCall(call_args, + &new_error); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", - chand, calld, calld->subchannel_call, grpc_error_string(new_error)); + chand, calld, calld->subchannel_call.get(), + grpc_error_string(new_error)); } if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { if (parent_data_size > 0) { - new (grpc_connected_subchannel_call_get_parent_data( - calld->subchannel_call)) - subchannel_call_retry_state( - calld->request->pick()->subchannel_call_context); + new (calld->subchannel_call->GetParentData()) subchannel_call_retry_state( + calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } @@ -2488,7 +2465,7 @@ static void cc_start_transport_stream_op_batch( batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); } else { // Note: This will release the call combiner. - grpc_subchannel_call_process_op(calld->subchannel_call, batch); + calld->subchannel_call->StartTransportStreamOpBatch(batch); } return; } @@ -2502,7 +2479,7 @@ static void cc_start_transport_stream_op_batch( if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, - calld, calld->subchannel_call); + calld, calld->subchannel_call.get()); } pending_batches_resume(elem); return; @@ -2545,8 +2522,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, grpc_closure* then_schedule_closure) { call_data* calld = static_cast(elem->call_data); if (GPR_LIKELY(calld->subchannel_call != nullptr)) { - grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, - then_schedule_closure); + calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure); then_schedule_closure = nullptr; } calld->~call_data(); @@ -2752,8 +2728,8 @@ void grpc_client_channel_watch_connectivity_state( GRPC_ERROR_NONE); } -grpc_subchannel_call* grpc_client_channel_get_subchannel_call( - grpc_call_element* elem) { +grpc_core::RefCountedPtr +grpc_client_channel_get_subchannel_call(grpc_call_element* elem) { call_data* calld = static_cast(elem->call_data); return calld->subchannel_call; } diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 4935fd24d87..5bfff4df9cd 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -60,7 +60,7 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure* watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ -grpc_subchannel_call* grpc_client_channel_get_subchannel_call( - grpc_call_element* elem); +grpc_core::RefCountedPtr +grpc_client_channel_get_subchannel_call(grpc_call_element* elem); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 8e5426081c4..76c5a786240 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -113,12 +113,11 @@ RefCountedPtr ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } -SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, +SubchannelNode::SubchannelNode(Subchannel* subchannel, size_t channel_tracer_max_nodes) : BaseNode(EntityType::kSubchannel), subchannel_(subchannel), - target_( - UniquePtr(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), + target_(UniquePtr(gpr_strdup(subchannel_->GetTargetAddress()))), trace_(channel_tracer_max_nodes) {} SubchannelNode::~SubchannelNode() {} @@ -128,8 +127,8 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) { if (subchannel_ == nullptr) { state = GRPC_CHANNEL_SHUTDOWN; } else { - state = grpc_subchannel_check_connectivity( - subchannel_, nullptr, true /* inhibit_health_checking */); + state = subchannel_->CheckConnectivity(nullptr, + true /* inhibit_health_checking */); } json = grpc_json_create_child(nullptr, json, "state", nullptr, GRPC_JSON_OBJECT, false); @@ -170,7 +169,7 @@ grpc_json* SubchannelNode::RenderJson() { call_counter_.PopulateCallCounts(json); json = top_level_json; // populate the child socket. - intptr_t socket_uuid = grpc_subchannel_get_child_socket_uuid(subchannel_); + intptr_t socket_uuid = subchannel_->GetChildSocketUuid(); if (socket_uuid != 0) { grpc_json* array_parent = grpc_json_create_child( nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false); diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 8a5c3e7e5e5..1dc1bf595be 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -26,9 +26,10 @@ #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" -typedef struct grpc_subchannel grpc_subchannel; - namespace grpc_core { + +class Subchannel; + namespace channelz { // Subtype of ChannelNode that overrides and provides client_channel specific @@ -59,7 +60,7 @@ class ClientChannelNode : public ChannelNode { // Handles channelz bookkeeping for sockets class SubchannelNode : public BaseNode { public: - SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); + SubchannelNode(Subchannel* subchannel, size_t channel_tracer_max_nodes); ~SubchannelNode() override; void MarkSubchannelDestroyed() { @@ -84,7 +85,7 @@ class SubchannelNode : public BaseNode { void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - grpc_subchannel* subchannel_; + Subchannel* subchannel_; UniquePtr target_; CallCountingHelper call_counter_; ChannelTrace trace_; diff --git a/src/core/ext/filters/client_channel/client_channel_factory.cc b/src/core/ext/filters/client_channel/client_channel_factory.cc index 130bbe04180..8c558382fdf 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.cc +++ b/src/core/ext/filters/client_channel/client_channel_factory.cc @@ -29,7 +29,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) { factory->vtable->unref(factory); } -grpc_subchannel* grpc_client_channel_factory_create_subchannel( +grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( grpc_client_channel_factory* factory, const grpc_channel_args* args) { return factory->vtable->create_subchannel(factory, args); } diff --git a/src/core/ext/filters/client_channel/client_channel_factory.h b/src/core/ext/filters/client_channel/client_channel_factory.h index 91dec12282f..4b72aa46499 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.h +++ b/src/core/ext/filters/client_channel/client_channel_factory.h @@ -48,8 +48,8 @@ struct grpc_client_channel_factory { struct grpc_client_channel_factory_vtable { void (*ref)(grpc_client_channel_factory* factory); void (*unref)(grpc_client_channel_factory* factory); - grpc_subchannel* (*create_subchannel)(grpc_client_channel_factory* factory, - const grpc_channel_args* args); + grpc_core::Subchannel* (*create_subchannel)( + grpc_client_channel_factory* factory, const grpc_channel_args* args); grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory, const char* target, grpc_client_channel_type type, @@ -60,7 +60,7 @@ void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory); void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory); /** Create a new grpc_subchannel */ -grpc_subchannel* grpc_client_channel_factory_create_subchannel( +grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( grpc_client_channel_factory* factory, const grpc_channel_args* args); /** Create a new grpc_channel */ diff --git a/src/core/ext/filters/client_channel/global_subchannel_pool.cc b/src/core/ext/filters/client_channel/global_subchannel_pool.cc index a41d993fe66..ee6e58159a0 100644 --- a/src/core/ext/filters/client_channel/global_subchannel_pool.cc +++ b/src/core/ext/filters/client_channel/global_subchannel_pool.cc @@ -54,9 +54,9 @@ RefCountedPtr GlobalSubchannelPool::instance() { return *instance_; } -grpc_subchannel* GlobalSubchannelPool::RegisterSubchannel( - SubchannelKey* key, grpc_subchannel* constructed) { - grpc_subchannel* c = nullptr; +Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key, + Subchannel* constructed) { + Subchannel* c = nullptr; // Compare and swap (CAS) loop: while (c == nullptr) { // Ref the shared map to have a local copy. @@ -64,7 +64,7 @@ grpc_subchannel* GlobalSubchannelPool::RegisterSubchannel( grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr); gpr_mu_unlock(&mu_); // Check to see if a subchannel already exists. - c = static_cast(grpc_avl_get(old_map, key, nullptr)); + c = static_cast(grpc_avl_get(old_map, key, nullptr)); if (c != nullptr) { // The subchannel already exists. Reuse it. c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "subchannel_register+reuse"); @@ -121,15 +121,14 @@ void GlobalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) { } } -grpc_subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) { +Subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) { // Lock, and take a reference to the subchannel map. // We don't need to do the search under a lock as AVL's are immutable. gpr_mu_lock(&mu_); grpc_avl index = grpc_avl_ref(subchannel_map_, nullptr); gpr_mu_unlock(&mu_); - grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( - static_cast(grpc_avl_get(index, key, nullptr)), - "found_from_pool"); + Subchannel* c = static_cast(grpc_avl_get(index, key, nullptr)); + if (c != nullptr) GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "found_from_pool"); grpc_avl_unref(index, nullptr); return c; } @@ -156,11 +155,11 @@ long sck_avl_compare(void* a, void* b, void* unused) { } void scv_avl_destroy(void* p, void* user_data) { - GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "global_subchannel_pool"); + GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool"); } void* scv_avl_copy(void* p, void* unused) { - GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "global_subchannel_pool"); + GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool"); return p; } diff --git a/src/core/ext/filters/client_channel/global_subchannel_pool.h b/src/core/ext/filters/client_channel/global_subchannel_pool.h index 0deb3769360..96dc8d7b3a4 100644 --- a/src/core/ext/filters/client_channel/global_subchannel_pool.h +++ b/src/core/ext/filters/client_channel/global_subchannel_pool.h @@ -45,10 +45,10 @@ class GlobalSubchannelPool final : public SubchannelPoolInterface { static RefCountedPtr instance(); // Implements interface methods. - grpc_subchannel* RegisterSubchannel(SubchannelKey* key, - grpc_subchannel* constructed) override; + Subchannel* RegisterSubchannel(SubchannelKey* key, + Subchannel* constructed) override; void UnregisterSubchannel(SubchannelKey* key) override; - grpc_subchannel* FindSubchannel(SubchannelKey* key) override; + Subchannel* FindSubchannel(SubchannelKey* key) override; private: // The singleton instance. (It's a pointer to RefCountedPtr so that this diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index 2232c57120e..e845d63d295 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -295,7 +295,9 @@ HealthCheckClient::CallState::~CallState() { gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p", health_check_client_.get(), this); } - if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended"); + // The subchannel call is in the arena, so reset the pointer before we destroy + // the arena. + call_.reset(); for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (context_[i].destroy != nullptr) { context_[i].destroy(context_[i].value); @@ -329,8 +331,8 @@ void HealthCheckClient::CallState::StartCall() { &call_combiner_, 0, // parent_data_size }; - grpc_error* error = - health_check_client_->connected_subchannel_->CreateCall(args, &call_); + grpc_error* error = GRPC_ERROR_NONE; + call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "HealthCheckClient %p CallState %p: error creating health " @@ -423,14 +425,14 @@ void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg, grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); - grpc_subchannel_call* call = - static_cast(batch->handler_private.extra_arg); - grpc_subchannel_call_process_op(call, batch); + SubchannelCall* call = + static_cast(batch->handler_private.extra_arg); + call->StartTransportStreamOpBatch(batch); } void HealthCheckClient::CallState::StartBatch( grpc_transport_stream_op_batch* batch) { - batch->handler_private.extra_arg = call_; + batch->handler_private.extra_arg = call_.get(); GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, batch, grpc_schedule_on_exec_ctx); GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure, @@ -452,7 +454,7 @@ void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) { GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx)); batch->cancel_stream = true; batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - grpc_subchannel_call_process_op(self->call_, batch); + self->call_->StartTransportStreamOpBatch(batch); } void HealthCheckClient::CallState::Cancel() { diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 2369b73feac..7af88a54cfc 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -99,7 +99,7 @@ class HealthCheckClient : public InternallyRefCounted { grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; // The streaming call to the backend. Always non-NULL. - grpc_subchannel_call* call_; + RefCountedPtr call_; grpc_transport_stream_op_batch_payload payload_; grpc_transport_stream_op_batch batch_; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index ec5c782c469..dc716a6adac 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -79,7 +79,7 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, grpc_subchannel* subchannel, + const ServerAddress& address, Subchannel* subchannel, grpc_combiner* combiner) : SubchannelData(subchannel_list, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 30316689ea7..aab6dd68216 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -94,7 +94,7 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, grpc_subchannel* subchannel, + const ServerAddress& address, Subchannel* subchannel, grpc_combiner* combiner) : SubchannelData(subchannel_list, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 2eb92b7ead0..0174a98a73d 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -88,7 +88,7 @@ class SubchannelData { } // Returns a pointer to the subchannel. - grpc_subchannel* subchannel() const { return subchannel_; } + Subchannel* subchannel() const { return subchannel_; } // Returns the connected subchannel. Will be null if the subchannel // is not connected. @@ -103,8 +103,8 @@ class SubchannelData { // ProcessConnectivityChangeLocked()). grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) { GPR_ASSERT(!connectivity_notification_pending_); - pending_connectivity_state_unsafe_ = grpc_subchannel_check_connectivity( - subchannel(), error, subchannel_list_->inhibit_health_checking()); + pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity( + error, subchannel_list_->inhibit_health_checking()); UpdateConnectedSubchannelLocked(); return pending_connectivity_state_unsafe_; } @@ -142,7 +142,7 @@ class SubchannelData { protected: SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, grpc_subchannel* subchannel, + const ServerAddress& address, Subchannel* subchannel, grpc_combiner* combiner); virtual ~SubchannelData(); @@ -170,7 +170,7 @@ class SubchannelData { SubchannelList* subchannel_list_; // The subchannel and connected subchannel. - grpc_subchannel* subchannel_; + Subchannel* subchannel_; RefCountedPtr connected_subchannel_; // Notification that connectivity has changed on subchannel. @@ -203,7 +203,7 @@ class SubchannelList : public InternallyRefCounted { for (size_t i = 0; i < subchannels_.size(); ++i) { if (subchannels_[i].subchannel() != nullptr) { grpc_core::channelz::SubchannelNode* subchannel_node = - grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); + subchannels_[i].subchannel()->channelz_node(); if (subchannel_node != nullptr) { refs_list->push_back(subchannel_node->uuid()); } @@ -276,7 +276,7 @@ class SubchannelList : public InternallyRefCounted { template SubchannelData::SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, grpc_subchannel* subchannel, + const ServerAddress& address, Subchannel* subchannel, grpc_combiner* combiner) : subchannel_list_(subchannel_list), subchannel_(subchannel), @@ -317,7 +317,7 @@ template void SubchannelData::ResetBackoffLocked() { if (subchannel_ != nullptr) { - grpc_subchannel_reset_backoff(subchannel_); + subchannel_->ResetBackoff(); } } @@ -337,8 +337,8 @@ void SubchannelDataRef(DEBUG_LOCATION, "connectivity_watch").release(); - grpc_subchannel_notify_on_state_change( - subchannel_, subchannel_list_->policy()->interested_parties(), + subchannel_->NotifyOnStateChange( + subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, subchannel_list_->inhibit_health_checking()); } @@ -357,8 +357,8 @@ void SubchannelDatapolicy()->interested_parties(), + subchannel_->NotifyOnStateChange( + subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, subchannel_list_->inhibit_health_checking()); } @@ -391,9 +391,9 @@ void SubchannelData:: subchannel_, reason); } GPR_ASSERT(connectivity_notification_pending_); - grpc_subchannel_notify_on_state_change( - subchannel_, nullptr, nullptr, &connectivity_changed_closure_, - subchannel_list_->inhibit_health_checking()); + subchannel_->NotifyOnStateChange(nullptr, nullptr, + &connectivity_changed_closure_, + subchannel_list_->inhibit_health_checking()); } template @@ -401,8 +401,7 @@ bool SubchannelData::UpdateConnectedSubchannelLocked() { // If the subchannel is READY, take a ref to the connected subchannel. if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) { - connected_subchannel_ = - grpc_subchannel_get_connected_subchannel(subchannel_); + connected_subchannel_ = subchannel_->connected_subchannel(); // If the subchannel became disconnected between the time that READY // was reported and the time we got here (e.g., between when a // notification callback is scheduled and when it was actually run in @@ -518,7 +517,7 @@ SubchannelList::SubchannelList( SubchannelPoolInterface::CreateChannelArg(policy_->subchannel_pool())); const size_t subchannel_address_arg_index = args_to_add.size(); args_to_add.emplace_back( - grpc_create_subchannel_address_arg(&addresses[i].address())); + Subchannel::CreateSubchannelAddressArg(&addresses[i].address())); if (addresses[i].args() != nullptr) { for (size_t j = 0; j < addresses[i].args()->num_args; ++j) { args_to_add.emplace_back(addresses[i].args()->args[j]); @@ -528,7 +527,7 @@ SubchannelList::SubchannelList( &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add.data(), args_to_add.size()); gpr_free(args_to_add[subchannel_address_arg_index].value.string); - grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( + Subchannel* subchannel = grpc_client_channel_factory_create_subchannel( client_channel_factory, new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { diff --git a/src/core/ext/filters/client_channel/local_subchannel_pool.cc b/src/core/ext/filters/client_channel/local_subchannel_pool.cc index 145fa4e0374..d1c1cacb441 100644 --- a/src/core/ext/filters/client_channel/local_subchannel_pool.cc +++ b/src/core/ext/filters/client_channel/local_subchannel_pool.cc @@ -32,11 +32,11 @@ LocalSubchannelPool::~LocalSubchannelPool() { grpc_avl_unref(subchannel_map_, nullptr); } -grpc_subchannel* LocalSubchannelPool::RegisterSubchannel( - SubchannelKey* key, grpc_subchannel* constructed) { +Subchannel* LocalSubchannelPool::RegisterSubchannel(SubchannelKey* key, + Subchannel* constructed) { // Check to see if a subchannel already exists. - grpc_subchannel* c = static_cast( - grpc_avl_get(subchannel_map_, key, nullptr)); + Subchannel* c = + static_cast(grpc_avl_get(subchannel_map_, key, nullptr)); if (c != nullptr) { // The subchannel already exists. Reuse it. c = GRPC_SUBCHANNEL_REF(c, "subchannel_register+reuse"); @@ -54,9 +54,9 @@ void LocalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) { subchannel_map_ = grpc_avl_remove(subchannel_map_, key, nullptr); } -grpc_subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) { - grpc_subchannel* c = static_cast( - grpc_avl_get(subchannel_map_, key, nullptr)); +Subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) { + Subchannel* c = + static_cast(grpc_avl_get(subchannel_map_, key, nullptr)); return c == nullptr ? c : GRPC_SUBCHANNEL_REF(c, "found_from_pool"); } diff --git a/src/core/ext/filters/client_channel/local_subchannel_pool.h b/src/core/ext/filters/client_channel/local_subchannel_pool.h index 9929cdb3627..a6b7e259fbb 100644 --- a/src/core/ext/filters/client_channel/local_subchannel_pool.h +++ b/src/core/ext/filters/client_channel/local_subchannel_pool.h @@ -39,10 +39,10 @@ class LocalSubchannelPool final : public SubchannelPoolInterface { // Implements interface methods. // Thread-unsafe. Intended to be invoked within the client_channel combiner. - grpc_subchannel* RegisterSubchannel(SubchannelKey* key, - grpc_subchannel* constructed) override; + Subchannel* RegisterSubchannel(SubchannelKey* key, + Subchannel* constructed) override; void UnregisterSubchannel(SubchannelKey* key) override; - grpc_subchannel* FindSubchannel(SubchannelKey* key) override; + Subchannel* FindSubchannel(SubchannelKey* key) override; private: // The vtable for subchannel operations in an AVL tree. diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index d77bb3c286b..70285659aad 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -44,7 +44,6 @@ #include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" @@ -55,153 +54,256 @@ #include "src/core/lib/transport/status_metadata.h" #include "src/core/lib/uri/uri_parser.h" +// Strong and weak refs. #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) +// Backoff parameters. #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -typedef struct external_state_watcher { - grpc_subchannel* subchannel; - grpc_pollset_set* pollset_set; - grpc_closure* notify; - grpc_closure closure; - struct external_state_watcher* next; - struct external_state_watcher* prev; -} external_state_watcher; +// Conversion between subchannel call and call stack. +#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ + (grpc_call_stack*)((char*)(call) + \ + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) +#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \ + (SubchannelCall*)(((char*)(call_stack)) - \ + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) namespace grpc_core { -class ConnectedSubchannelStateWatcher; - -} // namespace grpc_core +// +// ConnectedSubchannel +// -struct grpc_subchannel { - /** The subchannel pool this subchannel is in */ - grpc_core::RefCountedPtr subchannel_pool; +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, const grpc_channel_args* args, + RefCountedPtr channelz_subchannel, + intptr_t socket_uuid) + : RefCounted(&grpc_trace_stream_refcount), + channel_stack_(channel_stack), + args_(grpc_channel_args_copy(args)), + channelz_subchannel_(std::move(channelz_subchannel)), + socket_uuid_(socket_uuid) {} - grpc_connector* connector; +ConnectedSubchannel::~ConnectedSubchannel() { + grpc_channel_args_destroy(args_); + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); +} - /** refcount - - lower INTERNAL_REF_BITS bits are for internal references: - these do not keep the subchannel open. - - upper remaining bits are for public references: these do - keep the subchannel open */ - gpr_atm ref_pair; +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} - /** channel arguments */ - grpc_channel_args* args; +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} - grpc_core::SubchannelKey* key; +namespace { - /** set during connection */ - grpc_connect_out_args connecting_result; +void SubchannelCallDestroy(void* arg, grpc_error* error) { + GPR_TIMER_SCOPE("subchannel_call_destroy", 0); + SubchannelCall* call = static_cast(arg); + grpc_closure* after_call_stack_destroy = call->after_call_stack_destroy(); + call->~SubchannelCall(); + // This should be the last step to destroy the subchannel call, because + // call->after_call_stack_destroy(), if not null, will free the call arena. + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call), nullptr, + after_call_stack_destroy); +} - /** callback for connection finishing */ - grpc_closure on_connected; +} // namespace - /** callback for our alarm */ - grpc_closure on_alarm; +RefCountedPtr ConnectedSubchannel::CreateCall( + const CallArgs& args, grpc_error** error) { + const size_t allocation_size = + GetInitialCallSizeEstimate(args.parent_data_size); + RefCountedPtr call( + new (gpr_arena_alloc(args.arena, allocation_size)) + SubchannelCall(Ref(DEBUG_LOCATION, "subchannel_call"), args)); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call.get()); + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ + }; + *error = grpc_call_stack_init(channel_stack_, 1, SubchannelCallDestroy, + call.get(), &call_args); + if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) { + const char* error_string = grpc_error_string(*error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return call; + } + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + if (channelz_subchannel_ != nullptr) { + channelz_subchannel_->RecordCallStarted(); + } + return call; +} - /** pollset_set tracking who's interested in a connection - being setup */ - grpc_pollset_set* pollset_set; +size_t ConnectedSubchannel::GetInitialCallSizeEstimate( + size_t parent_data_size) const { + size_t allocation_size = + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); + if (parent_data_size > 0) { + allocation_size += + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + + parent_data_size; + } else { + allocation_size += channel_stack_->call_stack_size; + } + return allocation_size; +} - grpc_core::UniquePtr health_check_service_name; +// +// SubchannelCall +// - /** mutex protecting remaining elements */ - gpr_mu mu; +void SubchannelCall::StartTransportStreamOpBatch( + grpc_transport_stream_op_batch* batch) { + GPR_TIMER_SCOPE("subchannel_call_process_op", 0); + MaybeInterceptRecvTrailingMetadata(batch); + grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); + grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); + GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); + top_elem->filter->start_transport_stream_op_batch(top_elem, batch); +} - /** active connection, or null */ - grpc_core::RefCountedPtr connected_subchannel; - grpc_core::OrphanablePtr - connected_subchannel_watcher; +void* SubchannelCall::GetParentData() { + grpc_channel_stack* chanstk = connected_subchannel_->channel_stack(); + return (char*)this + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size); +} - /** have we seen a disconnection? */ - bool disconnected; - /** are we connecting */ - bool connecting; +grpc_call_stack* SubchannelCall::GetCallStack() { + return SUBCHANNEL_CALL_TO_CALL_STACK(this); +} - /** connectivity state tracking */ - grpc_connectivity_state_tracker state_tracker; - grpc_connectivity_state_tracker state_and_health_tracker; +void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { + GPR_ASSERT(after_call_stack_destroy_ == nullptr); + GPR_ASSERT(closure != nullptr); + after_call_stack_destroy_ = closure; +} - external_state_watcher root_external_state_watcher; +RefCountedPtr SubchannelCall::Ref() { + IncrementRefCount(); + return RefCountedPtr(this); +} - /** backoff state */ - grpc_core::ManualConstructor backoff; - grpc_millis next_attempt_deadline; - grpc_millis min_connect_timeout_ms; +RefCountedPtr SubchannelCall::Ref( + const grpc_core::DebugLocation& location, const char* reason) { + IncrementRefCount(location, reason); + return RefCountedPtr(this); +} - /** do we have an active alarm? */ - bool have_alarm; - /** have we started the backoff loop */ - bool backoff_begun; - // reset_backoff() was called while alarm was pending - bool retry_immediately; - /** our alarm */ - grpc_timer alarm; +void SubchannelCall::Unref() { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); +} - grpc_core::RefCountedPtr - channelz_subchannel; -}; +void SubchannelCall::Unref(const DebugLocation& location, const char* reason) { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); +} -struct grpc_subchannel_call { - grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection, - const grpc_core::ConnectedSubchannel::CallArgs& args) - : connection(connection), deadline(args.deadline) {} - - grpc_core::ConnectedSubchannel* connection; - grpc_closure* schedule_closure_after_destroy = nullptr; - // state needed to support channelz interception of recv trailing metadata. - grpc_closure recv_trailing_metadata_ready; - grpc_closure* original_recv_trailing_metadata; - grpc_metadata_batch* recv_trailing_metadata = nullptr; - grpc_millis deadline; -}; +void SubchannelCall::MaybeInterceptRecvTrailingMetadata( + grpc_transport_stream_op_batch* batch) { + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; + } + // only add interceptor is channelz is enabled. + if (connected_subchannel_->channelz_subchannel() == nullptr) { + return; + } + GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, + this, grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(recv_trailing_metadata_ == nullptr); + recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + original_recv_trailing_metadata_ = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &recv_trailing_metadata_ready_; +} -static void maybe_start_connecting_locked(grpc_subchannel* c); +namespace { -static const char* subchannel_connectivity_state_change_string( - grpc_connectivity_state state) { - switch (state) { - case GRPC_CHANNEL_IDLE: - return "Subchannel state change to IDLE"; - case GRPC_CHANNEL_CONNECTING: - return "Subchannel state change to CONNECTING"; - case GRPC_CHANNEL_READY: - return "Subchannel state change to READY"; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - return "Subchannel state change to TRANSIENT_FAILURE"; - case GRPC_CHANNEL_SHUTDOWN: - return "Subchannel state change to SHUTDOWN"; +// Sets *status based on the rest of the parameters. +void GetCallStatus(grpc_status_code* status, grpc_millis deadline, + grpc_metadata_batch* md_batch, grpc_error* error) { + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); + } else { + if (md_batch->idx.named.grpc_status != nullptr) { + *status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); + } else { + *status = GRPC_STATUS_UNKNOWN; + } } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); + GRPC_ERROR_UNREF(error); } -static void set_subchannel_connectivity_state_locked( - grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error, - const char* reason) { - if (c->channelz_subchannel != nullptr) { - c->channelz_subchannel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - subchannel_connectivity_state_change_string(state))); +} // namespace + +void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { + SubchannelCall* call = static_cast(arg); + GPR_ASSERT(call->recv_trailing_metadata_ != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, + GRPC_ERROR_REF(error)); + channelz::SubchannelNode* channelz_subchannel = + call->connected_subchannel_->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); + } else { + channelz_subchannel->RecordCallFailed(); } - grpc_connectivity_state_set(&c->state_tracker, state, error, reason); + GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata_, + GRPC_ERROR_REF(error)); } -namespace grpc_core { +void SubchannelCall::IncrementRefCount() { + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); +} + +void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location, + const char* reason) { + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); +} + +// +// Subchannel::ConnectedSubchannelStateWatcher +// -class ConnectedSubchannelStateWatcher +class Subchannel::ConnectedSubchannelStateWatcher : public InternallyRefCounted { public: // Must be instantiated while holding c->mu. - explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c) - : subchannel_(c) { + explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { // Steal subchannel ref for connecting. GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); @@ -209,15 +311,15 @@ class ConnectedSubchannelStateWatcher // Callback uses initial ref to this. GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this, grpc_schedule_on_exec_ctx); - c->connected_subchannel->NotifyOnStateChange(c->pollset_set, - &pending_connectivity_state_, - &on_connectivity_changed_); + c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_, + &pending_connectivity_state_, + &on_connectivity_changed_); // Start health check if needed. grpc_connectivity_state health_state = GRPC_CHANNEL_READY; - if (c->health_check_service_name != nullptr) { - health_check_client_ = grpc_core::MakeOrphanable( - c->health_check_service_name.get(), c->connected_subchannel, - c->pollset_set, c->channelz_subchannel); + if (c->health_check_service_name_ != nullptr) { + health_check_client_ = MakeOrphanable( + c->health_check_service_name_.get(), c->connected_subchannel_, + c->pollset_set_, c->channelz_node_); GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, grpc_schedule_on_exec_ctx); Ref().release(); // Ref for health callback tracked manually. @@ -226,9 +328,9 @@ class ConnectedSubchannelStateWatcher health_state = GRPC_CHANNEL_CONNECTING; } // Report initial state. - set_subchannel_connectivity_state_locked( - c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected"); - grpc_connectivity_state_set(&c->state_and_health_tracker, health_state, + c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, GRPC_ERROR_NONE, + "subchannel_connected"); + grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state, GRPC_ERROR_NONE, "subchannel_connected"); } @@ -242,33 +344,33 @@ class ConnectedSubchannelStateWatcher private: static void OnConnectivityChanged(void* arg, grpc_error* error) { auto* self = static_cast(arg); - grpc_subchannel* c = self->subchannel_; + Subchannel* c = self->subchannel_; { - MutexLock lock(&c->mu); + MutexLock lock(&c->mu_); switch (self->pending_connectivity_state_) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected && c->connected_subchannel != nullptr) { + if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { if (grpc_trace_stream_refcount.enabled()) { gpr_log(GPR_INFO, "Connected subchannel %p of subchannel %p has gone into " "%s. Attempting to reconnect.", - c->connected_subchannel.get(), c, + c->connected_subchannel_.get(), c, grpc_connectivity_state_name( self->pending_connectivity_state_)); } - c->connected_subchannel.reset(); - c->connected_subchannel_watcher.reset(); + c->connected_subchannel_.reset(); + c->connected_subchannel_watcher_.reset(); self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; - set_subchannel_connectivity_state_locked( - c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), - "reflect_child"); - grpc_connectivity_state_set(&c->state_and_health_tracker, + c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, + GRPC_ERROR_REF(error), + "reflect_child"); + grpc_connectivity_state_set(&c->state_and_health_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "reflect_child"); - c->backoff_begun = false; - c->backoff->Reset(); - maybe_start_connecting_locked(c); + c->backoff_begun_ = false; + c->backoff_.Reset(); + c->MaybeStartConnectingLocked(); } else { self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN; } @@ -281,15 +383,14 @@ class ConnectedSubchannelStateWatcher // this watch from. And a connected subchannel should never go // from READY to CONNECTING or IDLE. self->last_connectivity_state_ = self->pending_connectivity_state_; - set_subchannel_connectivity_state_locked( - c, self->pending_connectivity_state_, GRPC_ERROR_REF(error), - "reflect_child"); + c->SetConnectivityStateLocked(self->pending_connectivity_state_, + GRPC_ERROR_REF(error), "reflect_child"); if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker, + grpc_connectivity_state_set(&c->state_and_health_tracker_, self->pending_connectivity_state_, GRPC_ERROR_REF(error), "reflect_child"); } - c->connected_subchannel->NotifyOnStateChange( + c->connected_subchannel_->NotifyOnStateChange( nullptr, &self->pending_connectivity_state_, &self->on_connectivity_changed_); self = nullptr; // So we don't unref below. @@ -303,14 +404,14 @@ class ConnectedSubchannelStateWatcher static void OnHealthChanged(void* arg, grpc_error* error) { auto* self = static_cast(arg); - grpc_subchannel* c = self->subchannel_; - MutexLock lock(&c->mu); + Subchannel* c = self->subchannel_; + MutexLock lock(&c->mu_); if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) { self->Unref(); return; } if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker, + grpc_connectivity_state_set(&c->state_and_health_tracker_, self->health_state_, GRPC_ERROR_REF(error), "health_changed"); } @@ -318,163 +419,63 @@ class ConnectedSubchannelStateWatcher &self->on_health_changed_); } - grpc_subchannel* subchannel_; + Subchannel* subchannel_; grpc_closure on_connectivity_changed_; grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY; - grpc_core::OrphanablePtr health_check_client_; + OrphanablePtr health_check_client_; grpc_closure on_health_changed_; grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING; }; -} // namespace grpc_core - -#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ - (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \ - sizeof(grpc_subchannel_call))) -#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ - (grpc_subchannel_call*)(((char*)(call_stack)) - \ - GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \ - sizeof(grpc_subchannel_call))) - -static void on_subchannel_connected(void* subchannel, grpc_error* error); - -#ifndef NDEBUG -#define REF_REASON reason -#define REF_MUTATE_EXTRA_ARGS \ - GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose -#define REF_MUTATE_PURPOSE(x) , file, line, reason, x -#else -#define REF_REASON "" -#define REF_MUTATE_EXTRA_ARGS -#define REF_MUTATE_PURPOSE(x) -#endif - -/* - * connection implementation - */ - -static void connection_destroy(void* arg, grpc_error* error) { - grpc_channel_stack* stk = static_cast(arg); - grpc_channel_stack_destroy(stk); - gpr_free(stk); -} - -/* - * grpc_subchannel implementation - */ - -static void subchannel_destroy(void* arg, grpc_error* error) { - grpc_subchannel* c = static_cast(arg); - if (c->channelz_subchannel != nullptr) { - c->channelz_subchannel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Subchannel destroyed")); - c->channelz_subchannel->MarkSubchannelDestroyed(); - c->channelz_subchannel.reset(); - } - c->health_check_service_name.reset(); - grpc_channel_args_destroy(c->args); - grpc_connectivity_state_destroy(&c->state_tracker); - grpc_connectivity_state_destroy(&c->state_and_health_tracker); - grpc_connector_unref(c->connector); - grpc_pollset_set_destroy(c->pollset_set); - grpc_core::Delete(c->key); - gpr_mu_destroy(&c->mu); - gpr_free(c); -} +// +// Subchannel::ExternalStateWatcher +// -static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta, - int barrier REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) - : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); -#ifndef NDEBUG - if (grpc_trace_stream_refcount.enabled()) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, - purpose, old_val, old_val + delta, reason); +struct Subchannel::ExternalStateWatcher { + ExternalStateWatcher(Subchannel* subchannel, grpc_pollset_set* pollset_set, + grpc_closure* notify) + : subchannel(subchannel), pollset_set(pollset_set), notify(notify) { + GRPC_SUBCHANNEL_WEAK_REF(subchannel, "external_state_watcher+init"); + GRPC_CLOSURE_INIT(&on_state_changed, OnStateChanged, this, + grpc_schedule_on_exec_ctx); } -#endif - return old_val; -} - -grpc_subchannel* grpc_subchannel_ref( - grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), - 0 REF_MUTATE_PURPOSE("STRONG_REF")); - GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); - return c; -} - -grpc_subchannel* grpc_subchannel_weak_ref( - grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); - GPR_ASSERT(old_refs != 0); - return c; -} -grpc_subchannel* grpc_subchannel_ref_from_weak_ref( - grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - if (!c) return nullptr; - for (;;) { - gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); - if (old_refs >= (1 << INTERNAL_REF_BITS)) { - gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); - if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { - return c; - } - } else { - return nullptr; + static void OnStateChanged(void* arg, grpc_error* error) { + ExternalStateWatcher* w = static_cast(arg); + grpc_closure* follow_up = w->notify; + if (w->pollset_set != nullptr) { + grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_, + w->pollset_set); } + gpr_mu_lock(&w->subchannel->mu_); + if (w->subchannel->external_state_watcher_list_ == w) { + w->subchannel->external_state_watcher_list_ = w->next; + } + if (w->next != nullptr) w->next->prev = w->prev; + if (w->prev != nullptr) w->prev->next = w->next; + gpr_mu_unlock(&w->subchannel->mu_); + GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done"); + Delete(w); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } -} -static void disconnect(grpc_subchannel* c) { - // The subchannel_pool is only used once here in this subchannel, so the - // access can be outside of the lock. - if (c->subchannel_pool != nullptr) { - c->subchannel_pool->UnregisterSubchannel(c->key); - c->subchannel_pool.reset(); - } - gpr_mu_lock(&c->mu); - GPR_ASSERT(!c->disconnected); - c->disconnected = true; - grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Subchannel disconnected")); - c->connected_subchannel.reset(); - c->connected_subchannel_watcher.reset(); - gpr_mu_unlock(&c->mu); -} + Subchannel* subchannel; + grpc_pollset_set* pollset_set; + grpc_closure* notify; + grpc_closure on_state_changed; + ExternalStateWatcher* next = nullptr; + ExternalStateWatcher* prev = nullptr; +}; -void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - // add a weak ref and subtract a strong ref (atomically) - old_refs = ref_mutate( - c, static_cast(1) - static_cast(1 << INTERNAL_REF_BITS), - 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { - disconnect(c); - } - GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref"); -} +// +// Subchannel +// -void grpc_subchannel_weak_unref( - grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = ref_mutate(c, -static_cast(1), - 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); - if (old_refs == 1) { - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - } -} +namespace { -static void parse_args_for_backoff_values( - const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, - grpc_millis* min_connect_timeout_ms) { +BackOff::Options ParseArgsForBackoffValues( + const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) { grpc_millis initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; *min_connect_timeout_ms = @@ -511,7 +512,8 @@ static void parse_args_for_backoff_values( } } } - backoff_options->set_initial_backoff(initial_backoff_ms) + return BackOff::Options() + .set_initial_backoff(initial_backoff_ms) .set_multiplier(fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) @@ -520,9 +522,6 @@ static void parse_args_for_backoff_values( .set_max_backoff(max_backoff_ms); } -namespace grpc_core { -namespace { - struct HealthCheckParams { UniquePtr service_name; @@ -543,31 +542,19 @@ struct HealthCheckParams { }; } // namespace -} // namespace grpc_core -grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, - const grpc_channel_args* args) { - grpc_core::SubchannelKey* key = - grpc_core::New(args); - grpc_core::SubchannelPoolInterface* subchannel_pool = - grpc_core::SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs( - args); - GPR_ASSERT(subchannel_pool != nullptr); - grpc_subchannel* c = subchannel_pool->FindSubchannel(key); - if (c != nullptr) { - grpc_core::Delete(key); - return c; - } +Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, + const grpc_channel_args* args) + : key_(key), + connector_(connector), + backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) { GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); - c = static_cast(gpr_zalloc(sizeof(*c))); - c->key = key; - gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); - c->connector = connector; - grpc_connector_ref(c->connector); - c->pollset_set = grpc_pollset_set_create(); + gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS); + grpc_connector_ref(connector_); + pollset_set_ = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast(gpr_malloc(sizeof(*addr))); - grpc_get_subchannel_address_arg(args, addr); + GetAddressFromSubchannelAddressArg(args, addr); grpc_resolved_address* new_address = nullptr; grpc_channel_args* new_args = nullptr; if (grpc_proxy_mappers_map_address(addr, args, &new_address, &new_args)) { @@ -576,569 +563,492 @@ grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, addr = new_address; } static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); + grpc_arg new_arg = CreateSubchannelAddressArg(addr); gpr_free(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( + args_ = grpc_channel_args_copy_and_add_and_remove( new_args != nullptr ? new_args : args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); gpr_free(new_arg.value.string); if (new_args != nullptr) grpc_channel_args_destroy(new_args); - c->root_external_state_watcher.next = c->root_external_state_watcher.prev = - &c->root_external_state_watcher; - GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, + GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, grpc_schedule_on_exec_ctx); - grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "subchannel"); - grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE, "subchannel"); - grpc_core::BackOff::Options backoff_options; - parse_args_for_backoff_values(args, &backoff_options, - &c->min_connect_timeout_ms); - c->backoff.Init(backoff_options); - gpr_mu_init(&c->mu); - + gpr_mu_init(&mu_); // Check whether we should enable health checking. const char* service_config_json = grpc_channel_arg_get_string( - grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG)); + grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG)); if (service_config_json != nullptr) { - grpc_core::UniquePtr service_config = - grpc_core::ServiceConfig::Create(service_config_json); + UniquePtr service_config = + ServiceConfig::Create(service_config_json); if (service_config != nullptr) { - grpc_core::HealthCheckParams params; - service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse, - ¶ms); - c->health_check_service_name = std::move(params.service_name); + HealthCheckParams params; + service_config->ParseGlobalParams(HealthCheckParams::Parse, ¶ms); + health_check_service_name_ = std::move(params.service_name); } } - - const grpc_arg* arg = - grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); - bool channelz_enabled = + const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ); + const bool channelz_enabled = grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT); arg = grpc_channel_args_find( - c->args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); + args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); const grpc_integer_options options = { GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; size_t channel_tracer_max_memory = (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { - c->channelz_subchannel = - grpc_core::MakeRefCounted( - c, channel_tracer_max_memory); - c->channelz_subchannel->AddTraceEvent( - grpc_core::channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Subchannel created")); + channelz_node_ = MakeRefCounted( + this, channel_tracer_max_memory); + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("subchannel created")); + } +} + +Subchannel::~Subchannel() { + if (channelz_node_ != nullptr) { + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + channelz_node_->MarkSubchannelDestroyed(); + } + grpc_channel_args_destroy(args_); + grpc_connectivity_state_destroy(&state_tracker_); + grpc_connectivity_state_destroy(&state_and_health_tracker_); + grpc_connector_unref(connector_); + grpc_pollset_set_destroy(pollset_set_); + Delete(key_); + gpr_mu_destroy(&mu_); +} + +Subchannel* Subchannel::Create(grpc_connector* connector, + const grpc_channel_args* args) { + SubchannelKey* key = New(args); + SubchannelPoolInterface* subchannel_pool = + SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args); + GPR_ASSERT(subchannel_pool != nullptr); + Subchannel* c = subchannel_pool->FindSubchannel(key); + if (c != nullptr) { + Delete(key); + return c; } + c = New(key, connector, args); // Try to register the subchannel before setting the subchannel pool. // Otherwise, in case of a registration race, unreffing c in - // RegisterSubchannel() will cause c to be tried to be unregistered, while its - // key maps to a different subchannel. - grpc_subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); - if (registered == c) c->subchannel_pool = subchannel_pool->Ref(); + // RegisterSubchannel() will cause c to be tried to be unregistered, while + // its key maps to a different subchannel. + Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); + if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); return registered; } -grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( - grpc_subchannel* subchannel) { - return subchannel->channelz_subchannel.get(); +Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = RefMutate((1 << INTERNAL_REF_BITS), + 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF")); + GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); + return this; } -intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) { - if (subchannel->connected_subchannel != nullptr) { - return subchannel->connected_subchannel->socket_uuid(); - } else { - return 0; +void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + // add a weak ref and subtract a strong ref (atomically) + old_refs = RefMutate( + static_cast(1) - static_cast(1 << INTERNAL_REF_BITS), + 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF")); + if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { + Disconnect(); } + GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref"); } -static void continue_connect_locked(grpc_subchannel* c) { - grpc_connect_in_args args; - args.interested_parties = c->pollset_set; - const grpc_millis min_deadline = - c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); - c->next_attempt_deadline = c->backoff->NextAttemptTime(); - args.deadline = std::max(c->next_attempt_deadline, min_deadline); - args.channel_args = c->args; - set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING, - GRPC_ERROR_NONE, "connecting"); - grpc_connectivity_state_set(&c->state_and_health_tracker, - GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, - "connecting"); - grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->on_connected); +Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF")); + GPR_ASSERT(old_refs != 0); + return this; } -grpc_connectivity_state grpc_subchannel_check_connectivity( - grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) { - gpr_mu_lock(&c->mu); - grpc_connectivity_state_tracker* tracker = - inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; - grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); - gpr_mu_unlock(&c->mu); - return state; -} +namespace { -static void on_external_state_watcher_done(void* arg, grpc_error* error) { - external_state_watcher* w = static_cast(arg); - grpc_closure* follow_up = w->notify; - if (w->pollset_set != nullptr) { - grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set, - w->pollset_set); - } - gpr_mu_lock(&w->subchannel->mu); - w->next->prev = w->prev; - w->prev->next = w->next; - gpr_mu_unlock(&w->subchannel->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); - gpr_free(w); - GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); +void subchannel_destroy(void* arg, grpc_error* error) { + Subchannel* self = static_cast(arg); + Delete(self); } -static void on_alarm(void* arg, grpc_error* error) { - grpc_subchannel* c = static_cast(arg); - gpr_mu_lock(&c->mu); - c->have_alarm = false; - if (c->disconnected) { - error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", - &error, 1); - } else if (c->retry_immediately) { - c->retry_immediately = false; - error = GRPC_ERROR_NONE; - } else { - GRPC_ERROR_REF(error); - } - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - continue_connect_locked(c); - gpr_mu_unlock(&c->mu); - } else { - gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); +} // namespace + +void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = RefMutate(-static_cast(1), + 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF")); + if (old_refs == 1) { + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } - GRPC_ERROR_UNREF(error); } -static void maybe_start_connecting_locked(grpc_subchannel* c) { - if (c->disconnected) { - /* Don't try to connect if we're already disconnected */ - return; - } - if (c->connecting) { - /* Already connecting: don't restart */ - return; - } - if (c->connected_subchannel != nullptr) { - /* Already connected: don't restart */ - return; - } - if (!grpc_connectivity_state_has_watchers(&c->state_tracker) && - !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) { - /* Nobody is interested in connecting: so don't just yet */ - return; - } - c->connecting = true; - GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); - if (!c->backoff_begun) { - c->backoff_begun = true; - continue_connect_locked(c); - } else { - GPR_ASSERT(!c->have_alarm); - c->have_alarm = true; - const grpc_millis time_til_next = - c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); - if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); +Subchannel* Subchannel::RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + for (;;) { + gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_); + if (old_refs >= (1 << INTERNAL_REF_BITS)) { + gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); + if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) { + return this; + } } else { - gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c, - time_til_next); + return nullptr; } - GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); - grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); } } -void grpc_subchannel_notify_on_state_change( - grpc_subchannel* c, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify, - bool inhibit_health_checks) { +intptr_t Subchannel::GetChildSocketUuid() { + if (connected_subchannel_ != nullptr) { + return connected_subchannel_->socket_uuid(); + } else { + return 0; + } +} + +const char* Subchannel::GetTargetAddress() { + const grpc_arg* addr_arg = + grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} + +RefCountedPtr Subchannel::connected_subchannel() { + MutexLock lock(&mu_); + return connected_subchannel_; +} + +channelz::SubchannelNode* Subchannel::channelz_node() { + return channelz_node_.get(); +} + +grpc_connectivity_state Subchannel::CheckConnectivity( + grpc_error** error, bool inhibit_health_checks) { + MutexLock lock(&mu_); + grpc_connectivity_state_tracker* tracker = + inhibit_health_checks ? &state_tracker_ : &state_and_health_tracker_; + grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); + return state; +} + +void Subchannel::NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, + grpc_closure* notify, + bool inhibit_health_checks) { grpc_connectivity_state_tracker* tracker = - inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; - external_state_watcher* w; + inhibit_health_checks ? &state_tracker_ : &state_and_health_tracker_; + ExternalStateWatcher* w; if (state == nullptr) { - gpr_mu_lock(&c->mu); - for (w = c->root_external_state_watcher.next; - w != &c->root_external_state_watcher; w = w->next) { + MutexLock lock(&mu_); + for (w = external_state_watcher_list_; w != nullptr; w = w->next) { if (w->notify == notify) { grpc_connectivity_state_notify_on_state_change(tracker, nullptr, - &w->closure); + &w->on_state_changed); } } - gpr_mu_unlock(&c->mu); } else { - w = static_cast(gpr_malloc(sizeof(*w))); - w->subchannel = c; - w->pollset_set = interested_parties; - w->notify = notify; - GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w, - grpc_schedule_on_exec_ctx); + w = New(this, interested_parties, notify); if (interested_parties != nullptr) { - grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties); + grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); } - GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); - gpr_mu_lock(&c->mu); - w->next = &c->root_external_state_watcher; - w->prev = w->next->prev; - w->next->prev = w->prev->next = w; - grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure); - maybe_start_connecting_locked(c); - gpr_mu_unlock(&c->mu); + MutexLock lock(&mu_); + if (external_state_watcher_list_ != nullptr) { + w->next = external_state_watcher_list_; + w->next->prev = w; + } + external_state_watcher_list_ = w; + grpc_connectivity_state_notify_on_state_change(tracker, state, + &w->on_state_changed); + MaybeStartConnectingLocked(); } } -static bool publish_transport_locked(grpc_subchannel* c) { - /* construct channel stack */ - grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); - grpc_channel_stack_builder_set_channel_arguments( - builder, c->connecting_result.channel_args); - grpc_channel_stack_builder_set_transport(builder, - c->connecting_result.transport); - - if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { - grpc_channel_stack_builder_destroy(builder); - return false; - } - grpc_channel_stack* stk; - grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, connection_destroy, nullptr, - reinterpret_cast(&stk)); - if (error != GRPC_ERROR_NONE) { - grpc_transport_destroy(c->connecting_result.transport); - gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", - grpc_error_string(error)); - GRPC_ERROR_UNREF(error); - return false; - } - intptr_t socket_uuid = c->connecting_result.socket_uuid; - memset(&c->connecting_result, 0, sizeof(c->connecting_result)); - - if (c->disconnected) { - grpc_channel_stack_destroy(stk); - gpr_free(stk); - return false; +void Subchannel::ResetBackoff() { + MutexLock lock(&mu_); + backoff_.Reset(); + if (have_retry_alarm_) { + retry_immediately_ = true; + grpc_timer_cancel(&retry_alarm_); + } else { + backoff_begun_ = false; + MaybeStartConnectingLocked(); } - - /* publish */ - c->connected_subchannel.reset(grpc_core::New( - stk, c->args, c->channelz_subchannel, socket_uuid)); - gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", - c->connected_subchannel.get(), c); - - // Instantiate state watcher. Will clean itself up. - c->connected_subchannel_watcher = - grpc_core::MakeOrphanable(c); - - return true; } -static void on_subchannel_connected(void* arg, grpc_error* error) { - grpc_subchannel* c = static_cast(arg); - grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; - - GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); - gpr_mu_lock(&c->mu); - c->connecting = false; - if (c->connecting_result.transport != nullptr && - publish_transport_locked(c)) { - /* do nothing, transport was published */ - } else if (c->disconnected) { - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - } else { - set_subchannel_connectivity_state_locked( - c, GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - "connect_failed"); - grpc_connectivity_state_set( - &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - "connect_failed"); - - const char* errmsg = grpc_error_string(error); - gpr_log(GPR_INFO, "Connect failed: %s", errmsg); - - maybe_start_connecting_locked(c); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - } - gpr_mu_unlock(&c->mu); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected"); - grpc_channel_args_destroy(delete_channel_args); +grpc_arg Subchannel::CreateSubchannelAddressArg( + const grpc_resolved_address* addr) { + return grpc_channel_arg_string_create( + (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, + addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); } -void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { - gpr_mu_lock(&subchannel->mu); - subchannel->backoff->Reset(); - if (subchannel->have_alarm) { - subchannel->retry_immediately = true; - grpc_timer_cancel(&subchannel->alarm); - } else { - subchannel->backoff_begun = false; - maybe_start_connecting_locked(subchannel); - } - gpr_mu_unlock(&subchannel->mu); +const char* Subchannel::GetUriFromSubchannelAddressArg( + const grpc_channel_args* args) { + const grpc_arg* addr_arg = + grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; } -/* - * grpc_subchannel_call implementation - */ +namespace { -static void subchannel_call_destroy(void* call, grpc_error* error) { - GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0); - grpc_subchannel_call* c = static_cast(call); - grpc_core::ConnectedSubchannel* connection = c->connection; - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, - c->schedule_closure_after_destroy); - connection->Unref(DEBUG_LOCATION, "subchannel_call"); - c->~grpc_subchannel_call(); +void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) { + grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + GPR_ASSERT(uri != nullptr); + if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); + grpc_uri_destroy(uri); } -void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, - grpc_closure* closure) { - GPR_ASSERT(call->schedule_closure_after_destroy == nullptr); - GPR_ASSERT(closure != nullptr); - call->schedule_closure_after_destroy = closure; -} +} // namespace -grpc_subchannel_call* grpc_subchannel_call_ref( - grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); - return c; +void Subchannel::GetAddressFromSubchannelAddressArg( + const grpc_channel_args* args, grpc_resolved_address* addr) { + const char* addr_uri_str = GetUriFromSubchannelAddressArg(args); + memset(addr, 0, sizeof(*addr)); + if (*addr_uri_str != '\0') { + UriToSockaddr(addr_uri_str, addr); + } } -void grpc_subchannel_call_unref( - grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); -} +namespace { -// Sets *status based on md_batch and error. -static void get_call_status(grpc_subchannel_call* call, - grpc_metadata_batch* md_batch, grpc_error* error, - grpc_status_code* status) { - if (error != GRPC_ERROR_NONE) { - grpc_error_get_status(error, call->deadline, status, nullptr, nullptr, - nullptr); - } else { - if (md_batch->idx.named.grpc_status != nullptr) { - *status = grpc_get_status_code_from_metadata( - md_batch->idx.named.grpc_status->md); - } else { - *status = GRPC_STATUS_UNKNOWN; - } +// Returns a string indicating the subchannel's connectivity state change to +// \a state. +const char* SubchannelConnectivityStateChangeString( + grpc_connectivity_state state) { + switch (state) { + case GRPC_CHANNEL_IDLE: + return "Subchannel state change to IDLE"; + case GRPC_CHANNEL_CONNECTING: + return "Subchannel state change to CONNECTING"; + case GRPC_CHANNEL_READY: + return "Subchannel state change to READY"; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return "Subchannel state change to TRANSIENT_FAILURE"; + case GRPC_CHANNEL_SHUTDOWN: + return "Subchannel state change to SHUTDOWN"; } - GRPC_ERROR_UNREF(error); + GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { - grpc_subchannel_call* call = static_cast(arg); - GPR_ASSERT(call->recv_trailing_metadata != nullptr); - grpc_status_code status = GRPC_STATUS_OK; - grpc_metadata_batch* md_batch = call->recv_trailing_metadata; - get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status); - grpc_core::channelz::SubchannelNode* channelz_subchannel = - call->connection->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); +} // namespace + +void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error, + const char* reason) { + if (channelz_node_ != nullptr) { + channelz_node_->AddTraceEvent( + channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + SubchannelConnectivityStateChangeString(state))); } - GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata, - GRPC_ERROR_REF(error)); + grpc_connectivity_state_set(&state_tracker_, state, error, reason); } -// If channelz is enabled, intercept recv_trailing so that we may check the -// status and associate it to a subchannel. -static void maybe_intercept_recv_trailing_metadata( - grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { +void Subchannel::MaybeStartConnectingLocked() { + if (disconnected_) { + // Don't try to connect if we're already disconnected. return; } - // only add interceptor is channelz is enabled. - if (call->connection->channelz_subchannel() == nullptr) { + if (connecting_) { + // Already connecting: don't restart. return; } - GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready, - recv_trailing_metadata_ready, call, - grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - GPR_ASSERT(call->recv_trailing_metadata == nullptr); - call->recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - call->original_recv_trailing_metadata = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &call->recv_trailing_metadata_ready; -} - -void grpc_subchannel_call_process_op(grpc_subchannel_call* call, - grpc_transport_stream_op_batch* batch) { - GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); - maybe_intercept_recv_trailing_metadata(call, batch); - grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); - grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); - GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); - top_elem->filter->start_transport_stream_op_batch(top_elem, batch); + if (connected_subchannel_ != nullptr) { + // Already connected: don't restart. + return; + } + if (!grpc_connectivity_state_has_watchers(&state_tracker_) && + !grpc_connectivity_state_has_watchers(&state_and_health_tracker_)) { + // Nobody is interested in connecting: so don't just yet. + return; + } + connecting_ = true; + GRPC_SUBCHANNEL_WEAK_REF(this, "connecting"); + if (!backoff_begun_) { + backoff_begun_ = true; + ContinueConnectingLocked(); + } else { + GPR_ASSERT(!have_retry_alarm_); + have_retry_alarm_ = true; + const grpc_millis time_til_next = + next_attempt_deadline_ - ExecCtx::Get()->Now(); + if (time_til_next <= 0) { + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this); + } else { + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", + this, time_til_next); + } + GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_); + } } -grpc_core::RefCountedPtr -grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { - gpr_mu_lock(&c->mu); - auto copy = c->connected_subchannel; - gpr_mu_unlock(&c->mu); - return copy; +void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { + Subchannel* c = static_cast(arg); + gpr_mu_lock(&c->mu_); + c->have_retry_alarm_ = false; + if (c->disconnected_) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", + &error, 1); + } else if (c->retry_immediately_) { + c->retry_immediately_ = false; + error = GRPC_ERROR_NONE; + } else { + GRPC_ERROR_REF(error); + } + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + c->ContinueConnectingLocked(); + gpr_mu_unlock(&c->mu_); + } else { + gpr_mu_unlock(&c->mu_); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } + GRPC_ERROR_UNREF(error); } -void* grpc_connected_subchannel_call_get_parent_data( - grpc_subchannel_call* subchannel_call) { - grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack(); - return (char*)subchannel_call + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size); +void Subchannel::ContinueConnectingLocked() { + grpc_connect_in_args args; + args.interested_parties = pollset_set_; + const grpc_millis min_deadline = + min_connect_timeout_ms_ + ExecCtx::Get()->Now(); + next_attempt_deadline_ = backoff_.NextAttemptTime(); + args.deadline = std::max(next_attempt_deadline_, min_deadline); + args.channel_args = args_; + SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "connecting"); + grpc_connectivity_state_set(&state_and_health_tracker_, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "connecting"); + grpc_connector_connect(connector_, &args, &connecting_result_, + &on_connecting_finished_); } -grpc_call_stack* grpc_subchannel_call_get_call_stack( - grpc_subchannel_call* subchannel_call) { - return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); -} +void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { + auto* c = static_cast(arg); + grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; + GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); + gpr_mu_lock(&c->mu_); + c->connecting_ = false; + if (c->connecting_result_.transport != nullptr && + c->PublishTransportLocked()) { + // Do nothing, transport was published. + } else if (c->disconnected_) { + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } else { + c->SetConnectivityStateLocked( + GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + "connect_failed"); + grpc_connectivity_state_set( + &c->state_and_health_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + "connect_failed"); -static void grpc_uri_to_sockaddr(const char* uri_str, - grpc_resolved_address* addr) { - grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); - GPR_ASSERT(uri != nullptr); - if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); - grpc_uri_destroy(uri); -} + const char* errmsg = grpc_error_string(error); + gpr_log(GPR_INFO, "Connect failed: %s", errmsg); -void grpc_get_subchannel_address_arg(const grpc_channel_args* args, - grpc_resolved_address* addr) { - const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args); - memset(addr, 0, sizeof(*addr)); - if (*addr_uri_str != '\0') { - grpc_uri_to_sockaddr(addr_uri_str, addr); + c->MaybeStartConnectingLocked(); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } + gpr_mu_unlock(&c->mu_); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); + grpc_channel_args_destroy(delete_channel_args); } -const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { - const grpc_arg* addr_arg = - grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); - const char* addr_str = grpc_channel_arg_get_string(addr_arg); - GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. - return addr_str; -} - -const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { - const grpc_arg* addr_arg = - grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); - const char* addr_str = grpc_channel_arg_get_string(addr_arg); - GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. - return addr_str; -} - -grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { - return grpc_channel_arg_string_create( - (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, - addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); -} - -namespace grpc_core { - -ConnectedSubchannel::ConnectedSubchannel( - grpc_channel_stack* channel_stack, const grpc_channel_args* args, - grpc_core::RefCountedPtr - channelz_subchannel, - intptr_t socket_uuid) - : RefCounted(&grpc_trace_stream_refcount), - channel_stack_(channel_stack), - args_(grpc_channel_args_copy(args)), - channelz_subchannel_(std::move(channelz_subchannel)), - socket_uuid_(socket_uuid) {} +namespace { -ConnectedSubchannel::~ConnectedSubchannel() { - grpc_channel_args_destroy(args_); - GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); +void ConnectionDestroy(void* arg, grpc_error* error) { + grpc_channel_stack* stk = static_cast(arg); + grpc_channel_stack_destroy(stk); + gpr_free(stk); } -void ConnectedSubchannel::NotifyOnStateChange( - grpc_pollset_set* interested_parties, grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} +} // namespace -void ConnectedSubchannel::Ping(grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); +bool Subchannel::PublishTransportLocked() { + // Construct channel stack. + grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments( + builder, connecting_result_.channel_args); + grpc_channel_stack_builder_set_transport(builder, + connecting_result_.transport); + if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { + grpc_channel_stack_builder_destroy(builder); + return false; + } + grpc_channel_stack* stk; + grpc_error* error = grpc_channel_stack_builder_finish( + builder, 0, 1, ConnectionDestroy, nullptr, + reinterpret_cast(&stk)); + if (error != GRPC_ERROR_NONE) { + grpc_transport_destroy(connecting_result_.transport); + gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + return false; + } + intptr_t socket_uuid = connecting_result_.socket_uuid; + memset(&connecting_result_, 0, sizeof(connecting_result_)); + if (disconnected_) { + grpc_channel_stack_destroy(stk); + gpr_free(stk); + return false; + } + // Publish. + connected_subchannel_.reset( + New(stk, args_, channelz_node_, socket_uuid)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + connected_subchannel_.get(), this); + // Instantiate state watcher. Will clean itself up. + connected_subchannel_watcher_ = + MakeOrphanable(this); + return true; } -grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, - grpc_subchannel_call** call) { - const size_t allocation_size = - GetInitialCallSizeEstimate(args.parent_data_size); - *call = new (gpr_arena_alloc(args.arena, allocation_size)) - grpc_subchannel_call(this, args); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); - RefCountedPtr connection = - Ref(DEBUG_LOCATION, "subchannel_call"); - connection.release(); // Ref is passed to the grpc_subchannel_call object. - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args.context, /* context */ - args.path, /* path */ - args.start_time, /* start_time */ - args.deadline, /* deadline */ - args.arena, /* arena */ - args.call_combiner /* call_combiner */ - }; - grpc_error* error = grpc_call_stack_init( - channel_stack_, 1, subchannel_call_destroy, *call, &call_args); - if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { - const char* error_string = grpc_error_string(error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return error; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); - if (channelz_subchannel_ != nullptr) { - channelz_subchannel_->RecordCallStarted(); +void Subchannel::Disconnect() { + // The subchannel_pool is only used once here in this subchannel, so the + // access can be outside of the lock. + if (subchannel_pool_ != nullptr) { + subchannel_pool_->UnregisterSubchannel(key_); + subchannel_pool_.reset(); } - return GRPC_ERROR_NONE; + MutexLock lock(&mu_); + GPR_ASSERT(!disconnected_); + disconnected_ = true; + grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Subchannel disconnected")); + connected_subchannel_.reset(); + connected_subchannel_watcher_.reset(); } -size_t ConnectedSubchannel::GetInitialCallSizeEstimate( - size_t parent_data_size) const { - size_t allocation_size = - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)); - if (parent_data_size > 0) { - allocation_size += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + - parent_data_size; - } else { - allocation_size += channel_stack_->call_stack_size; +gpr_atm Subchannel::RefMutate( + gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta) + : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta); +#ifndef NDEBUG + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this, + purpose, old_val, old_val + delta, reason); } - return allocation_size; +#endif + return old_val; } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index fac515eee5c..88282c9d95e 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -24,53 +24,49 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" +#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/arena.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" // Channel arg containing a grpc_resolved_address to connect to. #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" -/** A (sub-)channel that knows how to connect to exactly one target - address. Provides a target for load balancing. */ -typedef struct grpc_subchannel grpc_subchannel; -typedef struct grpc_subchannel_call grpc_subchannel_call; - +// For debugging refcounting. #ifndef NDEBUG -#define GRPC_SUBCHANNEL_REF(p, r) \ - grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ - grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_UNREF(p, r) \ - grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ - grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ - grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_REF(p, r) \ - grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ - grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) + (p)->RefFromWeakRef(__FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ - , const char *file, int line, const char *reason + const char *file, int line, const char *reason +#define GRPC_SUBCHANNEL_REF_REASON reason +#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \ + , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose +#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x #else -#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) -#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ - grpc_subchannel_ref_from_weak_ref((p)) -#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) -#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) -#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) +#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref() +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() +#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref() +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef() +#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref() #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS +#define GRPC_SUBCHANNEL_REF_REASON "" +#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS +#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) #endif namespace grpc_core { +class SubchannelCall; + class ConnectedSubchannel : public RefCounted { public: struct CallArgs { @@ -86,8 +82,7 @@ class ConnectedSubchannel : public RefCounted { ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, - grpc_core::RefCountedPtr - channelz_subchannel, + RefCountedPtr channelz_subchannel, intptr_t socket_uuid); ~ConnectedSubchannel(); @@ -95,7 +90,8 @@ class ConnectedSubchannel : public RefCounted { grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); - grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); + RefCountedPtr CreateCall(const CallArgs& args, + grpc_error** error); grpc_channel_stack* channel_stack() const { return channel_stack_; } const grpc_channel_args* args() const { return args_; } @@ -111,91 +107,204 @@ class ConnectedSubchannel : public RefCounted { grpc_channel_args* args_; // ref counted pointer to the channelz node in this connected subchannel's // owning subchannel. - grpc_core::RefCountedPtr - channelz_subchannel_; + RefCountedPtr channelz_subchannel_; // uuid of this subchannel's socket. 0 if this subchannel is not connected. const intptr_t socket_uuid_; }; -} // namespace grpc_core +// Implements the interface of RefCounted<>. +class SubchannelCall { + public: + SubchannelCall(RefCountedPtr connected_subchannel, + const ConnectedSubchannel::CallArgs& args) + : connected_subchannel_(std::move(connected_subchannel)), + deadline_(args.deadline) {} + + // Continues processing a transport stream op batch. + void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); + + // Returns a pointer to the parent data associated with the subchannel call. + // The data will be of the size specified in \a parent_data_size field of + // the args passed to \a ConnectedSubchannel::CreateCall(). + void* GetParentData(); + + // Returns the call stack of the subchannel call. + grpc_call_stack* GetCallStack(); + + grpc_closure* after_call_stack_destroy() const { + return after_call_stack_destroy_; + } + + // Sets the 'then_schedule_closure' argument for call stack destruction. + // Must be called once per call. + void SetAfterCallStackDestroy(grpc_closure* closure); + + // Interface of RefCounted<>. + RefCountedPtr Ref() GRPC_MUST_USE_RESULT; + RefCountedPtr Ref(const DebugLocation& location, + const char* reason) GRPC_MUST_USE_RESULT; + // When refcount drops to 0, destroys itself and the associated call stack, + // but does NOT free the memory because it's in the call arena. + void Unref(); + void Unref(const DebugLocation& location, const char* reason); + + private: + // Allow RefCountedPtr<> to access IncrementRefCount(). + template + friend class RefCountedPtr; + + // If channelz is enabled, intercepts recv_trailing so that we may check the + // status and associate it to a subchannel. + void MaybeInterceptRecvTrailingMetadata( + grpc_transport_stream_op_batch* batch); + + static void RecvTrailingMetadataReady(void* arg, grpc_error* error); + + // Interface of RefCounted<>. + void IncrementRefCount(); + void IncrementRefCount(const DebugLocation& location, const char* reason); + + RefCountedPtr connected_subchannel_; + grpc_closure* after_call_stack_destroy_ = nullptr; + // State needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready_; + grpc_closure* original_recv_trailing_metadata_ = nullptr; + grpc_metadata_batch* recv_trailing_metadata_ = nullptr; + grpc_millis deadline_; +}; + +// A subchannel that knows how to connect to exactly one target address. It +// provides a target for load balancing. +class Subchannel { + public: + // The ctor and dtor are not intended to use directly. + Subchannel(SubchannelKey* key, grpc_connector* connector, + const grpc_channel_args* args); + ~Subchannel(); + + // Creates a subchannel given \a connector and \a args. + static Subchannel* Create(grpc_connector* connector, + const grpc_channel_args* args); + + // Strong and weak refcounting. + Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + + intptr_t GetChildSocketUuid(); -grpc_subchannel* grpc_subchannel_ref( - grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_subchannel* grpc_subchannel_ref_from_weak_ref( - grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_unref( - grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_subchannel* grpc_subchannel_weak_ref( - grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_weak_unref( - grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -grpc_subchannel_call* grpc_subchannel_call_ref( - grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_unref( - grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - -grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( - grpc_subchannel* subchannel); - -intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel); - -/** Returns a pointer to the parent data associated with \a subchannel_call. - The data will be of the size specified in \a parent_data_size - field of the args passed to \a grpc_connected_subchannel_create_call(). */ -void* grpc_connected_subchannel_call_get_parent_data( - grpc_subchannel_call* subchannel_call); - -/** poll the current connectivity state of a channel */ -grpc_connectivity_state grpc_subchannel_check_connectivity( - grpc_subchannel* channel, grpc_error** error, bool inhibit_health_checking); - -/** Calls notify when the connectivity state of a channel becomes different - from *state. Updates *state with the new state of the channel. */ -void grpc_subchannel_notify_on_state_change( - grpc_subchannel* channel, grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify, - bool inhibit_health_checks); - -/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected - * (which may happen before it initially connects or during transient failures) - * */ -grpc_core::RefCountedPtr -grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); - -// Resets the connection backoff of the subchannel. -// TODO(roth): Move connection backoff out of subchannels and up into LB -// policy code (probably by adding a SubchannelGroup between -// SubchannelList and SubchannelData), at which point this method can -// go away. -void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel); - -/** continue processing a transport op */ -void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call, - grpc_transport_stream_op_batch* op); - -/** Must be called once per call. Sets the 'then_schedule_closure' argument for - call stack destruction. */ -void grpc_subchannel_call_set_cleanup_closure( - grpc_subchannel_call* subchannel_call, grpc_closure* closure); - -grpc_call_stack* grpc_subchannel_call_get_call_stack( - grpc_subchannel_call* subchannel_call); - -/** create a subchannel given a connector */ -grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, - const grpc_channel_args* args); - -/// Sets \a addr from \a args. -void grpc_get_subchannel_address_arg(const grpc_channel_args* args, - grpc_resolved_address* addr); - -const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); - -/// Returns the URI string for the address to connect to. -const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); - -/// Returns a new channel arg encoding the subchannel address as a string. -/// Caller is responsible for freeing the string. -grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr); + // Gets the string representing the subchannel address. + // Caller doesn't take ownership. + const char* GetTargetAddress(); + + // Gets the connected subchannel - or nullptr if not connected (which may + // happen before it initially connects or during transient failures). + RefCountedPtr connected_subchannel(); + + channelz::SubchannelNode* channelz_node(); + + // Polls the current connectivity state of the subchannel. + grpc_connectivity_state CheckConnectivity(grpc_error** error, + bool inhibit_health_checking); + + // When the connectivity state of the subchannel changes from \a *state, + // invokes \a notify and updates \a *state with the new state. + void NotifyOnStateChange(grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* notify, + bool inhibit_health_checks); + + // Resets the connection backoff of the subchannel. + // TODO(roth): Move connection backoff out of subchannels and up into LB + // policy code (probably by adding a SubchannelGroup between + // SubchannelList and SubchannelData), at which point this method can + // go away. + void ResetBackoff(); + + // Returns a new channel arg encoding the subchannel address as a URI + // string. Caller is responsible for freeing the string. + static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr); + + // Returns the URI string from the subchannel address arg in \a args. + static const char* GetUriFromSubchannelAddressArg( + const grpc_channel_args* args); + + // Sets \a addr from the subchannel address arg in \a args. + static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args, + grpc_resolved_address* addr); + + private: + struct ExternalStateWatcher; + class ConnectedSubchannelStateWatcher; + + // Sets the subchannel's connectivity state to \a state. + void SetConnectivityStateLocked(grpc_connectivity_state state, + grpc_error* error, const char* reason); + + // Methods for connection. + void MaybeStartConnectingLocked(); + static void OnRetryAlarm(void* arg, grpc_error* error); + void ContinueConnectingLocked(); + static void OnConnectingFinished(void* arg, grpc_error* error); + bool PublishTransportLocked(); + void Disconnect(); + + gpr_atm RefMutate(gpr_atm delta, + int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS); + + // The subchannel pool this subchannel is in. + RefCountedPtr subchannel_pool_; + // TODO(juanlishen): Consider using args_ as key_ directly. + // Subchannel key that identifies this subchannel in the subchannel pool. + SubchannelKey* key_; + // Channel args. + grpc_channel_args* args_; + // pollset_set tracking who's interested in a connection being setup. + grpc_pollset_set* pollset_set_; + // Protects the other members. + gpr_mu mu_; + // Refcount + // - lower INTERNAL_REF_BITS bits are for internal references: + // these do not keep the subchannel open. + // - upper remaining bits are for public references: these do + // keep the subchannel open + gpr_atm ref_pair_; + + // Connection states. + grpc_connector* connector_ = nullptr; + // Set during connection. + grpc_connect_out_args connecting_result_; + grpc_closure on_connecting_finished_; + // Active connection, or null. + RefCountedPtr connected_subchannel_; + OrphanablePtr connected_subchannel_watcher_; + bool connecting_ = false; + bool disconnected_ = false; + + // Connectivity state tracking. + grpc_connectivity_state_tracker state_tracker_; + grpc_connectivity_state_tracker state_and_health_tracker_; + UniquePtr health_check_service_name_; + ExternalStateWatcher* external_state_watcher_list_ = nullptr; + + // Backoff state. + BackOff backoff_; + grpc_millis next_attempt_deadline_; + grpc_millis min_connect_timeout_ms_; + bool backoff_begun_ = false; + + // Retry alarm. + grpc_timer retry_alarm_; + grpc_closure on_retry_alarm_; + bool have_retry_alarm_ = false; + // reset_backoff() was called while alarm was pending. + bool retry_immediately_ = false; + + // Channelz tracking. + RefCountedPtr channelz_node_; +}; + +} // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */ diff --git a/src/core/ext/filters/client_channel/subchannel_pool_interface.h b/src/core/ext/filters/client_channel/subchannel_pool_interface.h index 21597bf4276..eeb56faf0c0 100644 --- a/src/core/ext/filters/client_channel/subchannel_pool_interface.h +++ b/src/core/ext/filters/client_channel/subchannel_pool_interface.h @@ -26,10 +26,10 @@ #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/ref_counted.h" -struct grpc_subchannel; - namespace grpc_core { +class Subchannel; + extern TraceFlag grpc_subchannel_pool_trace; // A key that can uniquely identify a subchannel. @@ -69,15 +69,15 @@ class SubchannelPoolInterface : public RefCounted { // Registers a subchannel against a key. Returns the subchannel registered // with \a key, which may be different from \a constructed because we reuse // (instead of update) any existing subchannel already registered with \a key. - virtual grpc_subchannel* RegisterSubchannel( - SubchannelKey* key, grpc_subchannel* constructed) GRPC_ABSTRACT; + virtual Subchannel* RegisterSubchannel(SubchannelKey* key, + Subchannel* constructed) GRPC_ABSTRACT; // Removes the registered subchannel found by \a key. virtual void UnregisterSubchannel(SubchannelKey* key) GRPC_ABSTRACT; // Finds the subchannel registered for the given subchannel key. Returns NULL // if no such channel exists. Thread-safe. - virtual grpc_subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT; + virtual Subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT; // Creates a channel arg from \a subchannel pool. static grpc_arg CreateChannelArg(SubchannelPoolInterface* subchannel_pool); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 42a2e2e896c..1e9a75d0630 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -202,7 +202,8 @@ static void chttp2_connector_connect(grpc_connector* con, grpc_closure* notify) { chttp2_connector* c = reinterpret_cast(con); grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_core::Subchannel::GetAddressFromSubchannelAddressArg(args->channel_args, + &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == nullptr); c->notify = notify; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc index a5bf1bf21d4..8aabcfa2000 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc @@ -39,11 +39,11 @@ static void client_channel_factory_ref( static void client_channel_factory_unref( grpc_client_channel_factory* cc_factory) {} -static grpc_subchannel* client_channel_factory_create_subchannel( +static grpc_core::Subchannel* client_channel_factory_create_subchannel( grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args); grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_subchannel* s = grpc_subchannel_create(connector, new_args); + grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args); grpc_connector_unref(connector); grpc_channel_args_destroy(new_args); return s; diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index 5985fa0cbdb..eb2fee2af91 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -76,7 +76,8 @@ static grpc_channel_args* get_secure_naming_channel_args( grpc_core::UniquePtr authority; if (target_authority_table != nullptr) { // Find the authority for the target. - const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args); + const char* target_uri_str = + grpc_core::Subchannel::GetUriFromSubchannelAddressArg(args); grpc_uri* target_uri = grpc_uri_parse(target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != nullptr); @@ -138,7 +139,7 @@ static grpc_channel_args* get_secure_naming_channel_args( return new_args; } -static grpc_subchannel* client_channel_factory_create_subchannel( +static grpc_core::Subchannel* client_channel_factory_create_subchannel( grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { grpc_channel_args* new_args = get_secure_naming_channel_args(args); if (new_args == nullptr) { @@ -147,7 +148,7 @@ static grpc_subchannel* client_channel_factory_create_subchannel( return nullptr; } grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_subchannel* s = grpc_subchannel_create(connector, new_args); + grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args); grpc_connector_unref(connector); grpc_channel_args_destroy(new_args); return s; diff --git a/test/core/util/debugger_macros.cc b/test/core/util/debugger_macros.cc index 05fb1461733..fed6ad97285 100644 --- a/test/core/util/debugger_macros.cc +++ b/test/core/util/debugger_macros.cc @@ -36,13 +36,14 @@ grpc_stream* grpc_transport_stream_from_call(grpc_call* call) { for (;;) { grpc_call_element* el = grpc_call_stack_element(cs, cs->count - 1); if (el->filter == &grpc_client_channel_filter) { - grpc_subchannel_call* scc = grpc_client_channel_get_subchannel_call(el); + grpc_core::RefCountedPtr scc = + grpc_client_channel_get_subchannel_call(el); if (scc == nullptr) { fprintf(stderr, "No subchannel-call"); fflush(stderr); return nullptr; } - cs = grpc_subchannel_call_get_call_stack(scc); + cs = scc->GetCallStack(); } else if (el->filter == &grpc_connected_filter) { return grpc_connected_channel_get_stream(el); } else { diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 125b1ce5c4e..973f47beaf7 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -325,8 +325,8 @@ class FakeClientChannelFactory : public grpc_client_channel_factory { private: static void NoRef(grpc_client_channel_factory* factory) {} static void NoUnref(grpc_client_channel_factory* factory) {} - static grpc_subchannel* CreateSubchannel(grpc_client_channel_factory* factory, - const grpc_channel_args* args) { + static grpc_core::Subchannel* CreateSubchannel( + grpc_client_channel_factory* factory, const grpc_channel_args* args) { return nullptr; } static grpc_channel* CreateClientChannel(grpc_client_channel_factory* factory,