diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 38525dbf97e..35c3efab6aa 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -394,7 +394,7 @@ struct subchannel_batch_data { gpr_refcount refs; grpc_call_element* elem; - grpc_core::RefCountedPtr subchannel_call; + grpc_subchannel_call* subchannel_call; // Holds a ref. // The batch to use in the subchannel call. // Its payload field points to subchannel_call_retry_state.batch_payload. grpc_transport_stream_op_batch batch; @@ -478,7 +478,7 @@ struct pending_batch { bool send_ops_cached; }; -/** Call data. Holds a pointer to SubchannelCall and the +/** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting for initial metadata before trying to create a call object, @@ -504,6 +504,10 @@ struct call_data { last_attempt_got_server_pushback(false) {} ~call_data() { + if (GPR_LIKELY(subchannel_call != nullptr)) { + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, + "client_channel_destroy_call"); + } grpc_slice_unref_internal(path); GRPC_ERROR_UNREF(cancel_error); for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) { @@ -532,7 +536,7 @@ struct call_data { grpc_core::RefCountedPtr retry_throttle_data; grpc_core::RefCountedPtr method_params; - grpc_core::RefCountedPtr subchannel_call; + grpc_subchannel_call* subchannel_call = nullptr; // Set when we get a cancel_stream op. grpc_error* cancel_error = GRPC_ERROR_NONE; @@ -803,8 +807,8 @@ static void pending_batches_add(grpc_call_element* elem, calld->subchannel_call == nullptr ? nullptr : static_cast( - - calld->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); retry_commit(elem, retry_state); // If we are not going to retry and have not yet started, pretend // retries are disabled so that we don't bother with retry overhead. @@ -892,10 +896,10 @@ static void resume_pending_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); - grpc_core::SubchannelCall* subchannel_call = - static_cast(batch->handler_private.extra_arg); + grpc_subchannel_call* subchannel_call = + static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. - subchannel_call->StartTransportStreamOpBatch(batch); + grpc_subchannel_call_process_op(subchannel_call, batch); } // This is called via the call combiner, so access to calld is synchronized. @@ -915,7 +919,7 @@ static void pending_batches_resume(grpc_call_element* elem) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " pending batches on subchannel_call=%p", - chand, calld, num_batches, calld->subchannel_call.get()); + chand, calld, num_batches, calld->subchannel_call); } grpc_core::CallCombinerClosureList closures; for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) { @@ -926,7 +930,7 @@ static void pending_batches_resume(grpc_call_element* elem) { maybe_inject_recv_trailing_metadata_ready_for_lb( *calld->request->pick(), batch); } - batch->handler_private.extra_arg = calld->subchannel_call.get(); + batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, resume_pending_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); @@ -1015,7 +1019,12 @@ static void do_retry(grpc_call_element* elem, const ClientChannelMethodParams::RetryPolicy* retry_policy = calld->method_params->retry_policy(); GPR_ASSERT(retry_policy != nullptr); - calld->subchannel_call.reset(); + // Reset subchannel call and connected subchannel. + if (calld->subchannel_call != nullptr) { + GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call, + "client_channel_call_retry"); + calld->subchannel_call = nullptr; + } if (calld->have_request) { calld->have_request = false; calld->request.Destroy(); @@ -1069,7 +1078,8 @@ static bool maybe_retry(grpc_call_element* elem, subchannel_call_retry_state* retry_state = nullptr; if (batch_data != nullptr) { retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); if (retry_state->retry_dispatched) { if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand, @@ -1170,10 +1180,13 @@ namespace { subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, call_data* calld, int refcount, bool set_on_complete) - : elem(elem), subchannel_call(calld->subchannel_call) { + : elem(elem), + subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, + "batch_data_create")) { subchannel_call_retry_state* retry_state = static_cast( - calld->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); batch.payload = &retry_state->batch_payload; gpr_ref_init(&refs, refcount); if (set_on_complete) { @@ -1187,7 +1200,7 @@ subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem, void subchannel_batch_data::destroy() { subchannel_call_retry_state* retry_state = static_cast( - subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data(subchannel_call)); if (batch.send_initial_metadata) { grpc_metadata_batch_destroy(&retry_state->send_initial_metadata); } @@ -1200,7 +1213,7 @@ void subchannel_batch_data::destroy() { if (batch.recv_trailing_metadata) { grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata); } - subchannel_call.reset(); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref"); call_data* calld = static_cast(elem->call_data); GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data"); } @@ -1247,7 +1260,8 @@ static void invoke_recv_initial_metadata_callback(void* arg, // Return metadata. subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_initial_metadata, pending->batch->payload->recv_initial_metadata.recv_initial_metadata); @@ -1279,7 +1293,8 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); retry_state->completed_recv_initial_metadata = true; // If a retry was already dispatched, then we're not going to use the // result of this recv_initial_metadata op, so do nothing. @@ -1340,7 +1355,8 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { // Return payload. subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); *pending->batch->payload->recv_message.recv_message = std::move(retry_state->recv_message); // Update bookkeeping. @@ -1368,7 +1384,8 @@ static void recv_message_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); ++retry_state->completed_recv_message_count; // If a retry was already dispatched, then we're not going to use the // result of this recv_message op, so do nothing. @@ -1456,7 +1473,8 @@ static void add_closure_for_recv_trailing_metadata_ready( // Return metadata. subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); grpc_metadata_batch_move( &retry_state->recv_trailing_metadata, pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata); @@ -1558,7 +1576,8 @@ static void run_closures_for_completed_call(subchannel_batch_data* batch_data, call_data* calld = static_cast(elem->call_data); subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); // Construct list of closures to execute. grpc_core::CallCombinerClosureList closures; // First, add closure for recv_trailing_metadata_ready. @@ -1592,7 +1611,8 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); retry_state->completed_recv_trailing_metadata = true; // Get the call's status and check for server pushback metadata. grpc_status_code status = GRPC_STATUS_OK; @@ -1715,7 +1735,8 @@ static void on_complete(void* arg, grpc_error* error) { } subchannel_call_retry_state* retry_state = static_cast( - batch_data->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + batch_data->subchannel_call)); // Update bookkeeping in retry_state. if (batch_data->batch.send_initial_metadata) { retry_state->completed_send_initial_metadata = true; @@ -1771,10 +1792,10 @@ static void on_complete(void* arg, grpc_error* error) { static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) { grpc_transport_stream_op_batch* batch = static_cast(arg); - grpc_core::SubchannelCall* subchannel_call = - static_cast(batch->handler_private.extra_arg); + grpc_subchannel_call* subchannel_call = + static_cast(batch->handler_private.extra_arg); // Note: This will release the call combiner. - subchannel_call->StartTransportStreamOpBatch(batch); + grpc_subchannel_call_process_op(subchannel_call, batch); } // Adds a closure to closures that will execute batch in the call combiner. @@ -1783,7 +1804,7 @@ static void add_closure_for_subchannel_batch( grpc_core::CallCombinerClosureList* closures) { channel_data* chand = static_cast(elem->channel_data); call_data* calld = static_cast(elem->call_data); - batch->handler_private.extra_arg = calld->subchannel_call.get(); + batch->handler_private.extra_arg = calld->subchannel_call; GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); @@ -1957,7 +1978,8 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { } subchannel_call_retry_state* retry_state = static_cast( - calld->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); // Create batch_data with 2 refs, since this batch will be unreffed twice: // once for the recv_trailing_metadata_ready callback when the subchannel // batch returns, and again when we actually get a recv_trailing_metadata @@ -1967,7 +1989,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) { add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data); retry_state->recv_trailing_metadata_internal_batch = batch_data; // Note: This will release the call combiner. - calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch); + grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch); } // If there are any cached send ops that need to be replayed on the @@ -2174,7 +2196,8 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { } subchannel_call_retry_state* retry_state = static_cast( - calld->subchannel_call->GetParentData()); + grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)); // Construct list of closures to execute, one for each pending batch. grpc_core::CallCombinerClosureList closures; // Replay previously-returned send_* ops if needed. @@ -2197,7 +2220,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting %" PRIuPTR " retriable batches on subchannel_call=%p", - chand, calld, closures.size(), calld->subchannel_call.get()); + chand, calld, closures.size(), calld->subchannel_call); } // Note: This will yield the call combiner. closures.RunClosures(calld->call_combiner); @@ -2222,22 +2245,22 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) { calld->call_combiner, // call_combiner parent_data_size // parent_data_size }; - grpc_error* new_error = GRPC_ERROR_NONE; - calld->subchannel_call = - calld->request->pick()->connected_subchannel->CreateCall(call_args, - &new_error); + grpc_error* new_error = + calld->request->pick()->connected_subchannel->CreateCall( + call_args, &calld->subchannel_call); if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s", - chand, calld, calld->subchannel_call.get(), - grpc_error_string(new_error)); + chand, calld, calld->subchannel_call, grpc_error_string(new_error)); } if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) { new_error = grpc_error_add_child(new_error, error); pending_batches_fail(elem, new_error, true /* yield_call_combiner */); } else { if (parent_data_size > 0) { - new (calld->subchannel_call->GetParentData()) subchannel_call_retry_state( - calld->request->pick()->subchannel_call_context); + new (grpc_connected_subchannel_call_get_parent_data( + calld->subchannel_call)) + subchannel_call_retry_state( + calld->request->pick()->subchannel_call_context); } pending_batches_resume(elem); } @@ -2465,7 +2488,7 @@ static void cc_start_transport_stream_op_batch( batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner); } else { // Note: This will release the call combiner. - calld->subchannel_call->StartTransportStreamOpBatch(batch); + grpc_subchannel_call_process_op(calld->subchannel_call, batch); } return; } @@ -2479,7 +2502,7 @@ static void cc_start_transport_stream_op_batch( if (grpc_client_channel_trace.enabled()) { gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on subchannel_call=%p", chand, - calld, calld->subchannel_call.get()); + calld, calld->subchannel_call); } pending_batches_resume(elem); return; @@ -2522,7 +2545,8 @@ static void cc_destroy_call_elem(grpc_call_element* elem, grpc_closure* then_schedule_closure) { call_data* calld = static_cast(elem->call_data); if (GPR_LIKELY(calld->subchannel_call != nullptr)) { - calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure); + grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call, + then_schedule_closure); then_schedule_closure = nullptr; } calld->~call_data(); @@ -2728,8 +2752,8 @@ void grpc_client_channel_watch_connectivity_state( GRPC_ERROR_NONE); } -grpc_core::RefCountedPtr -grpc_client_channel_get_subchannel_call(grpc_call_element* elem) { +grpc_subchannel_call* grpc_client_channel_get_subchannel_call( + grpc_call_element* elem) { call_data* calld = static_cast(elem->call_data); return calld->subchannel_call; } diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 5bfff4df9cd..4935fd24d87 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -60,7 +60,7 @@ void grpc_client_channel_watch_connectivity_state( grpc_closure* watcher_timer_init); /* Debug helper: pull the subchannel call from a call stack element */ -grpc_core::RefCountedPtr -grpc_client_channel_get_subchannel_call(grpc_call_element* elem); +grpc_subchannel_call* grpc_client_channel_get_subchannel_call( + grpc_call_element* elem); #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H */ diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.cc b/src/core/ext/filters/client_channel/client_channel_channelz.cc index 76c5a786240..8e5426081c4 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.cc +++ b/src/core/ext/filters/client_channel/client_channel_channelz.cc @@ -113,11 +113,12 @@ RefCountedPtr ClientChannelNode::MakeClientChannelNode( is_top_level_channel); } -SubchannelNode::SubchannelNode(Subchannel* subchannel, +SubchannelNode::SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes) : BaseNode(EntityType::kSubchannel), subchannel_(subchannel), - target_(UniquePtr(gpr_strdup(subchannel_->GetTargetAddress()))), + target_( + UniquePtr(gpr_strdup(grpc_subchannel_get_target(subchannel_)))), trace_(channel_tracer_max_nodes) {} SubchannelNode::~SubchannelNode() {} @@ -127,8 +128,8 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) { if (subchannel_ == nullptr) { state = GRPC_CHANNEL_SHUTDOWN; } else { - state = subchannel_->CheckConnectivity(nullptr, - true /* inhibit_health_checking */); + state = grpc_subchannel_check_connectivity( + subchannel_, nullptr, true /* inhibit_health_checking */); } json = grpc_json_create_child(nullptr, json, "state", nullptr, GRPC_JSON_OBJECT, false); @@ -169,7 +170,7 @@ grpc_json* SubchannelNode::RenderJson() { call_counter_.PopulateCallCounts(json); json = top_level_json; // populate the child socket. - intptr_t socket_uuid = subchannel_->GetChildSocketUuid(); + intptr_t socket_uuid = grpc_subchannel_get_child_socket_uuid(subchannel_); if (socket_uuid != 0) { grpc_json* array_parent = grpc_json_create_child( nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false); diff --git a/src/core/ext/filters/client_channel/client_channel_channelz.h b/src/core/ext/filters/client_channel/client_channel_channelz.h index 1dc1bf595be..8a5c3e7e5e5 100644 --- a/src/core/ext/filters/client_channel/client_channel_channelz.h +++ b/src/core/ext/filters/client_channel/client_channel_channelz.h @@ -26,10 +26,9 @@ #include "src/core/lib/channel/channel_trace.h" #include "src/core/lib/channel/channelz.h" -namespace grpc_core { - -class Subchannel; +typedef struct grpc_subchannel grpc_subchannel; +namespace grpc_core { namespace channelz { // Subtype of ChannelNode that overrides and provides client_channel specific @@ -60,7 +59,7 @@ class ClientChannelNode : public ChannelNode { // Handles channelz bookkeeping for sockets class SubchannelNode : public BaseNode { public: - SubchannelNode(Subchannel* subchannel, size_t channel_tracer_max_nodes); + SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes); ~SubchannelNode() override; void MarkSubchannelDestroyed() { @@ -85,7 +84,7 @@ class SubchannelNode : public BaseNode { void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); } private: - Subchannel* subchannel_; + grpc_subchannel* subchannel_; UniquePtr target_; CallCountingHelper call_counter_; ChannelTrace trace_; diff --git a/src/core/ext/filters/client_channel/client_channel_factory.cc b/src/core/ext/filters/client_channel/client_channel_factory.cc index 8c558382fdf..130bbe04180 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.cc +++ b/src/core/ext/filters/client_channel/client_channel_factory.cc @@ -29,7 +29,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) { factory->vtable->unref(factory); } -grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( +grpc_subchannel* grpc_client_channel_factory_create_subchannel( grpc_client_channel_factory* factory, const grpc_channel_args* args) { return factory->vtable->create_subchannel(factory, args); } diff --git a/src/core/ext/filters/client_channel/client_channel_factory.h b/src/core/ext/filters/client_channel/client_channel_factory.h index 4b72aa46499..91dec12282f 100644 --- a/src/core/ext/filters/client_channel/client_channel_factory.h +++ b/src/core/ext/filters/client_channel/client_channel_factory.h @@ -48,8 +48,8 @@ struct grpc_client_channel_factory { struct grpc_client_channel_factory_vtable { void (*ref)(grpc_client_channel_factory* factory); void (*unref)(grpc_client_channel_factory* factory); - grpc_core::Subchannel* (*create_subchannel)( - grpc_client_channel_factory* factory, const grpc_channel_args* args); + grpc_subchannel* (*create_subchannel)(grpc_client_channel_factory* factory, + const grpc_channel_args* args); grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory, const char* target, grpc_client_channel_type type, @@ -60,7 +60,7 @@ void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory); void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory); /** Create a new grpc_subchannel */ -grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel( +grpc_subchannel* grpc_client_channel_factory_create_subchannel( grpc_client_channel_factory* factory, const grpc_channel_args* args); /** Create a new grpc_channel */ diff --git a/src/core/ext/filters/client_channel/global_subchannel_pool.cc b/src/core/ext/filters/client_channel/global_subchannel_pool.cc index ee6e58159a0..a41d993fe66 100644 --- a/src/core/ext/filters/client_channel/global_subchannel_pool.cc +++ b/src/core/ext/filters/client_channel/global_subchannel_pool.cc @@ -54,9 +54,9 @@ RefCountedPtr GlobalSubchannelPool::instance() { return *instance_; } -Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key, - Subchannel* constructed) { - Subchannel* c = nullptr; +grpc_subchannel* GlobalSubchannelPool::RegisterSubchannel( + SubchannelKey* key, grpc_subchannel* constructed) { + grpc_subchannel* c = nullptr; // Compare and swap (CAS) loop: while (c == nullptr) { // Ref the shared map to have a local copy. @@ -64,7 +64,7 @@ Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key, grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr); gpr_mu_unlock(&mu_); // Check to see if a subchannel already exists. - c = static_cast(grpc_avl_get(old_map, key, nullptr)); + c = static_cast(grpc_avl_get(old_map, key, nullptr)); if (c != nullptr) { // The subchannel already exists. Reuse it. c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "subchannel_register+reuse"); @@ -121,14 +121,15 @@ void GlobalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) { } } -Subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) { +grpc_subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) { // Lock, and take a reference to the subchannel map. // We don't need to do the search under a lock as AVL's are immutable. gpr_mu_lock(&mu_); grpc_avl index = grpc_avl_ref(subchannel_map_, nullptr); gpr_mu_unlock(&mu_); - Subchannel* c = static_cast(grpc_avl_get(index, key, nullptr)); - if (c != nullptr) GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "found_from_pool"); + grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( + static_cast(grpc_avl_get(index, key, nullptr)), + "found_from_pool"); grpc_avl_unref(index, nullptr); return c; } @@ -155,11 +156,11 @@ long sck_avl_compare(void* a, void* b, void* unused) { } void scv_avl_destroy(void* p, void* user_data) { - GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool"); + GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "global_subchannel_pool"); } void* scv_avl_copy(void* p, void* unused) { - GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool"); + GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "global_subchannel_pool"); return p; } diff --git a/src/core/ext/filters/client_channel/global_subchannel_pool.h b/src/core/ext/filters/client_channel/global_subchannel_pool.h index 96dc8d7b3a4..0deb3769360 100644 --- a/src/core/ext/filters/client_channel/global_subchannel_pool.h +++ b/src/core/ext/filters/client_channel/global_subchannel_pool.h @@ -45,10 +45,10 @@ class GlobalSubchannelPool final : public SubchannelPoolInterface { static RefCountedPtr instance(); // Implements interface methods. - Subchannel* RegisterSubchannel(SubchannelKey* key, - Subchannel* constructed) override; + grpc_subchannel* RegisterSubchannel(SubchannelKey* key, + grpc_subchannel* constructed) override; void UnregisterSubchannel(SubchannelKey* key) override; - Subchannel* FindSubchannel(SubchannelKey* key) override; + grpc_subchannel* FindSubchannel(SubchannelKey* key) override; private: // The singleton instance. (It's a pointer to RefCountedPtr so that this diff --git a/src/core/ext/filters/client_channel/health/health_check_client.cc b/src/core/ext/filters/client_channel/health/health_check_client.cc index e845d63d295..2232c57120e 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.cc +++ b/src/core/ext/filters/client_channel/health/health_check_client.cc @@ -295,9 +295,7 @@ HealthCheckClient::CallState::~CallState() { gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p", health_check_client_.get(), this); } - // The subchannel call is in the arena, so reset the pointer before we destroy - // the arena. - call_.reset(); + if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended"); for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (context_[i].destroy != nullptr) { context_[i].destroy(context_[i].value); @@ -331,8 +329,8 @@ void HealthCheckClient::CallState::StartCall() { &call_combiner_, 0, // parent_data_size }; - grpc_error* error = GRPC_ERROR_NONE; - call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error); + grpc_error* error = + health_check_client_->connected_subchannel_->CreateCall(args, &call_); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "HealthCheckClient %p CallState %p: error creating health " @@ -425,14 +423,14 @@ void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg, grpc_error* error) { grpc_transport_stream_op_batch* batch = static_cast(arg); - SubchannelCall* call = - static_cast(batch->handler_private.extra_arg); - call->StartTransportStreamOpBatch(batch); + grpc_subchannel_call* call = + static_cast(batch->handler_private.extra_arg); + grpc_subchannel_call_process_op(call, batch); } void HealthCheckClient::CallState::StartBatch( grpc_transport_stream_op_batch* batch) { - batch->handler_private.extra_arg = call_.get(); + batch->handler_private.extra_arg = call_; GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner, batch, grpc_schedule_on_exec_ctx); GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure, @@ -454,7 +452,7 @@ void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) { GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx)); batch->cancel_stream = true; batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED; - self->call_->StartTransportStreamOpBatch(batch); + grpc_subchannel_call_process_op(self->call_, batch); } void HealthCheckClient::CallState::Cancel() { diff --git a/src/core/ext/filters/client_channel/health/health_check_client.h b/src/core/ext/filters/client_channel/health/health_check_client.h index 7af88a54cfc..2369b73feac 100644 --- a/src/core/ext/filters/client_channel/health/health_check_client.h +++ b/src/core/ext/filters/client_channel/health/health_check_client.h @@ -99,7 +99,7 @@ class HealthCheckClient : public InternallyRefCounted { grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; // The streaming call to the backend. Always non-NULL. - RefCountedPtr call_; + grpc_subchannel_call* call_; grpc_transport_stream_op_batch_payload payload_; grpc_transport_stream_op_batch batch_; diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index dc716a6adac..ec5c782c469 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -79,7 +79,7 @@ class PickFirst : public LoadBalancingPolicy { PickFirstSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) : SubchannelData(subchannel_list, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index aab6dd68216..30316689ea7 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -94,7 +94,7 @@ class RoundRobin : public LoadBalancingPolicy { RoundRobinSubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) : SubchannelData(subchannel_list, address, subchannel, combiner) {} diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h index 0174a98a73d..2eb92b7ead0 100644 --- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h +++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h @@ -88,7 +88,7 @@ class SubchannelData { } // Returns a pointer to the subchannel. - Subchannel* subchannel() const { return subchannel_; } + grpc_subchannel* subchannel() const { return subchannel_; } // Returns the connected subchannel. Will be null if the subchannel // is not connected. @@ -103,8 +103,8 @@ class SubchannelData { // ProcessConnectivityChangeLocked()). grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) { GPR_ASSERT(!connectivity_notification_pending_); - pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity( - error, subchannel_list_->inhibit_health_checking()); + pending_connectivity_state_unsafe_ = grpc_subchannel_check_connectivity( + subchannel(), error, subchannel_list_->inhibit_health_checking()); UpdateConnectedSubchannelLocked(); return pending_connectivity_state_unsafe_; } @@ -142,7 +142,7 @@ class SubchannelData { protected: SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner); virtual ~SubchannelData(); @@ -170,7 +170,7 @@ class SubchannelData { SubchannelList* subchannel_list_; // The subchannel and connected subchannel. - Subchannel* subchannel_; + grpc_subchannel* subchannel_; RefCountedPtr connected_subchannel_; // Notification that connectivity has changed on subchannel. @@ -203,7 +203,7 @@ class SubchannelList : public InternallyRefCounted { for (size_t i = 0; i < subchannels_.size(); ++i) { if (subchannels_[i].subchannel() != nullptr) { grpc_core::channelz::SubchannelNode* subchannel_node = - subchannels_[i].subchannel()->channelz_node(); + grpc_subchannel_get_channelz_node(subchannels_[i].subchannel()); if (subchannel_node != nullptr) { refs_list->push_back(subchannel_node->uuid()); } @@ -276,7 +276,7 @@ class SubchannelList : public InternallyRefCounted { template SubchannelData::SubchannelData( SubchannelList* subchannel_list, - const ServerAddress& address, Subchannel* subchannel, + const ServerAddress& address, grpc_subchannel* subchannel, grpc_combiner* combiner) : subchannel_list_(subchannel_list), subchannel_(subchannel), @@ -317,7 +317,7 @@ template void SubchannelData::ResetBackoffLocked() { if (subchannel_ != nullptr) { - subchannel_->ResetBackoff(); + grpc_subchannel_reset_backoff(subchannel_); } } @@ -337,8 +337,8 @@ void SubchannelDataRef(DEBUG_LOCATION, "connectivity_watch").release(); - subchannel_->NotifyOnStateChange( - subchannel_list_->policy()->interested_parties(), + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, subchannel_list_->inhibit_health_checking()); } @@ -357,8 +357,8 @@ void SubchannelDataNotifyOnStateChange( - subchannel_list_->policy()->interested_parties(), + grpc_subchannel_notify_on_state_change( + subchannel_, subchannel_list_->policy()->interested_parties(), &pending_connectivity_state_unsafe_, &connectivity_changed_closure_, subchannel_list_->inhibit_health_checking()); } @@ -391,9 +391,9 @@ void SubchannelData:: subchannel_, reason); } GPR_ASSERT(connectivity_notification_pending_); - subchannel_->NotifyOnStateChange(nullptr, nullptr, - &connectivity_changed_closure_, - subchannel_list_->inhibit_health_checking()); + grpc_subchannel_notify_on_state_change( + subchannel_, nullptr, nullptr, &connectivity_changed_closure_, + subchannel_list_->inhibit_health_checking()); } template @@ -401,7 +401,8 @@ bool SubchannelData::UpdateConnectedSubchannelLocked() { // If the subchannel is READY, take a ref to the connected subchannel. if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) { - connected_subchannel_ = subchannel_->connected_subchannel(); + connected_subchannel_ = + grpc_subchannel_get_connected_subchannel(subchannel_); // If the subchannel became disconnected between the time that READY // was reported and the time we got here (e.g., between when a // notification callback is scheduled and when it was actually run in @@ -517,7 +518,7 @@ SubchannelList::SubchannelList( SubchannelPoolInterface::CreateChannelArg(policy_->subchannel_pool())); const size_t subchannel_address_arg_index = args_to_add.size(); args_to_add.emplace_back( - Subchannel::CreateSubchannelAddressArg(&addresses[i].address())); + grpc_create_subchannel_address_arg(&addresses[i].address())); if (addresses[i].args() != nullptr) { for (size_t j = 0; j < addresses[i].args()->num_args; ++j) { args_to_add.emplace_back(addresses[i].args()->args[j]); @@ -527,7 +528,7 @@ SubchannelList::SubchannelList( &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add.data(), args_to_add.size()); gpr_free(args_to_add[subchannel_address_arg_index].value.string); - Subchannel* subchannel = grpc_client_channel_factory_create_subchannel( + grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel( client_channel_factory, new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { diff --git a/src/core/ext/filters/client_channel/local_subchannel_pool.cc b/src/core/ext/filters/client_channel/local_subchannel_pool.cc index d1c1cacb441..145fa4e0374 100644 --- a/src/core/ext/filters/client_channel/local_subchannel_pool.cc +++ b/src/core/ext/filters/client_channel/local_subchannel_pool.cc @@ -32,11 +32,11 @@ LocalSubchannelPool::~LocalSubchannelPool() { grpc_avl_unref(subchannel_map_, nullptr); } -Subchannel* LocalSubchannelPool::RegisterSubchannel(SubchannelKey* key, - Subchannel* constructed) { +grpc_subchannel* LocalSubchannelPool::RegisterSubchannel( + SubchannelKey* key, grpc_subchannel* constructed) { // Check to see if a subchannel already exists. - Subchannel* c = - static_cast(grpc_avl_get(subchannel_map_, key, nullptr)); + grpc_subchannel* c = static_cast( + grpc_avl_get(subchannel_map_, key, nullptr)); if (c != nullptr) { // The subchannel already exists. Reuse it. c = GRPC_SUBCHANNEL_REF(c, "subchannel_register+reuse"); @@ -54,9 +54,9 @@ void LocalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) { subchannel_map_ = grpc_avl_remove(subchannel_map_, key, nullptr); } -Subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) { - Subchannel* c = - static_cast(grpc_avl_get(subchannel_map_, key, nullptr)); +grpc_subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) { + grpc_subchannel* c = static_cast( + grpc_avl_get(subchannel_map_, key, nullptr)); return c == nullptr ? c : GRPC_SUBCHANNEL_REF(c, "found_from_pool"); } diff --git a/src/core/ext/filters/client_channel/local_subchannel_pool.h b/src/core/ext/filters/client_channel/local_subchannel_pool.h index a6b7e259fbb..9929cdb3627 100644 --- a/src/core/ext/filters/client_channel/local_subchannel_pool.h +++ b/src/core/ext/filters/client_channel/local_subchannel_pool.h @@ -39,10 +39,10 @@ class LocalSubchannelPool final : public SubchannelPoolInterface { // Implements interface methods. // Thread-unsafe. Intended to be invoked within the client_channel combiner. - Subchannel* RegisterSubchannel(SubchannelKey* key, - Subchannel* constructed) override; + grpc_subchannel* RegisterSubchannel(SubchannelKey* key, + grpc_subchannel* constructed) override; void UnregisterSubchannel(SubchannelKey* key) override; - Subchannel* FindSubchannel(SubchannelKey* key) override; + grpc_subchannel* FindSubchannel(SubchannelKey* key) override; private: // The vtable for subchannel operations in an AVL tree. diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 70285659aad..d77bb3c286b 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -44,6 +44,7 @@ #include "src/core/lib/gprpp/mutex_lock.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/channel.h" @@ -54,256 +55,153 @@ #include "src/core/lib/transport/status_metadata.h" #include "src/core/lib/uri/uri_parser.h" -// Strong and weak refs. #define INTERNAL_REF_BITS 16 #define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1)) -// Backoff parameters. #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1 #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6 #define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20 #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2 -// Conversion between subchannel call and call stack. -#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ - (grpc_call_stack*)((char*)(call) + \ - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) -#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \ - (SubchannelCall*)(((char*)(call_stack)) - \ - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))) +typedef struct external_state_watcher { + grpc_subchannel* subchannel; + grpc_pollset_set* pollset_set; + grpc_closure* notify; + grpc_closure closure; + struct external_state_watcher* next; + struct external_state_watcher* prev; +} external_state_watcher; namespace grpc_core { -// -// ConnectedSubchannel -// - -ConnectedSubchannel::ConnectedSubchannel( - grpc_channel_stack* channel_stack, const grpc_channel_args* args, - RefCountedPtr channelz_subchannel, - intptr_t socket_uuid) - : RefCounted(&grpc_trace_stream_refcount), - channel_stack_(channel_stack), - args_(grpc_channel_args_copy(args)), - channelz_subchannel_(std::move(channelz_subchannel)), - socket_uuid_(socket_uuid) {} +class ConnectedSubchannelStateWatcher; -ConnectedSubchannel::~ConnectedSubchannel() { - grpc_channel_args_destroy(args_); - GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); -} +} // namespace grpc_core -void ConnectedSubchannel::NotifyOnStateChange( - grpc_pollset_set* interested_parties, grpc_connectivity_state* state, - grpc_closure* closure) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->connectivity_state = state; - op->on_connectivity_state_change = closure; - op->bind_pollset_set = interested_parties; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} +struct grpc_subchannel { + /** The subchannel pool this subchannel is in */ + grpc_core::RefCountedPtr subchannel_pool; -void ConnectedSubchannel::Ping(grpc_closure* on_initiate, - grpc_closure* on_ack) { - grpc_transport_op* op = grpc_make_transport_op(nullptr); - grpc_channel_element* elem; - op->send_ping.on_initiate = on_initiate; - op->send_ping.on_ack = on_ack; - elem = grpc_channel_stack_element(channel_stack_, 0); - elem->filter->start_transport_op(elem, op); -} + grpc_connector* connector; -namespace { + /** refcount + - lower INTERNAL_REF_BITS bits are for internal references: + these do not keep the subchannel open. + - upper remaining bits are for public references: these do + keep the subchannel open */ + gpr_atm ref_pair; -void SubchannelCallDestroy(void* arg, grpc_error* error) { - GPR_TIMER_SCOPE("subchannel_call_destroy", 0); - SubchannelCall* call = static_cast(arg); - grpc_closure* after_call_stack_destroy = call->after_call_stack_destroy(); - call->~SubchannelCall(); - // This should be the last step to destroy the subchannel call, because - // call->after_call_stack_destroy(), if not null, will free the call arena. - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call), nullptr, - after_call_stack_destroy); -} + /** channel arguments */ + grpc_channel_args* args; -} // namespace + grpc_core::SubchannelKey* key; -RefCountedPtr ConnectedSubchannel::CreateCall( - const CallArgs& args, grpc_error** error) { - const size_t allocation_size = - GetInitialCallSizeEstimate(args.parent_data_size); - RefCountedPtr call( - new (gpr_arena_alloc(args.arena, allocation_size)) - SubchannelCall(Ref(DEBUG_LOCATION, "subchannel_call"), args)); - grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call.get()); - const grpc_call_element_args call_args = { - callstk, /* call_stack */ - nullptr, /* server_transport_data */ - args.context, /* context */ - args.path, /* path */ - args.start_time, /* start_time */ - args.deadline, /* deadline */ - args.arena, /* arena */ - args.call_combiner /* call_combiner */ - }; - *error = grpc_call_stack_init(channel_stack_, 1, SubchannelCallDestroy, - call.get(), &call_args); - if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) { - const char* error_string = grpc_error_string(*error); - gpr_log(GPR_ERROR, "error: %s", error_string); - return call; - } - grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); - if (channelz_subchannel_ != nullptr) { - channelz_subchannel_->RecordCallStarted(); - } - return call; -} + /** set during connection */ + grpc_connect_out_args connecting_result; -size_t ConnectedSubchannel::GetInitialCallSizeEstimate( - size_t parent_data_size) const { - size_t allocation_size = - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)); - if (parent_data_size > 0) { - allocation_size += - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + - parent_data_size; - } else { - allocation_size += channel_stack_->call_stack_size; - } - return allocation_size; -} + /** callback for connection finishing */ + grpc_closure on_connected; -// -// SubchannelCall -// + /** callback for our alarm */ + grpc_closure on_alarm; -void SubchannelCall::StartTransportStreamOpBatch( - grpc_transport_stream_op_batch* batch) { - GPR_TIMER_SCOPE("subchannel_call_process_op", 0); - MaybeInterceptRecvTrailingMetadata(batch); - grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this); - grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); - GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); - top_elem->filter->start_transport_stream_op_batch(top_elem, batch); -} + /** pollset_set tracking who's interested in a connection + being setup */ + grpc_pollset_set* pollset_set; -void* SubchannelCall::GetParentData() { - grpc_channel_stack* chanstk = connected_subchannel_->channel_stack(); - return (char*)this + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) + - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size); -} + grpc_core::UniquePtr health_check_service_name; -grpc_call_stack* SubchannelCall::GetCallStack() { - return SUBCHANNEL_CALL_TO_CALL_STACK(this); -} + /** mutex protecting remaining elements */ + gpr_mu mu; -void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) { - GPR_ASSERT(after_call_stack_destroy_ == nullptr); - GPR_ASSERT(closure != nullptr); - after_call_stack_destroy_ = closure; -} + /** active connection, or null */ + grpc_core::RefCountedPtr connected_subchannel; + grpc_core::OrphanablePtr + connected_subchannel_watcher; -RefCountedPtr SubchannelCall::Ref() { - IncrementRefCount(); - return RefCountedPtr(this); -} + /** have we seen a disconnection? */ + bool disconnected; + /** are we connecting */ + bool connecting; -RefCountedPtr SubchannelCall::Ref( - const grpc_core::DebugLocation& location, const char* reason) { - IncrementRefCount(location, reason); - return RefCountedPtr(this); -} + /** connectivity state tracking */ + grpc_connectivity_state_tracker state_tracker; + grpc_connectivity_state_tracker state_and_health_tracker; -void SubchannelCall::Unref() { - GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); -} + external_state_watcher root_external_state_watcher; -void SubchannelCall::Unref(const DebugLocation& location, const char* reason) { - GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); -} + /** backoff state */ + grpc_core::ManualConstructor backoff; + grpc_millis next_attempt_deadline; + grpc_millis min_connect_timeout_ms; -void SubchannelCall::MaybeInterceptRecvTrailingMetadata( - grpc_transport_stream_op_batch* batch) { - // only intercept payloads with recv trailing. - if (!batch->recv_trailing_metadata) { - return; - } - // only add interceptor is channelz is enabled. - if (connected_subchannel_->channelz_subchannel() == nullptr) { - return; - } - GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady, - this, grpc_schedule_on_exec_ctx); - // save some state needed for the interception callback. - GPR_ASSERT(recv_trailing_metadata_ == nullptr); - recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata; - original_recv_trailing_metadata_ = - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; - batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = - &recv_trailing_metadata_ready_; -} + /** do we have an active alarm? */ + bool have_alarm; + /** have we started the backoff loop */ + bool backoff_begun; + // reset_backoff() was called while alarm was pending + bool retry_immediately; + /** our alarm */ + grpc_timer alarm; -namespace { + grpc_core::RefCountedPtr + channelz_subchannel; +}; -// Sets *status based on the rest of the parameters. -void GetCallStatus(grpc_status_code* status, grpc_millis deadline, - grpc_metadata_batch* md_batch, grpc_error* error) { - if (error != GRPC_ERROR_NONE) { - grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr); - } else { - if (md_batch->idx.named.grpc_status != nullptr) { - *status = grpc_get_status_code_from_metadata( - md_batch->idx.named.grpc_status->md); - } else { - *status = GRPC_STATUS_UNKNOWN; - } - } - GRPC_ERROR_UNREF(error); -} +struct grpc_subchannel_call { + grpc_subchannel_call(grpc_core::ConnectedSubchannel* connection, + const grpc_core::ConnectedSubchannel::CallArgs& args) + : connection(connection), deadline(args.deadline) {} + + grpc_core::ConnectedSubchannel* connection; + grpc_closure* schedule_closure_after_destroy = nullptr; + // state needed to support channelz interception of recv trailing metadata. + grpc_closure recv_trailing_metadata_ready; + grpc_closure* original_recv_trailing_metadata; + grpc_metadata_batch* recv_trailing_metadata = nullptr; + grpc_millis deadline; +}; -} // namespace +static void maybe_start_connecting_locked(grpc_subchannel* c); -void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) { - SubchannelCall* call = static_cast(arg); - GPR_ASSERT(call->recv_trailing_metadata_ != nullptr); - grpc_status_code status = GRPC_STATUS_OK; - GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, - GRPC_ERROR_REF(error)); - channelz::SubchannelNode* channelz_subchannel = - call->connected_subchannel_->channelz_subchannel(); - GPR_ASSERT(channelz_subchannel != nullptr); - if (status == GRPC_STATUS_OK) { - channelz_subchannel->RecordCallSucceeded(); - } else { - channelz_subchannel->RecordCallFailed(); +static const char* subchannel_connectivity_state_change_string( + grpc_connectivity_state state) { + switch (state) { + case GRPC_CHANNEL_IDLE: + return "Subchannel state change to IDLE"; + case GRPC_CHANNEL_CONNECTING: + return "Subchannel state change to CONNECTING"; + case GRPC_CHANNEL_READY: + return "Subchannel state change to READY"; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + return "Subchannel state change to TRANSIENT_FAILURE"; + case GRPC_CHANNEL_SHUTDOWN: + return "Subchannel state change to SHUTDOWN"; } - GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata_, - GRPC_ERROR_REF(error)); -} - -void SubchannelCall::IncrementRefCount() { - GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), ""); + GPR_UNREACHABLE_CODE(return "UNKNOWN"); } -void SubchannelCall::IncrementRefCount(const grpc_core::DebugLocation& location, - const char* reason) { - GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason); +static void set_subchannel_connectivity_state_locked( + grpc_subchannel* c, grpc_connectivity_state state, grpc_error* error, + const char* reason) { + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string( + subchannel_connectivity_state_change_string(state))); + } + grpc_connectivity_state_set(&c->state_tracker, state, error, reason); } -// -// Subchannel::ConnectedSubchannelStateWatcher -// +namespace grpc_core { -class Subchannel::ConnectedSubchannelStateWatcher +class ConnectedSubchannelStateWatcher : public InternallyRefCounted { public: // Must be instantiated while holding c->mu. - explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) { + explicit ConnectedSubchannelStateWatcher(grpc_subchannel* c) + : subchannel_(c) { // Steal subchannel ref for connecting. GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting"); @@ -311,15 +209,15 @@ class Subchannel::ConnectedSubchannelStateWatcher // Callback uses initial ref to this. GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChanged, this, grpc_schedule_on_exec_ctx); - c->connected_subchannel_->NotifyOnStateChange(c->pollset_set_, - &pending_connectivity_state_, - &on_connectivity_changed_); + c->connected_subchannel->NotifyOnStateChange(c->pollset_set, + &pending_connectivity_state_, + &on_connectivity_changed_); // Start health check if needed. grpc_connectivity_state health_state = GRPC_CHANNEL_READY; - if (c->health_check_service_name_ != nullptr) { - health_check_client_ = MakeOrphanable( - c->health_check_service_name_.get(), c->connected_subchannel_, - c->pollset_set_, c->channelz_node_); + if (c->health_check_service_name != nullptr) { + health_check_client_ = grpc_core::MakeOrphanable( + c->health_check_service_name.get(), c->connected_subchannel, + c->pollset_set, c->channelz_subchannel); GRPC_CLOSURE_INIT(&on_health_changed_, OnHealthChanged, this, grpc_schedule_on_exec_ctx); Ref().release(); // Ref for health callback tracked manually. @@ -328,9 +226,9 @@ class Subchannel::ConnectedSubchannelStateWatcher health_state = GRPC_CHANNEL_CONNECTING; } // Report initial state. - c->SetConnectivityStateLocked(GRPC_CHANNEL_READY, GRPC_ERROR_NONE, - "subchannel_connected"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, health_state, + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "subchannel_connected"); + grpc_connectivity_state_set(&c->state_and_health_tracker, health_state, GRPC_ERROR_NONE, "subchannel_connected"); } @@ -344,33 +242,33 @@ class Subchannel::ConnectedSubchannelStateWatcher private: static void OnConnectivityChanged(void* arg, grpc_error* error) { auto* self = static_cast(arg); - Subchannel* c = self->subchannel_; + grpc_subchannel* c = self->subchannel_; { - MutexLock lock(&c->mu_); + MutexLock lock(&c->mu); switch (self->pending_connectivity_state_) { case GRPC_CHANNEL_TRANSIENT_FAILURE: case GRPC_CHANNEL_SHUTDOWN: { - if (!c->disconnected_ && c->connected_subchannel_ != nullptr) { + if (!c->disconnected && c->connected_subchannel != nullptr) { if (grpc_trace_stream_refcount.enabled()) { gpr_log(GPR_INFO, "Connected subchannel %p of subchannel %p has gone into " "%s. Attempting to reconnect.", - c->connected_subchannel_.get(), c, + c->connected_subchannel.get(), c, grpc_connectivity_state_name( self->pending_connectivity_state_)); } - c->connected_subchannel_.reset(); - c->connected_subchannel_watcher_.reset(); + c->connected_subchannel.reset(); + c->connected_subchannel_watcher.reset(); self->last_connectivity_state_ = GRPC_CHANNEL_TRANSIENT_FAILURE; - c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, - GRPC_ERROR_REF(error), - "reflect_child"); - grpc_connectivity_state_set(&c->state_and_health_tracker_, + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), + "reflect_child"); + grpc_connectivity_state_set(&c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "reflect_child"); - c->backoff_begun_ = false; - c->backoff_.Reset(); - c->MaybeStartConnectingLocked(); + c->backoff_begun = false; + c->backoff->Reset(); + maybe_start_connecting_locked(c); } else { self->last_connectivity_state_ = GRPC_CHANNEL_SHUTDOWN; } @@ -383,14 +281,15 @@ class Subchannel::ConnectedSubchannelStateWatcher // this watch from. And a connected subchannel should never go // from READY to CONNECTING or IDLE. self->last_connectivity_state_ = self->pending_connectivity_state_; - c->SetConnectivityStateLocked(self->pending_connectivity_state_, - GRPC_ERROR_REF(error), "reflect_child"); + set_subchannel_connectivity_state_locked( + c, self->pending_connectivity_state_, GRPC_ERROR_REF(error), + "reflect_child"); if (self->pending_connectivity_state_ != GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker_, + grpc_connectivity_state_set(&c->state_and_health_tracker, self->pending_connectivity_state_, GRPC_ERROR_REF(error), "reflect_child"); } - c->connected_subchannel_->NotifyOnStateChange( + c->connected_subchannel->NotifyOnStateChange( nullptr, &self->pending_connectivity_state_, &self->on_connectivity_changed_); self = nullptr; // So we don't unref below. @@ -404,14 +303,14 @@ class Subchannel::ConnectedSubchannelStateWatcher static void OnHealthChanged(void* arg, grpc_error* error) { auto* self = static_cast(arg); - Subchannel* c = self->subchannel_; - MutexLock lock(&c->mu_); + grpc_subchannel* c = self->subchannel_; + MutexLock lock(&c->mu); if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) { self->Unref(); return; } if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) { - grpc_connectivity_state_set(&c->state_and_health_tracker_, + grpc_connectivity_state_set(&c->state_and_health_tracker, self->health_state_, GRPC_ERROR_REF(error), "health_changed"); } @@ -419,63 +318,163 @@ class Subchannel::ConnectedSubchannelStateWatcher &self->on_health_changed_); } - Subchannel* subchannel_; + grpc_subchannel* subchannel_; grpc_closure on_connectivity_changed_; grpc_connectivity_state pending_connectivity_state_ = GRPC_CHANNEL_READY; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_READY; - OrphanablePtr health_check_client_; + grpc_core::OrphanablePtr health_check_client_; grpc_closure on_health_changed_; grpc_connectivity_state health_state_ = GRPC_CHANNEL_CONNECTING; }; -// -// Subchannel::ExternalStateWatcher -// +} // namespace grpc_core + +#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \ + (grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \ + sizeof(grpc_subchannel_call))) +#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \ + (grpc_subchannel_call*)(((char*)(call_stack)) - \ + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \ + sizeof(grpc_subchannel_call))) -struct Subchannel::ExternalStateWatcher { - ExternalStateWatcher(Subchannel* subchannel, grpc_pollset_set* pollset_set, - grpc_closure* notify) - : subchannel(subchannel), pollset_set(pollset_set), notify(notify) { - GRPC_SUBCHANNEL_WEAK_REF(subchannel, "external_state_watcher+init"); - GRPC_CLOSURE_INIT(&on_state_changed, OnStateChanged, this, - grpc_schedule_on_exec_ctx); +static void on_subchannel_connected(void* subchannel, grpc_error* error); + +#ifndef NDEBUG +#define REF_REASON reason +#define REF_MUTATE_EXTRA_ARGS \ + GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose +#define REF_MUTATE_PURPOSE(x) , file, line, reason, x +#else +#define REF_REASON "" +#define REF_MUTATE_EXTRA_ARGS +#define REF_MUTATE_PURPOSE(x) +#endif + +/* + * connection implementation + */ + +static void connection_destroy(void* arg, grpc_error* error) { + grpc_channel_stack* stk = static_cast(arg); + grpc_channel_stack_destroy(stk); + gpr_free(stk); +} + +/* + * grpc_subchannel implementation + */ + +static void subchannel_destroy(void* arg, grpc_error* error) { + grpc_subchannel* c = static_cast(arg); + if (c->channelz_subchannel != nullptr) { + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel destroyed")); + c->channelz_subchannel->MarkSubchannelDestroyed(); + c->channelz_subchannel.reset(); } + c->health_check_service_name.reset(); + grpc_channel_args_destroy(c->args); + grpc_connectivity_state_destroy(&c->state_tracker); + grpc_connectivity_state_destroy(&c->state_and_health_tracker); + grpc_connector_unref(c->connector); + grpc_pollset_set_destroy(c->pollset_set); + grpc_core::Delete(c->key); + gpr_mu_destroy(&c->mu); + gpr_free(c); +} - static void OnStateChanged(void* arg, grpc_error* error) { - ExternalStateWatcher* w = static_cast(arg); - grpc_closure* follow_up = w->notify; - if (w->pollset_set != nullptr) { - grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set_, - w->pollset_set); - } - gpr_mu_lock(&w->subchannel->mu_); - if (w->subchannel->external_state_watcher_list_ == w) { - w->subchannel->external_state_watcher_list_ = w->next; +static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta, + int barrier REF_MUTATE_EXTRA_ARGS) { + gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta) + : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); +#ifndef NDEBUG + if (grpc_trace_stream_refcount.enabled()) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, + purpose, old_val, old_val + delta, reason); + } +#endif + return old_val; +} + +grpc_subchannel* grpc_subchannel_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), + 0 REF_MUTATE_PURPOSE("STRONG_REF")); + GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); + return c; +} + +grpc_subchannel* grpc_subchannel_weak_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); + GPR_ASSERT(old_refs != 0); + return c; +} + +grpc_subchannel* grpc_subchannel_ref_from_weak_ref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + if (!c) return nullptr; + for (;;) { + gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair); + if (old_refs >= (1 << INTERNAL_REF_BITS)) { + gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); + if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) { + return c; + } + } else { + return nullptr; } - if (w->next != nullptr) w->next->prev = w->prev; - if (w->prev != nullptr) w->prev->next = w->next; - gpr_mu_unlock(&w->subchannel->mu_); - GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher+done"); - Delete(w); - GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } +} - Subchannel* subchannel; - grpc_pollset_set* pollset_set; - grpc_closure* notify; - grpc_closure on_state_changed; - ExternalStateWatcher* next = nullptr; - ExternalStateWatcher* prev = nullptr; -}; +static void disconnect(grpc_subchannel* c) { + // The subchannel_pool is only used once here in this subchannel, so the + // access can be outside of the lock. + if (c->subchannel_pool != nullptr) { + c->subchannel_pool->UnregisterSubchannel(c->key); + c->subchannel_pool.reset(); + } + gpr_mu_lock(&c->mu); + GPR_ASSERT(!c->disconnected); + c->disconnected = true; + grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Subchannel disconnected")); + c->connected_subchannel.reset(); + c->connected_subchannel_watcher.reset(); + gpr_mu_unlock(&c->mu); +} -// -// Subchannel -// +void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + // add a weak ref and subtract a strong ref (atomically) + old_refs = ref_mutate( + c, static_cast(1) - static_cast(1 << INTERNAL_REF_BITS), + 1 REF_MUTATE_PURPOSE("STRONG_UNREF")); + if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { + disconnect(c); + } + GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref"); +} -namespace { +void grpc_subchannel_weak_unref( + grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + gpr_atm old_refs; + old_refs = ref_mutate(c, -static_cast(1), + 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); + if (old_refs == 1) { + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); + } +} -BackOff::Options ParseArgsForBackoffValues( - const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) { +static void parse_args_for_backoff_values( + const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options, + grpc_millis* min_connect_timeout_ms) { grpc_millis initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; *min_connect_timeout_ms = @@ -512,8 +511,7 @@ BackOff::Options ParseArgsForBackoffValues( } } } - return BackOff::Options() - .set_initial_backoff(initial_backoff_ms) + backoff_options->set_initial_backoff(initial_backoff_ms) .set_multiplier(fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER) @@ -522,6 +520,9 @@ BackOff::Options ParseArgsForBackoffValues( .set_max_backoff(max_backoff_ms); } +namespace grpc_core { +namespace { + struct HealthCheckParams { UniquePtr service_name; @@ -542,19 +543,31 @@ struct HealthCheckParams { }; } // namespace +} // namespace grpc_core -Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, - const grpc_channel_args* args) - : key_(key), - connector_(connector), - backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) { +grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, + const grpc_channel_args* args) { + grpc_core::SubchannelKey* key = + grpc_core::New(args); + grpc_core::SubchannelPoolInterface* subchannel_pool = + grpc_core::SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs( + args); + GPR_ASSERT(subchannel_pool != nullptr); + grpc_subchannel* c = subchannel_pool->FindSubchannel(key); + if (c != nullptr) { + grpc_core::Delete(key); + return c; + } GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED(); - gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS); - grpc_connector_ref(connector_); - pollset_set_ = grpc_pollset_set_create(); + c = static_cast(gpr_zalloc(sizeof(*c))); + c->key = key; + gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); + c->connector = connector; + grpc_connector_ref(c->connector); + c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address* addr = static_cast(gpr_malloc(sizeof(*addr))); - GetAddressFromSubchannelAddressArg(args, addr); + grpc_get_subchannel_address_arg(args, addr); grpc_resolved_address* new_address = nullptr; grpc_channel_args* new_args = nullptr; if (grpc_proxy_mappers_map_address(addr, args, &new_address, &new_args)) { @@ -563,492 +576,569 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector, addr = new_address; } static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = CreateSubchannelAddressArg(addr); + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); - args_ = grpc_channel_args_copy_and_add_and_remove( + c->args = grpc_channel_args_copy_and_add_and_remove( new_args != nullptr ? new_args : args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); gpr_free(new_arg.value.string); if (new_args != nullptr) grpc_channel_args_destroy(new_args); - GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this, + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = + &c->root_external_state_watcher; + GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c, grpc_schedule_on_exec_ctx); - grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - grpc_connectivity_state_init(&state_and_health_tracker_, GRPC_CHANNEL_IDLE, + grpc_connectivity_state_init(&c->state_and_health_tracker, GRPC_CHANNEL_IDLE, "subchannel"); - gpr_mu_init(&mu_); + grpc_core::BackOff::Options backoff_options; + parse_args_for_backoff_values(args, &backoff_options, + &c->min_connect_timeout_ms); + c->backoff.Init(backoff_options); + gpr_mu_init(&c->mu); + // Check whether we should enable health checking. const char* service_config_json = grpc_channel_arg_get_string( - grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG)); + grpc_channel_args_find(c->args, GRPC_ARG_SERVICE_CONFIG)); if (service_config_json != nullptr) { - UniquePtr service_config = - ServiceConfig::Create(service_config_json); + grpc_core::UniquePtr service_config = + grpc_core::ServiceConfig::Create(service_config_json); if (service_config != nullptr) { - HealthCheckParams params; - service_config->ParseGlobalParams(HealthCheckParams::Parse, ¶ms); - health_check_service_name_ = std::move(params.service_name); + grpc_core::HealthCheckParams params; + service_config->ParseGlobalParams(grpc_core::HealthCheckParams::Parse, + ¶ms); + c->health_check_service_name = std::move(params.service_name); } } - const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ); - const bool channelz_enabled = + + const grpc_arg* arg = + grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ); + bool channelz_enabled = grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT); arg = grpc_channel_args_find( - args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); + c->args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE); const grpc_integer_options options = { GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX}; size_t channel_tracer_max_memory = (size_t)grpc_channel_arg_get_integer(arg, options); if (channelz_enabled) { - channelz_node_ = MakeRefCounted( - this, channel_tracer_max_memory); - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("subchannel created")); - } -} - -Subchannel::~Subchannel() { - if (channelz_node_ != nullptr) { - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string("Subchannel destroyed")); - channelz_node_->MarkSubchannelDestroyed(); - } - grpc_channel_args_destroy(args_); - grpc_connectivity_state_destroy(&state_tracker_); - grpc_connectivity_state_destroy(&state_and_health_tracker_); - grpc_connector_unref(connector_); - grpc_pollset_set_destroy(pollset_set_); - Delete(key_); - gpr_mu_destroy(&mu_); -} - -Subchannel* Subchannel::Create(grpc_connector* connector, - const grpc_channel_args* args) { - SubchannelKey* key = New(args); - SubchannelPoolInterface* subchannel_pool = - SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args); - GPR_ASSERT(subchannel_pool != nullptr); - Subchannel* c = subchannel_pool->FindSubchannel(key); - if (c != nullptr) { - Delete(key); - return c; + c->channelz_subchannel = + grpc_core::MakeRefCounted( + c, channel_tracer_max_memory); + c->channelz_subchannel->AddTraceEvent( + grpc_core::channelz::ChannelTrace::Severity::Info, + grpc_slice_from_static_string("Subchannel created")); } - c = New(key, connector, args); // Try to register the subchannel before setting the subchannel pool. // Otherwise, in case of a registration race, unreffing c in - // RegisterSubchannel() will cause c to be tried to be unregistered, while - // its key maps to a different subchannel. - Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); - if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref(); + // RegisterSubchannel() will cause c to be tried to be unregistered, while its + // key maps to a different subchannel. + grpc_subchannel* registered = subchannel_pool->RegisterSubchannel(key, c); + if (registered == c) c->subchannel_pool = subchannel_pool->Ref(); return registered; } -Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate((1 << INTERNAL_REF_BITS), - 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF")); - GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0); - return this; +grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( + grpc_subchannel* subchannel) { + return subchannel->channelz_subchannel.get(); } -void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - // add a weak ref and subtract a strong ref (atomically) - old_refs = RefMutate( - static_cast(1) - static_cast(1 << INTERNAL_REF_BITS), - 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF")); - if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) { - Disconnect(); +intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel) { + if (subchannel->connected_subchannel != nullptr) { + return subchannel->connected_subchannel->socket_uuid(); + } else { + return 0; } - GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref"); -} - -Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF")); - GPR_ASSERT(old_refs != 0); - return this; } -namespace { - -void subchannel_destroy(void* arg, grpc_error* error) { - Subchannel* self = static_cast(arg); - Delete(self); +static void continue_connect_locked(grpc_subchannel* c) { + grpc_connect_in_args args; + args.interested_parties = c->pollset_set; + const grpc_millis min_deadline = + c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now(); + c->next_attempt_deadline = c->backoff->NextAttemptTime(); + args.deadline = std::max(c->next_attempt_deadline, min_deadline); + args.channel_args = c->args; + set_subchannel_connectivity_state_locked(c, GRPC_CHANNEL_CONNECTING, + GRPC_ERROR_NONE, "connecting"); + grpc_connectivity_state_set(&c->state_and_health_tracker, + GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, + "connecting"); + grpc_connector_connect(c->connector, &args, &c->connecting_result, + &c->on_connected); } -} // namespace - -void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - gpr_atm old_refs; - old_refs = RefMutate(-static_cast(1), - 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF")); - if (old_refs == 1) { - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(subchannel_destroy, this, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); - } +grpc_connectivity_state grpc_subchannel_check_connectivity( + grpc_subchannel* c, grpc_error** error, bool inhibit_health_checks) { + gpr_mu_lock(&c->mu); + grpc_connectivity_state_tracker* tracker = + inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; + grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); + gpr_mu_unlock(&c->mu); + return state; } -Subchannel* Subchannel::RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { - for (;;) { - gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_); - if (old_refs >= (1 << INTERNAL_REF_BITS)) { - gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS); - if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) { - return this; - } - } else { - return nullptr; - } +static void on_external_state_watcher_done(void* arg, grpc_error* error) { + external_state_watcher* w = static_cast(arg); + grpc_closure* follow_up = w->notify; + if (w->pollset_set != nullptr) { + grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set, + w->pollset_set); } + gpr_mu_lock(&w->subchannel->mu); + w->next->prev = w->prev; + w->prev->next = w->next; + gpr_mu_unlock(&w->subchannel->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); + gpr_free(w); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } -intptr_t Subchannel::GetChildSocketUuid() { - if (connected_subchannel_ != nullptr) { - return connected_subchannel_->socket_uuid(); +static void on_alarm(void* arg, grpc_error* error) { + grpc_subchannel* c = static_cast(arg); + gpr_mu_lock(&c->mu); + c->have_alarm = false; + if (c->disconnected) { + error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", + &error, 1); + } else if (c->retry_immediately) { + c->retry_immediately = false; + error = GRPC_ERROR_NONE; } else { - return 0; + GRPC_ERROR_REF(error); } + if (error == GRPC_ERROR_NONE) { + gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); + continue_connect_locked(c); + gpr_mu_unlock(&c->mu); + } else { + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } + GRPC_ERROR_UNREF(error); } -const char* Subchannel::GetTargetAddress() { - const grpc_arg* addr_arg = - grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS); - const char* addr_str = grpc_channel_arg_get_string(addr_arg); - GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. - return addr_str; -} - -RefCountedPtr Subchannel::connected_subchannel() { - MutexLock lock(&mu_); - return connected_subchannel_; -} - -channelz::SubchannelNode* Subchannel::channelz_node() { - return channelz_node_.get(); -} - -grpc_connectivity_state Subchannel::CheckConnectivity( - grpc_error** error, bool inhibit_health_checks) { - MutexLock lock(&mu_); - grpc_connectivity_state_tracker* tracker = - inhibit_health_checks ? &state_tracker_ : &state_and_health_tracker_; - grpc_connectivity_state state = grpc_connectivity_state_get(tracker, error); - return state; +static void maybe_start_connecting_locked(grpc_subchannel* c) { + if (c->disconnected) { + /* Don't try to connect if we're already disconnected */ + return; + } + if (c->connecting) { + /* Already connecting: don't restart */ + return; + } + if (c->connected_subchannel != nullptr) { + /* Already connected: don't restart */ + return; + } + if (!grpc_connectivity_state_has_watchers(&c->state_tracker) && + !grpc_connectivity_state_has_watchers(&c->state_and_health_tracker)) { + /* Nobody is interested in connecting: so don't just yet */ + return; + } + c->connecting = true; + GRPC_SUBCHANNEL_WEAK_REF(c, "connecting"); + if (!c->backoff_begun) { + c->backoff_begun = true; + continue_connect_locked(c); + } else { + GPR_ASSERT(!c->have_alarm); + c->have_alarm = true; + const grpc_millis time_til_next = + c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now(); + if (time_til_next <= 0) { + gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c); + } else { + gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c, + time_til_next); + } + GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); + grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm); + } } -void Subchannel::NotifyOnStateChange(grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, - grpc_closure* notify, - bool inhibit_health_checks) { +void grpc_subchannel_notify_on_state_change( + grpc_subchannel* c, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* notify, + bool inhibit_health_checks) { grpc_connectivity_state_tracker* tracker = - inhibit_health_checks ? &state_tracker_ : &state_and_health_tracker_; - ExternalStateWatcher* w; + inhibit_health_checks ? &c->state_tracker : &c->state_and_health_tracker; + external_state_watcher* w; if (state == nullptr) { - MutexLock lock(&mu_); - for (w = external_state_watcher_list_; w != nullptr; w = w->next) { + gpr_mu_lock(&c->mu); + for (w = c->root_external_state_watcher.next; + w != &c->root_external_state_watcher; w = w->next) { if (w->notify == notify) { grpc_connectivity_state_notify_on_state_change(tracker, nullptr, - &w->on_state_changed); + &w->closure); } } + gpr_mu_unlock(&c->mu); } else { - w = New(this, interested_parties, notify); + w = static_cast(gpr_malloc(sizeof(*w))); + w->subchannel = c; + w->pollset_set = interested_parties; + w->notify = notify; + GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w, + grpc_schedule_on_exec_ctx); if (interested_parties != nullptr) { - grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties); + grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties); } - MutexLock lock(&mu_); - if (external_state_watcher_list_ != nullptr) { - w->next = external_state_watcher_list_; - w->next->prev = w; - } - external_state_watcher_list_ = w; - grpc_connectivity_state_notify_on_state_change(tracker, state, - &w->on_state_changed); - MaybeStartConnectingLocked(); + GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher"); + gpr_mu_lock(&c->mu); + w->next = &c->root_external_state_watcher; + w->prev = w->next->prev; + w->next->prev = w->prev->next = w; + grpc_connectivity_state_notify_on_state_change(tracker, state, &w->closure); + maybe_start_connecting_locked(c); + gpr_mu_unlock(&c->mu); } } -void Subchannel::ResetBackoff() { - MutexLock lock(&mu_); - backoff_.Reset(); - if (have_retry_alarm_) { - retry_immediately_ = true; - grpc_timer_cancel(&retry_alarm_); - } else { - backoff_begun_ = false; - MaybeStartConnectingLocked(); +static bool publish_transport_locked(grpc_subchannel* c) { + /* construct channel stack */ + grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments( + builder, c->connecting_result.channel_args); + grpc_channel_stack_builder_set_transport(builder, + c->connecting_result.transport); + + if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { + grpc_channel_stack_builder_destroy(builder); + return false; } -} + grpc_channel_stack* stk; + grpc_error* error = grpc_channel_stack_builder_finish( + builder, 0, 1, connection_destroy, nullptr, + reinterpret_cast(&stk)); + if (error != GRPC_ERROR_NONE) { + grpc_transport_destroy(c->connecting_result.transport); + gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", + grpc_error_string(error)); + GRPC_ERROR_UNREF(error); + return false; + } + intptr_t socket_uuid = c->connecting_result.socket_uuid; + memset(&c->connecting_result, 0, sizeof(c->connecting_result)); -grpc_arg Subchannel::CreateSubchannelAddressArg( - const grpc_resolved_address* addr) { - return grpc_channel_arg_string_create( - (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, - addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); -} + if (c->disconnected) { + grpc_channel_stack_destroy(stk); + gpr_free(stk); + return false; + } -const char* Subchannel::GetUriFromSubchannelAddressArg( - const grpc_channel_args* args) { - const grpc_arg* addr_arg = - grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); - const char* addr_str = grpc_channel_arg_get_string(addr_arg); - GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. - return addr_str; -} + /* publish */ + c->connected_subchannel.reset(grpc_core::New( + stk, c->args, c->channelz_subchannel, socket_uuid)); + gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", + c->connected_subchannel.get(), c); -namespace { + // Instantiate state watcher. Will clean itself up. + c->connected_subchannel_watcher = + grpc_core::MakeOrphanable(c); -void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) { - grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); - GPR_ASSERT(uri != nullptr); - if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); - grpc_uri_destroy(uri); + return true; } -} // namespace +static void on_subchannel_connected(void* arg, grpc_error* error) { + grpc_subchannel* c = static_cast(arg); + grpc_channel_args* delete_channel_args = c->connecting_result.channel_args; + + GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected"); + gpr_mu_lock(&c->mu); + c->connecting = false; + if (c->connecting_result.transport != nullptr && + publish_transport_locked(c)) { + /* do nothing, transport was published */ + } else if (c->disconnected) { + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + } else { + set_subchannel_connectivity_state_locked( + c, GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + "connect_failed"); + grpc_connectivity_state_set( + &c->state_and_health_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, + grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Connect Failed", &error, 1), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), + "connect_failed"); -void Subchannel::GetAddressFromSubchannelAddressArg( - const grpc_channel_args* args, grpc_resolved_address* addr) { - const char* addr_uri_str = GetUriFromSubchannelAddressArg(args); - memset(addr, 0, sizeof(*addr)); - if (*addr_uri_str != '\0') { - UriToSockaddr(addr_uri_str, addr); + const char* errmsg = grpc_error_string(error); + gpr_log(GPR_INFO, "Connect failed: %s", errmsg); + + maybe_start_connecting_locked(c); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); } + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected"); + grpc_channel_args_destroy(delete_channel_args); } -namespace { - -// Returns a string indicating the subchannel's connectivity state change to -// \a state. -const char* SubchannelConnectivityStateChangeString( - grpc_connectivity_state state) { - switch (state) { - case GRPC_CHANNEL_IDLE: - return "Subchannel state change to IDLE"; - case GRPC_CHANNEL_CONNECTING: - return "Subchannel state change to CONNECTING"; - case GRPC_CHANNEL_READY: - return "Subchannel state change to READY"; - case GRPC_CHANNEL_TRANSIENT_FAILURE: - return "Subchannel state change to TRANSIENT_FAILURE"; - case GRPC_CHANNEL_SHUTDOWN: - return "Subchannel state change to SHUTDOWN"; +void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) { + gpr_mu_lock(&subchannel->mu); + subchannel->backoff->Reset(); + if (subchannel->have_alarm) { + subchannel->retry_immediately = true; + grpc_timer_cancel(&subchannel->alarm); + } else { + subchannel->backoff_begun = false; + maybe_start_connecting_locked(subchannel); } - GPR_UNREACHABLE_CODE(return "UNKNOWN"); + gpr_mu_unlock(&subchannel->mu); } -} // namespace +/* + * grpc_subchannel_call implementation + */ -void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state, - grpc_error* error, - const char* reason) { - if (channelz_node_ != nullptr) { - channelz_node_->AddTraceEvent( - channelz::ChannelTrace::Severity::Info, - grpc_slice_from_static_string( - SubchannelConnectivityStateChangeString(state))); - } - grpc_connectivity_state_set(&state_tracker_, state, error, reason); +static void subchannel_call_destroy(void* call, grpc_error* error) { + GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0); + grpc_subchannel_call* c = static_cast(call); + grpc_core::ConnectedSubchannel* connection = c->connection; + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr, + c->schedule_closure_after_destroy); + connection->Unref(DEBUG_LOCATION, "subchannel_call"); + c->~grpc_subchannel_call(); } -void Subchannel::MaybeStartConnectingLocked() { - if (disconnected_) { - // Don't try to connect if we're already disconnected. - return; - } - if (connecting_) { - // Already connecting: don't restart. - return; - } - if (connected_subchannel_ != nullptr) { - // Already connected: don't restart. - return; - } - if (!grpc_connectivity_state_has_watchers(&state_tracker_) && - !grpc_connectivity_state_has_watchers(&state_and_health_tracker_)) { - // Nobody is interested in connecting: so don't just yet. - return; - } - connecting_ = true; - GRPC_SUBCHANNEL_WEAK_REF(this, "connecting"); - if (!backoff_begun_) { - backoff_begun_ = true; - ContinueConnectingLocked(); +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call, + grpc_closure* closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == nullptr); + GPR_ASSERT(closure != nullptr); + call->schedule_closure_after_destroy = closure; +} + +grpc_subchannel_call* grpc_subchannel_call_ref( + grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); + return c; +} + +void grpc_subchannel_call_unref( + grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); +} + +// Sets *status based on md_batch and error. +static void get_call_status(grpc_subchannel_call* call, + grpc_metadata_batch* md_batch, grpc_error* error, + grpc_status_code* status) { + if (error != GRPC_ERROR_NONE) { + grpc_error_get_status(error, call->deadline, status, nullptr, nullptr, + nullptr); } else { - GPR_ASSERT(!have_retry_alarm_); - have_retry_alarm_ = true; - const grpc_millis time_til_next = - next_attempt_deadline_ - ExecCtx::Get()->Now(); - if (time_til_next <= 0) { - gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this); + if (md_batch->idx.named.grpc_status != nullptr) { + *status = grpc_get_status_code_from_metadata( + md_batch->idx.named.grpc_status->md); } else { - gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", - this, time_til_next); + *status = GRPC_STATUS_UNKNOWN; } - GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this, - grpc_schedule_on_exec_ctx); - grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_); } + GRPC_ERROR_UNREF(error); } -void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) { - Subchannel* c = static_cast(arg); - gpr_mu_lock(&c->mu_); - c->have_retry_alarm_ = false; - if (c->disconnected_) { - error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected", - &error, 1); - } else if (c->retry_immediately_) { - c->retry_immediately_ = false; - error = GRPC_ERROR_NONE; +static void recv_trailing_metadata_ready(void* arg, grpc_error* error) { + grpc_subchannel_call* call = static_cast(arg); + GPR_ASSERT(call->recv_trailing_metadata != nullptr); + grpc_status_code status = GRPC_STATUS_OK; + grpc_metadata_batch* md_batch = call->recv_trailing_metadata; + get_call_status(call, md_batch, GRPC_ERROR_REF(error), &status); + grpc_core::channelz::SubchannelNode* channelz_subchannel = + call->connection->channelz_subchannel(); + GPR_ASSERT(channelz_subchannel != nullptr); + if (status == GRPC_STATUS_OK) { + channelz_subchannel->RecordCallSucceeded(); } else { - GRPC_ERROR_REF(error); + channelz_subchannel->RecordCallFailed(); } - if (error == GRPC_ERROR_NONE) { - gpr_log(GPR_INFO, "Failed to connect to channel, retrying"); - c->ContinueConnectingLocked(); - gpr_mu_unlock(&c->mu_); - } else { - gpr_mu_unlock(&c->mu_); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); + GRPC_CLOSURE_RUN(call->original_recv_trailing_metadata, + GRPC_ERROR_REF(error)); +} + +// If channelz is enabled, intercept recv_trailing so that we may check the +// status and associate it to a subchannel. +static void maybe_intercept_recv_trailing_metadata( + grpc_subchannel_call* call, grpc_transport_stream_op_batch* batch) { + // only intercept payloads with recv trailing. + if (!batch->recv_trailing_metadata) { + return; } - GRPC_ERROR_UNREF(error); + // only add interceptor is channelz is enabled. + if (call->connection->channelz_subchannel() == nullptr) { + return; + } + GRPC_CLOSURE_INIT(&call->recv_trailing_metadata_ready, + recv_trailing_metadata_ready, call, + grpc_schedule_on_exec_ctx); + // save some state needed for the interception callback. + GPR_ASSERT(call->recv_trailing_metadata == nullptr); + call->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + call->original_recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; + batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = + &call->recv_trailing_metadata_ready; } -void Subchannel::ContinueConnectingLocked() { - grpc_connect_in_args args; - args.interested_parties = pollset_set_; - const grpc_millis min_deadline = - min_connect_timeout_ms_ + ExecCtx::Get()->Now(); - next_attempt_deadline_ = backoff_.NextAttemptTime(); - args.deadline = std::max(next_attempt_deadline_, min_deadline); - args.channel_args = args_; - SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, - "connecting"); - grpc_connectivity_state_set(&state_and_health_tracker_, - GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, - "connecting"); - grpc_connector_connect(connector_, &args, &connecting_result_, - &on_connecting_finished_); +void grpc_subchannel_call_process_op(grpc_subchannel_call* call, + grpc_transport_stream_op_batch* batch) { + GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0); + maybe_intercept_recv_trailing_metadata(call, batch); + grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); + grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0); + GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch); + top_elem->filter->start_transport_stream_op_batch(top_elem, batch); } -void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) { - auto* c = static_cast(arg); - grpc_channel_args* delete_channel_args = c->connecting_result_.channel_args; - GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished"); - gpr_mu_lock(&c->mu_); - c->connecting_ = false; - if (c->connecting_result_.transport != nullptr && - c->PublishTransportLocked()) { - // Do nothing, transport was published. - } else if (c->disconnected_) { - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); - } else { - c->SetConnectivityStateLocked( - GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - "connect_failed"); - grpc_connectivity_state_set( - &c->state_and_health_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, - grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Connect Failed", &error, 1), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - "connect_failed"); +grpc_core::RefCountedPtr +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) { + gpr_mu_lock(&c->mu); + auto copy = c->connected_subchannel; + gpr_mu_unlock(&c->mu); + return copy; +} - const char* errmsg = grpc_error_string(error); - gpr_log(GPR_INFO, "Connect failed: %s", errmsg); +void* grpc_connected_subchannel_call_get_parent_data( + grpc_subchannel_call* subchannel_call) { + grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack(); + return (char*)subchannel_call + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)) + + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size); +} - c->MaybeStartConnectingLocked(); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting"); +grpc_call_stack* grpc_subchannel_call_get_call_stack( + grpc_subchannel_call* subchannel_call) { + return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); +} + +static void grpc_uri_to_sockaddr(const char* uri_str, + grpc_resolved_address* addr) { + grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + GPR_ASSERT(uri != nullptr); + if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr)); + grpc_uri_destroy(uri); +} + +void grpc_get_subchannel_address_arg(const grpc_channel_args* args, + grpc_resolved_address* addr) { + const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args); + memset(addr, 0, sizeof(*addr)); + if (*addr_uri_str != '\0') { + grpc_uri_to_sockaddr(addr_uri_str, addr); } - gpr_mu_unlock(&c->mu_); - GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished"); - grpc_channel_args_destroy(delete_channel_args); } -namespace { +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) { + const grpc_arg* addr_arg = + grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; +} -void ConnectionDestroy(void* arg, grpc_error* error) { - grpc_channel_stack* stk = static_cast(arg); - grpc_channel_stack_destroy(stk); - gpr_free(stk); +const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) { + const grpc_arg* addr_arg = + grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); + const char* addr_str = grpc_channel_arg_get_string(addr_arg); + GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy. + return addr_str; } -} // namespace +grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) { + return grpc_channel_arg_string_create( + (char*)GRPC_ARG_SUBCHANNEL_ADDRESS, + addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup("")); +} -bool Subchannel::PublishTransportLocked() { - // Construct channel stack. - grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create(); - grpc_channel_stack_builder_set_channel_arguments( - builder, connecting_result_.channel_args); - grpc_channel_stack_builder_set_transport(builder, - connecting_result_.transport); - if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) { - grpc_channel_stack_builder_destroy(builder); - return false; - } - grpc_channel_stack* stk; - grpc_error* error = grpc_channel_stack_builder_finish( - builder, 0, 1, ConnectionDestroy, nullptr, - reinterpret_cast(&stk)); - if (error != GRPC_ERROR_NONE) { - grpc_transport_destroy(connecting_result_.transport); - gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", - grpc_error_string(error)); - GRPC_ERROR_UNREF(error); - return false; - } - intptr_t socket_uuid = connecting_result_.socket_uuid; - memset(&connecting_result_, 0, sizeof(connecting_result_)); - if (disconnected_) { - grpc_channel_stack_destroy(stk); - gpr_free(stk); - return false; - } - // Publish. - connected_subchannel_.reset( - New(stk, args_, channelz_node_, socket_uuid)); - gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p", - connected_subchannel_.get(), this); - // Instantiate state watcher. Will clean itself up. - connected_subchannel_watcher_ = - MakeOrphanable(this); - return true; +namespace grpc_core { + +ConnectedSubchannel::ConnectedSubchannel( + grpc_channel_stack* channel_stack, const grpc_channel_args* args, + grpc_core::RefCountedPtr + channelz_subchannel, + intptr_t socket_uuid) + : RefCounted(&grpc_trace_stream_refcount), + channel_stack_(channel_stack), + args_(grpc_channel_args_copy(args)), + channelz_subchannel_(std::move(channelz_subchannel)), + socket_uuid_(socket_uuid) {} + +ConnectedSubchannel::~ConnectedSubchannel() { + grpc_channel_args_destroy(args_); + GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor"); } -void Subchannel::Disconnect() { - // The subchannel_pool is only used once here in this subchannel, so the - // access can be outside of the lock. - if (subchannel_pool_ != nullptr) { - subchannel_pool_->UnregisterSubchannel(key_); - subchannel_pool_.reset(); +void ConnectedSubchannel::NotifyOnStateChange( + grpc_pollset_set* interested_parties, grpc_connectivity_state* state, + grpc_closure* closure) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->connectivity_state = state; + op->on_connectivity_state_change = closure; + op->bind_pollset_set = interested_parties; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +void ConnectedSubchannel::Ping(grpc_closure* on_initiate, + grpc_closure* on_ack) { + grpc_transport_op* op = grpc_make_transport_op(nullptr); + grpc_channel_element* elem; + op->send_ping.on_initiate = on_initiate; + op->send_ping.on_ack = on_ack; + elem = grpc_channel_stack_element(channel_stack_, 0); + elem->filter->start_transport_op(elem, op); +} + +grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args, + grpc_subchannel_call** call) { + const size_t allocation_size = + GetInitialCallSizeEstimate(args.parent_data_size); + *call = new (gpr_arena_alloc(args.arena, allocation_size)) + grpc_subchannel_call(this, args); + grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); + RefCountedPtr connection = + Ref(DEBUG_LOCATION, "subchannel_call"); + connection.release(); // Ref is passed to the grpc_subchannel_call object. + const grpc_call_element_args call_args = { + callstk, /* call_stack */ + nullptr, /* server_transport_data */ + args.context, /* context */ + args.path, /* path */ + args.start_time, /* start_time */ + args.deadline, /* deadline */ + args.arena, /* arena */ + args.call_combiner /* call_combiner */ + }; + grpc_error* error = grpc_call_stack_init( + channel_stack_, 1, subchannel_call_destroy, *call, &call_args); + if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) { + const char* error_string = grpc_error_string(error); + gpr_log(GPR_ERROR, "error: %s", error_string); + return error; } - MutexLock lock(&mu_); - GPR_ASSERT(!disconnected_); - disconnected_ = true; - grpc_connector_shutdown(connector_, GRPC_ERROR_CREATE_FROM_STATIC_STRING( - "Subchannel disconnected")); - connected_subchannel_.reset(); - connected_subchannel_watcher_.reset(); + grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent); + if (channelz_subchannel_ != nullptr) { + channelz_subchannel_->RecordCallStarted(); + } + return GRPC_ERROR_NONE; } -gpr_atm Subchannel::RefMutate( - gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) { - gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta) - : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta); -#ifndef NDEBUG - if (grpc_trace_stream_refcount.enabled()) { - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this, - purpose, old_val, old_val + delta, reason); +size_t ConnectedSubchannel::GetInitialCallSizeEstimate( + size_t parent_data_size) const { + size_t allocation_size = + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call)); + if (parent_data_size > 0) { + allocation_size += + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) + + parent_data_size; + } else { + allocation_size += channel_stack_->call_stack_size; } -#endif - return old_val; + return allocation_size; } } // namespace grpc_core diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 88282c9d95e..fac515eee5c 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -24,49 +24,53 @@ #include "src/core/ext/filters/client_channel/client_channel_channelz.h" #include "src/core/ext/filters/client_channel/connector.h" #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" -#include "src/core/lib/backoff/backoff.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/gpr/arena.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/polling_entity.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" // Channel arg containing a grpc_resolved_address to connect to. #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" -// For debugging refcounting. +/** A (sub-)channel that knows how to connect to exactly one target + address. Provides a target for load balancing. */ +typedef struct grpc_subchannel grpc_subchannel; +typedef struct grpc_subchannel_call grpc_subchannel_call; + #ifndef NDEBUG -#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_REF(p, r) \ + grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ - (p)->RefFromWeakRef(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r)) + grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(p, r) \ + grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \ + grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \ + grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_REF(p, r) \ + grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ + grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ - const char *file, int line, const char *reason -#define GRPC_SUBCHANNEL_REF_REASON reason -#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \ - , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose -#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x + , const char *file, int line, const char *reason #else -#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref() -#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() -#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref() -#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef() -#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref() +#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) +#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \ + grpc_subchannel_ref_from_weak_ref((p)) +#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) +#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p)) +#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p)) +#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS -#define GRPC_SUBCHANNEL_REF_REASON "" -#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS -#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) #endif namespace grpc_core { -class SubchannelCall; - class ConnectedSubchannel : public RefCounted { public: struct CallArgs { @@ -82,7 +86,8 @@ class ConnectedSubchannel : public RefCounted { ConnectedSubchannel( grpc_channel_stack* channel_stack, const grpc_channel_args* args, - RefCountedPtr channelz_subchannel, + grpc_core::RefCountedPtr + channelz_subchannel, intptr_t socket_uuid); ~ConnectedSubchannel(); @@ -90,8 +95,7 @@ class ConnectedSubchannel : public RefCounted { grpc_connectivity_state* state, grpc_closure* closure); void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); - RefCountedPtr CreateCall(const CallArgs& args, - grpc_error** error); + grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call); grpc_channel_stack* channel_stack() const { return channel_stack_; } const grpc_channel_args* args() const { return args_; } @@ -107,204 +111,91 @@ class ConnectedSubchannel : public RefCounted { grpc_channel_args* args_; // ref counted pointer to the channelz node in this connected subchannel's // owning subchannel. - RefCountedPtr channelz_subchannel_; + grpc_core::RefCountedPtr + channelz_subchannel_; // uuid of this subchannel's socket. 0 if this subchannel is not connected. const intptr_t socket_uuid_; }; -// Implements the interface of RefCounted<>. -class SubchannelCall { - public: - SubchannelCall(RefCountedPtr connected_subchannel, - const ConnectedSubchannel::CallArgs& args) - : connected_subchannel_(std::move(connected_subchannel)), - deadline_(args.deadline) {} - - // Continues processing a transport stream op batch. - void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); - - // Returns a pointer to the parent data associated with the subchannel call. - // The data will be of the size specified in \a parent_data_size field of - // the args passed to \a ConnectedSubchannel::CreateCall(). - void* GetParentData(); - - // Returns the call stack of the subchannel call. - grpc_call_stack* GetCallStack(); - - grpc_closure* after_call_stack_destroy() const { - return after_call_stack_destroy_; - } - - // Sets the 'then_schedule_closure' argument for call stack destruction. - // Must be called once per call. - void SetAfterCallStackDestroy(grpc_closure* closure); - - // Interface of RefCounted<>. - RefCountedPtr Ref() GRPC_MUST_USE_RESULT; - RefCountedPtr Ref(const DebugLocation& location, - const char* reason) GRPC_MUST_USE_RESULT; - // When refcount drops to 0, destroys itself and the associated call stack, - // but does NOT free the memory because it's in the call arena. - void Unref(); - void Unref(const DebugLocation& location, const char* reason); - - private: - // Allow RefCountedPtr<> to access IncrementRefCount(). - template - friend class RefCountedPtr; - - // If channelz is enabled, intercepts recv_trailing so that we may check the - // status and associate it to a subchannel. - void MaybeInterceptRecvTrailingMetadata( - grpc_transport_stream_op_batch* batch); - - static void RecvTrailingMetadataReady(void* arg, grpc_error* error); - - // Interface of RefCounted<>. - void IncrementRefCount(); - void IncrementRefCount(const DebugLocation& location, const char* reason); - - RefCountedPtr connected_subchannel_; - grpc_closure* after_call_stack_destroy_ = nullptr; - // State needed to support channelz interception of recv trailing metadata. - grpc_closure recv_trailing_metadata_ready_; - grpc_closure* original_recv_trailing_metadata_ = nullptr; - grpc_metadata_batch* recv_trailing_metadata_ = nullptr; - grpc_millis deadline_; -}; - -// A subchannel that knows how to connect to exactly one target address. It -// provides a target for load balancing. -class Subchannel { - public: - // The ctor and dtor are not intended to use directly. - Subchannel(SubchannelKey* key, grpc_connector* connector, - const grpc_channel_args* args); - ~Subchannel(); - - // Creates a subchannel given \a connector and \a args. - static Subchannel* Create(grpc_connector* connector, - const grpc_channel_args* args); - - // Strong and weak refcounting. - Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); - - intptr_t GetChildSocketUuid(); - - // Gets the string representing the subchannel address. - // Caller doesn't take ownership. - const char* GetTargetAddress(); - - // Gets the connected subchannel - or nullptr if not connected (which may - // happen before it initially connects or during transient failures). - RefCountedPtr connected_subchannel(); - - channelz::SubchannelNode* channelz_node(); - - // Polls the current connectivity state of the subchannel. - grpc_connectivity_state CheckConnectivity(grpc_error** error, - bool inhibit_health_checking); - - // When the connectivity state of the subchannel changes from \a *state, - // invokes \a notify and updates \a *state with the new state. - void NotifyOnStateChange(grpc_pollset_set* interested_parties, - grpc_connectivity_state* state, grpc_closure* notify, - bool inhibit_health_checks); - - // Resets the connection backoff of the subchannel. - // TODO(roth): Move connection backoff out of subchannels and up into LB - // policy code (probably by adding a SubchannelGroup between - // SubchannelList and SubchannelData), at which point this method can - // go away. - void ResetBackoff(); - - // Returns a new channel arg encoding the subchannel address as a URI - // string. Caller is responsible for freeing the string. - static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr); - - // Returns the URI string from the subchannel address arg in \a args. - static const char* GetUriFromSubchannelAddressArg( - const grpc_channel_args* args); - - // Sets \a addr from the subchannel address arg in \a args. - static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args, - grpc_resolved_address* addr); - - private: - struct ExternalStateWatcher; - class ConnectedSubchannelStateWatcher; - - // Sets the subchannel's connectivity state to \a state. - void SetConnectivityStateLocked(grpc_connectivity_state state, - grpc_error* error, const char* reason); - - // Methods for connection. - void MaybeStartConnectingLocked(); - static void OnRetryAlarm(void* arg, grpc_error* error); - void ContinueConnectingLocked(); - static void OnConnectingFinished(void* arg, grpc_error* error); - bool PublishTransportLocked(); - void Disconnect(); - - gpr_atm RefMutate(gpr_atm delta, - int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS); - - // The subchannel pool this subchannel is in. - RefCountedPtr subchannel_pool_; - // TODO(juanlishen): Consider using args_ as key_ directly. - // Subchannel key that identifies this subchannel in the subchannel pool. - SubchannelKey* key_; - // Channel args. - grpc_channel_args* args_; - // pollset_set tracking who's interested in a connection being setup. - grpc_pollset_set* pollset_set_; - // Protects the other members. - gpr_mu mu_; - // Refcount - // - lower INTERNAL_REF_BITS bits are for internal references: - // these do not keep the subchannel open. - // - upper remaining bits are for public references: these do - // keep the subchannel open - gpr_atm ref_pair_; - - // Connection states. - grpc_connector* connector_ = nullptr; - // Set during connection. - grpc_connect_out_args connecting_result_; - grpc_closure on_connecting_finished_; - // Active connection, or null. - RefCountedPtr connected_subchannel_; - OrphanablePtr connected_subchannel_watcher_; - bool connecting_ = false; - bool disconnected_ = false; - - // Connectivity state tracking. - grpc_connectivity_state_tracker state_tracker_; - grpc_connectivity_state_tracker state_and_health_tracker_; - UniquePtr health_check_service_name_; - ExternalStateWatcher* external_state_watcher_list_ = nullptr; - - // Backoff state. - BackOff backoff_; - grpc_millis next_attempt_deadline_; - grpc_millis min_connect_timeout_ms_; - bool backoff_begun_ = false; - - // Retry alarm. - grpc_timer retry_alarm_; - grpc_closure on_retry_alarm_; - bool have_retry_alarm_ = false; - // reset_backoff() was called while alarm was pending. - bool retry_immediately_ = false; - - // Channelz tracking. - RefCountedPtr channelz_node_; -}; - } // namespace grpc_core +grpc_subchannel* grpc_subchannel_ref( + grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel* grpc_subchannel_ref_from_weak_ref( + grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref( + grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel* grpc_subchannel_weak_ref( + grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_weak_unref( + grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +grpc_subchannel_call* grpc_subchannel_call_ref( + grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref( + grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); + +grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node( + grpc_subchannel* subchannel); + +intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel); + +/** Returns a pointer to the parent data associated with \a subchannel_call. + The data will be of the size specified in \a parent_data_size + field of the args passed to \a grpc_connected_subchannel_create_call(). */ +void* grpc_connected_subchannel_call_get_parent_data( + grpc_subchannel_call* subchannel_call); + +/** poll the current connectivity state of a channel */ +grpc_connectivity_state grpc_subchannel_check_connectivity( + grpc_subchannel* channel, grpc_error** error, bool inhibit_health_checking); + +/** Calls notify when the connectivity state of a channel becomes different + from *state. Updates *state with the new state of the channel. */ +void grpc_subchannel_notify_on_state_change( + grpc_subchannel* channel, grpc_pollset_set* interested_parties, + grpc_connectivity_state* state, grpc_closure* notify, + bool inhibit_health_checks); + +/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected + * (which may happen before it initially connects or during transient failures) + * */ +grpc_core::RefCountedPtr +grpc_subchannel_get_connected_subchannel(grpc_subchannel* c); + +// Resets the connection backoff of the subchannel. +// TODO(roth): Move connection backoff out of subchannels and up into LB +// policy code (probably by adding a SubchannelGroup between +// SubchannelList and SubchannelData), at which point this method can +// go away. +void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel); + +/** continue processing a transport op */ +void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call, + grpc_transport_stream_op_batch* op); + +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call* subchannel_call, grpc_closure* closure); + +grpc_call_stack* grpc_subchannel_call_get_call_stack( + grpc_subchannel_call* subchannel_call); + +/** create a subchannel given a connector */ +grpc_subchannel* grpc_subchannel_create(grpc_connector* connector, + const grpc_channel_args* args); + +/// Sets \a addr from \a args. +void grpc_get_subchannel_address_arg(const grpc_channel_args* args, + grpc_resolved_address* addr); + +const char* grpc_subchannel_get_target(grpc_subchannel* subchannel); + +/// Returns the URI string for the address to connect to. +const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args); + +/// Returns a new channel arg encoding the subchannel address as a string. +/// Caller is responsible for freeing the string. +grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */ diff --git a/src/core/ext/filters/client_channel/subchannel_pool_interface.h b/src/core/ext/filters/client_channel/subchannel_pool_interface.h index eeb56faf0c0..21597bf4276 100644 --- a/src/core/ext/filters/client_channel/subchannel_pool_interface.h +++ b/src/core/ext/filters/client_channel/subchannel_pool_interface.h @@ -26,9 +26,9 @@ #include "src/core/lib/gprpp/abstract.h" #include "src/core/lib/gprpp/ref_counted.h" -namespace grpc_core { +struct grpc_subchannel; -class Subchannel; +namespace grpc_core { extern TraceFlag grpc_subchannel_pool_trace; @@ -69,15 +69,15 @@ class SubchannelPoolInterface : public RefCounted { // Registers a subchannel against a key. Returns the subchannel registered // with \a key, which may be different from \a constructed because we reuse // (instead of update) any existing subchannel already registered with \a key. - virtual Subchannel* RegisterSubchannel(SubchannelKey* key, - Subchannel* constructed) GRPC_ABSTRACT; + virtual grpc_subchannel* RegisterSubchannel( + SubchannelKey* key, grpc_subchannel* constructed) GRPC_ABSTRACT; // Removes the registered subchannel found by \a key. virtual void UnregisterSubchannel(SubchannelKey* key) GRPC_ABSTRACT; // Finds the subchannel registered for the given subchannel key. Returns NULL // if no such channel exists. Thread-safe. - virtual Subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT; + virtual grpc_subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT; // Creates a channel arg from \a subchannel pool. static grpc_arg CreateChannelArg(SubchannelPoolInterface* subchannel_pool); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 1e9a75d0630..42a2e2e896c 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -202,8 +202,7 @@ static void chttp2_connector_connect(grpc_connector* con, grpc_closure* notify) { chttp2_connector* c = reinterpret_cast(con); grpc_resolved_address addr; - grpc_core::Subchannel::GetAddressFromSubchannelAddressArg(args->channel_args, - &addr); + grpc_get_subchannel_address_arg(args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == nullptr); c->notify = notify; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc index 8aabcfa2000..a5bf1bf21d4 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.cc @@ -39,11 +39,11 @@ static void client_channel_factory_ref( static void client_channel_factory_unref( grpc_client_channel_factory* cc_factory) {} -static grpc_core::Subchannel* client_channel_factory_create_subchannel( +static grpc_subchannel* client_channel_factory_create_subchannel( grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args); grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args); + grpc_subchannel* s = grpc_subchannel_create(connector, new_args); grpc_connector_unref(connector); grpc_channel_args_destroy(new_args); return s; diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc index eb2fee2af91..5985fa0cbdb 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc @@ -76,8 +76,7 @@ static grpc_channel_args* get_secure_naming_channel_args( grpc_core::UniquePtr authority; if (target_authority_table != nullptr) { // Find the authority for the target. - const char* target_uri_str = - grpc_core::Subchannel::GetUriFromSubchannelAddressArg(args); + const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args); grpc_uri* target_uri = grpc_uri_parse(target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != nullptr); @@ -139,7 +138,7 @@ static grpc_channel_args* get_secure_naming_channel_args( return new_args; } -static grpc_core::Subchannel* client_channel_factory_create_subchannel( +static grpc_subchannel* client_channel_factory_create_subchannel( grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) { grpc_channel_args* new_args = get_secure_naming_channel_args(args); if (new_args == nullptr) { @@ -148,7 +147,7 @@ static grpc_core::Subchannel* client_channel_factory_create_subchannel( return nullptr; } grpc_connector* connector = grpc_chttp2_connector_create(); - grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args); + grpc_subchannel* s = grpc_subchannel_create(connector, new_args); grpc_connector_unref(connector); grpc_channel_args_destroy(new_args); return s; diff --git a/test/core/util/debugger_macros.cc b/test/core/util/debugger_macros.cc index fed6ad97285..05fb1461733 100644 --- a/test/core/util/debugger_macros.cc +++ b/test/core/util/debugger_macros.cc @@ -36,14 +36,13 @@ grpc_stream* grpc_transport_stream_from_call(grpc_call* call) { for (;;) { grpc_call_element* el = grpc_call_stack_element(cs, cs->count - 1); if (el->filter == &grpc_client_channel_filter) { - grpc_core::RefCountedPtr scc = - grpc_client_channel_get_subchannel_call(el); + grpc_subchannel_call* scc = grpc_client_channel_get_subchannel_call(el); if (scc == nullptr) { fprintf(stderr, "No subchannel-call"); fflush(stderr); return nullptr; } - cs = scc->GetCallStack(); + cs = grpc_subchannel_call_get_call_stack(scc); } else if (el->filter == &grpc_connected_filter) { return grpc_connected_channel_get_stream(el); } else { diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index 973f47beaf7..125b1ce5c4e 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -325,8 +325,8 @@ class FakeClientChannelFactory : public grpc_client_channel_factory { private: static void NoRef(grpc_client_channel_factory* factory) {} static void NoUnref(grpc_client_channel_factory* factory) {} - static grpc_core::Subchannel* CreateSubchannel( - grpc_client_channel_factory* factory, const grpc_channel_args* args) { + static grpc_subchannel* CreateSubchannel(grpc_client_channel_factory* factory, + const grpc_channel_args* args) { return nullptr; } static grpc_channel* CreateClientChannel(grpc_client_channel_factory* factory,