C++-ify subchannel

pull/17653/head
Juanli Shen 6 years ago
parent 5ae19486dd
commit 25dc2ffed6
  1. 112
      src/core/ext/filters/client_channel/client_channel.cc
  2. 4
      src/core/ext/filters/client_channel/client_channel.h
  3. 11
      src/core/ext/filters/client_channel/client_channel_channelz.cc
  4. 9
      src/core/ext/filters/client_channel/client_channel_channelz.h
  5. 2
      src/core/ext/filters/client_channel/client_channel_factory.cc
  6. 6
      src/core/ext/filters/client_channel/client_channel_factory.h
  7. 19
      src/core/ext/filters/client_channel/global_subchannel_pool.cc
  8. 6
      src/core/ext/filters/client_channel/global_subchannel_pool.h
  9. 18
      src/core/ext/filters/client_channel/health/health_check_client.cc
  10. 2
      src/core/ext/filters/client_channel/health/health_check_client.h
  11. 2
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  12. 2
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  13. 35
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  14. 14
      src/core/ext/filters/client_channel/local_subchannel_pool.cc
  15. 6
      src/core/ext/filters/client_channel/local_subchannel_pool.h
  16. 1420
      src/core/ext/filters/client_channel/subchannel.cc
  17. 313
      src/core/ext/filters/client_channel/subchannel.h
  18. 10
      src/core/ext/filters/client_channel/subchannel_pool_interface.h
  19. 3
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  20. 4
      src/core/ext/transport/chttp2/client/insecure/channel_create.cc
  21. 7
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
  22. 5
      test/core/util/debugger_macros.cc
  23. 4
      test/cpp/microbenchmarks/bm_call_create.cc

@ -394,7 +394,7 @@ struct subchannel_batch_data {
gpr_refcount refs;
grpc_call_element* elem;
grpc_subchannel_call* subchannel_call; // Holds a ref.
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
// The batch to use in the subchannel call.
// Its payload field points to subchannel_call_retry_state.batch_payload.
grpc_transport_stream_op_batch batch;
@ -478,7 +478,7 @@ struct pending_batch {
bool send_ops_cached;
};
/** Call data. Holds a pointer to grpc_subchannel_call and the
/** Call data. Holds a pointer to SubchannelCall and the
associated machinery to create such a pointer.
Handles queueing of stream ops until a call object is ready, waiting
for initial metadata before trying to create a call object,
@ -504,10 +504,6 @@ struct call_data {
last_attempt_got_server_pushback(false) {}
~call_data() {
if (GPR_LIKELY(subchannel_call != nullptr)) {
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call,
"client_channel_destroy_call");
}
grpc_slice_unref_internal(path);
GRPC_ERROR_UNREF(cancel_error);
for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches); ++i) {
@ -536,7 +532,7 @@ struct call_data {
grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;
grpc_subchannel_call* subchannel_call = nullptr;
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> subchannel_call;
// Set when we get a cancel_stream op.
grpc_error* cancel_error = GRPC_ERROR_NONE;
@ -807,8 +803,8 @@ static void pending_batches_add(grpc_call_element* elem,
calld->subchannel_call == nullptr
? nullptr
: static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
calld->subchannel_call->GetParentData());
retry_commit(elem, retry_state);
// If we are not going to retry and have not yet started, pretend
// retries are disabled so that we don't bother with retry overhead.
@ -896,10 +892,10 @@ static void resume_pending_batch_in_call_combiner(void* arg,
grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_subchannel_call* subchannel_call =
static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
grpc_core::SubchannelCall* subchannel_call =
static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(subchannel_call, batch);
subchannel_call->StartTransportStreamOpBatch(batch);
}
// This is called via the call combiner, so access to calld is synchronized.
@ -919,7 +915,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" pending batches on subchannel_call=%p",
chand, calld, num_batches, calld->subchannel_call);
chand, calld, num_batches, calld->subchannel_call.get());
}
grpc_core::CallCombinerClosureList closures;
for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
@ -930,7 +926,7 @@ static void pending_batches_resume(grpc_call_element* elem) {
maybe_inject_recv_trailing_metadata_ready_for_lb(
*calld->request->pick(), batch);
}
batch->handler_private.extra_arg = calld->subchannel_call;
batch->handler_private.extra_arg = calld->subchannel_call.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
resume_pending_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
@ -1019,12 +1015,7 @@ static void do_retry(grpc_call_element* elem,
const ClientChannelMethodParams::RetryPolicy* retry_policy =
calld->method_params->retry_policy();
GPR_ASSERT(retry_policy != nullptr);
// Reset subchannel call and connected subchannel.
if (calld->subchannel_call != nullptr) {
GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
"client_channel_call_retry");
calld->subchannel_call = nullptr;
}
calld->subchannel_call.reset();
if (calld->have_request) {
calld->have_request = false;
calld->request.Destroy();
@ -1078,8 +1069,7 @@ static bool maybe_retry(grpc_call_element* elem,
subchannel_call_retry_state* retry_state = nullptr;
if (batch_data != nullptr) {
retry_state = static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
if (retry_state->retry_dispatched) {
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
@ -1180,13 +1170,10 @@ namespace {
subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
call_data* calld, int refcount,
bool set_on_complete)
: elem(elem),
subchannel_call(GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call,
"batch_data_create")) {
: elem(elem), subchannel_call(calld->subchannel_call) {
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
calld->subchannel_call->GetParentData());
batch.payload = &retry_state->batch_payload;
gpr_ref_init(&refs, refcount);
if (set_on_complete) {
@ -1200,7 +1187,7 @@ subchannel_batch_data::subchannel_batch_data(grpc_call_element* elem,
void subchannel_batch_data::destroy() {
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(subchannel_call));
subchannel_call->GetParentData());
if (batch.send_initial_metadata) {
grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
}
@ -1213,7 +1200,7 @@ void subchannel_batch_data::destroy() {
if (batch.recv_trailing_metadata) {
grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
}
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "batch_data_unref");
subchannel_call.reset();
call_data* calld = static_cast<call_data*>(elem->call_data);
GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
}
@ -1260,8 +1247,7 @@ static void invoke_recv_initial_metadata_callback(void* arg,
// Return metadata.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_initial_metadata,
pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
@ -1293,8 +1279,7 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_initial_metadata = true;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_initial_metadata op, so do nothing.
@ -1355,8 +1340,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
// Return payload.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
*pending->batch->payload->recv_message.recv_message =
std::move(retry_state->recv_message);
// Update bookkeeping.
@ -1384,8 +1368,7 @@ static void recv_message_ready(void* arg, grpc_error* error) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
++retry_state->completed_recv_message_count;
// If a retry was already dispatched, then we're not going to use the
// result of this recv_message op, so do nothing.
@ -1473,8 +1456,7 @@ static void add_closure_for_recv_trailing_metadata_ready(
// Return metadata.
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
grpc_metadata_batch_move(
&retry_state->recv_trailing_metadata,
pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
@ -1576,8 +1558,7 @@ static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
call_data* calld = static_cast<call_data*>(elem->call_data);
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
// Construct list of closures to execute.
grpc_core::CallCombinerClosureList closures;
// First, add closure for recv_trailing_metadata_ready.
@ -1611,8 +1592,7 @@ static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
retry_state->completed_recv_trailing_metadata = true;
// Get the call's status and check for server pushback metadata.
grpc_status_code status = GRPC_STATUS_OK;
@ -1735,8 +1715,7 @@ static void on_complete(void* arg, grpc_error* error) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
batch_data->subchannel_call));
batch_data->subchannel_call->GetParentData());
// Update bookkeeping in retry_state.
if (batch_data->batch.send_initial_metadata) {
retry_state->completed_send_initial_metadata = true;
@ -1792,10 +1771,10 @@ static void on_complete(void* arg, grpc_error* error) {
static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_subchannel_call* subchannel_call =
static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
grpc_core::SubchannelCall* subchannel_call =
static_cast<grpc_core::SubchannelCall*>(batch->handler_private.extra_arg);
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(subchannel_call, batch);
subchannel_call->StartTransportStreamOpBatch(batch);
}
// Adds a closure to closures that will execute batch in the call combiner.
@ -1804,7 +1783,7 @@ static void add_closure_for_subchannel_batch(
grpc_core::CallCombinerClosureList* closures) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
batch->handler_private.extra_arg = calld->subchannel_call;
batch->handler_private.extra_arg = calld->subchannel_call.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure,
start_batch_in_call_combiner, batch,
grpc_schedule_on_exec_ctx);
@ -1978,8 +1957,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
calld->subchannel_call->GetParentData());
// Create batch_data with 2 refs, since this batch will be unreffed twice:
// once for the recv_trailing_metadata_ready callback when the subchannel
// batch returns, and again when we actually get a recv_trailing_metadata
@ -1989,7 +1967,7 @@ static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
retry_state->recv_trailing_metadata_internal_batch = batch_data;
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
calld->subchannel_call->StartTransportStreamOpBatch(&batch_data->batch);
}
// If there are any cached send ops that need to be replayed on the
@ -2196,8 +2174,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
}
subchannel_call_retry_state* retry_state =
static_cast<subchannel_call_retry_state*>(
grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call));
calld->subchannel_call->GetParentData());
// Construct list of closures to execute, one for each pending batch.
grpc_core::CallCombinerClosureList closures;
// Replay previously-returned send_* ops if needed.
@ -2220,7 +2197,7 @@ static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting %" PRIuPTR
" retriable batches on subchannel_call=%p",
chand, calld, closures.size(), calld->subchannel_call);
chand, calld, closures.size(), calld->subchannel_call.get());
}
// Note: This will yield the call combiner.
closures.RunClosures(calld->call_combiner);
@ -2245,21 +2222,21 @@ static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
calld->call_combiner, // call_combiner
parent_data_size // parent_data_size
};
grpc_error* new_error =
calld->request->pick()->connected_subchannel->CreateCall(
call_args, &calld->subchannel_call);
grpc_error* new_error = GRPC_ERROR_NONE;
calld->subchannel_call =
calld->request->pick()->connected_subchannel->CreateCall(call_args,
&new_error);
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
chand, calld, calld->subchannel_call, grpc_error_string(new_error));
chand, calld, calld->subchannel_call.get(),
grpc_error_string(new_error));
}
if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
new_error = grpc_error_add_child(new_error, error);
pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
} else {
if (parent_data_size > 0) {
new (grpc_connected_subchannel_call_get_parent_data(
calld->subchannel_call))
subchannel_call_retry_state(
new (calld->subchannel_call->GetParentData()) subchannel_call_retry_state(
calld->request->pick()->subchannel_call_context);
}
pending_batches_resume(elem);
@ -2488,7 +2465,7 @@ static void cc_start_transport_stream_op_batch(
batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
} else {
// Note: This will release the call combiner.
grpc_subchannel_call_process_op(calld->subchannel_call, batch);
calld->subchannel_call->StartTransportStreamOpBatch(batch);
}
return;
}
@ -2502,7 +2479,7 @@ static void cc_start_transport_stream_op_batch(
if (grpc_client_channel_trace.enabled()) {
gpr_log(GPR_INFO,
"chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
calld, calld->subchannel_call);
calld, calld->subchannel_call.get());
}
pending_batches_resume(elem);
return;
@ -2545,8 +2522,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem,
grpc_closure* then_schedule_closure) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
then_schedule_closure);
calld->subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
then_schedule_closure = nullptr;
}
calld->~call_data();
@ -2752,8 +2728,8 @@ void grpc_client_channel_watch_connectivity_state(
GRPC_ERROR_NONE);
}
grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
grpc_call_element* elem) {
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
return calld->subchannel_call;
}

@ -60,7 +60,7 @@ void grpc_client_channel_watch_connectivity_state(
grpc_closure* watcher_timer_init);
/* Debug helper: pull the subchannel call from a call stack element */
grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
grpc_call_element* elem);
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H */

@ -113,12 +113,11 @@ RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode(
is_top_level_channel);
}
SubchannelNode::SubchannelNode(grpc_subchannel* subchannel,
SubchannelNode::SubchannelNode(Subchannel* subchannel,
size_t channel_tracer_max_nodes)
: BaseNode(EntityType::kSubchannel),
subchannel_(subchannel),
target_(
UniquePtr<char>(gpr_strdup(grpc_subchannel_get_target(subchannel_)))),
target_(UniquePtr<char>(gpr_strdup(subchannel_->GetTargetAddress()))),
trace_(channel_tracer_max_nodes) {}
SubchannelNode::~SubchannelNode() {}
@ -128,8 +127,8 @@ void SubchannelNode::PopulateConnectivityState(grpc_json* json) {
if (subchannel_ == nullptr) {
state = GRPC_CHANNEL_SHUTDOWN;
} else {
state = grpc_subchannel_check_connectivity(
subchannel_, nullptr, true /* inhibit_health_checking */);
state = subchannel_->CheckConnectivity(nullptr,
true /* inhibit_health_checking */);
}
json = grpc_json_create_child(nullptr, json, "state", nullptr,
GRPC_JSON_OBJECT, false);
@ -170,7 +169,7 @@ grpc_json* SubchannelNode::RenderJson() {
call_counter_.PopulateCallCounts(json);
json = top_level_json;
// populate the child socket.
intptr_t socket_uuid = grpc_subchannel_get_child_socket_uuid(subchannel_);
intptr_t socket_uuid = subchannel_->GetChildSocketUuid();
if (socket_uuid != 0) {
grpc_json* array_parent = grpc_json_create_child(
nullptr, json, "socketRef", nullptr, GRPC_JSON_ARRAY, false);

@ -26,9 +26,10 @@
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
typedef struct grpc_subchannel grpc_subchannel;
namespace grpc_core {
class Subchannel;
namespace channelz {
// Subtype of ChannelNode that overrides and provides client_channel specific
@ -59,7 +60,7 @@ class ClientChannelNode : public ChannelNode {
// Handles channelz bookkeeping for sockets
class SubchannelNode : public BaseNode {
public:
SubchannelNode(grpc_subchannel* subchannel, size_t channel_tracer_max_nodes);
SubchannelNode(Subchannel* subchannel, size_t channel_tracer_max_nodes);
~SubchannelNode() override;
void MarkSubchannelDestroyed() {
@ -84,7 +85,7 @@ class SubchannelNode : public BaseNode {
void RecordCallSucceeded() { call_counter_.RecordCallSucceeded(); }
private:
grpc_subchannel* subchannel_;
Subchannel* subchannel_;
UniquePtr<char> target_;
CallCountingHelper call_counter_;
ChannelTrace trace_;

@ -29,7 +29,7 @@ void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory) {
factory->vtable->unref(factory);
}
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel(
grpc_client_channel_factory* factory, const grpc_channel_args* args) {
return factory->vtable->create_subchannel(factory, args);
}

@ -48,8 +48,8 @@ struct grpc_client_channel_factory {
struct grpc_client_channel_factory_vtable {
void (*ref)(grpc_client_channel_factory* factory);
void (*unref)(grpc_client_channel_factory* factory);
grpc_subchannel* (*create_subchannel)(grpc_client_channel_factory* factory,
const grpc_channel_args* args);
grpc_core::Subchannel* (*create_subchannel)(
grpc_client_channel_factory* factory, const grpc_channel_args* args);
grpc_channel* (*create_client_channel)(grpc_client_channel_factory* factory,
const char* target,
grpc_client_channel_type type,
@ -60,7 +60,7 @@ void grpc_client_channel_factory_ref(grpc_client_channel_factory* factory);
void grpc_client_channel_factory_unref(grpc_client_channel_factory* factory);
/** Create a new grpc_subchannel */
grpc_subchannel* grpc_client_channel_factory_create_subchannel(
grpc_core::Subchannel* grpc_client_channel_factory_create_subchannel(
grpc_client_channel_factory* factory, const grpc_channel_args* args);
/** Create a new grpc_channel */

@ -54,9 +54,9 @@ RefCountedPtr<GlobalSubchannelPool> GlobalSubchannelPool::instance() {
return *instance_;
}
grpc_subchannel* GlobalSubchannelPool::RegisterSubchannel(
SubchannelKey* key, grpc_subchannel* constructed) {
grpc_subchannel* c = nullptr;
Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key,
Subchannel* constructed) {
Subchannel* c = nullptr;
// Compare and swap (CAS) loop:
while (c == nullptr) {
// Ref the shared map to have a local copy.
@ -64,7 +64,7 @@ grpc_subchannel* GlobalSubchannelPool::RegisterSubchannel(
grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr);
gpr_mu_unlock(&mu_);
// Check to see if a subchannel already exists.
c = static_cast<grpc_subchannel*>(grpc_avl_get(old_map, key, nullptr));
c = static_cast<Subchannel*>(grpc_avl_get(old_map, key, nullptr));
if (c != nullptr) {
// The subchannel already exists. Reuse it.
c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "subchannel_register+reuse");
@ -121,15 +121,14 @@ void GlobalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) {
}
}
grpc_subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) {
Subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) {
// Lock, and take a reference to the subchannel map.
// We don't need to do the search under a lock as AVL's are immutable.
gpr_mu_lock(&mu_);
grpc_avl index = grpc_avl_ref(subchannel_map_, nullptr);
gpr_mu_unlock(&mu_);
grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
static_cast<grpc_subchannel*>(grpc_avl_get(index, key, nullptr)),
"found_from_pool");
Subchannel* c = static_cast<Subchannel*>(grpc_avl_get(index, key, nullptr));
if (c != nullptr) GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "found_from_pool");
grpc_avl_unref(index, nullptr);
return c;
}
@ -156,11 +155,11 @@ long sck_avl_compare(void* a, void* b, void* unused) {
}
void scv_avl_destroy(void* p, void* user_data) {
GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "global_subchannel_pool");
GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool");
}
void* scv_avl_copy(void* p, void* unused) {
GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "global_subchannel_pool");
GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool");
return p;
}

@ -45,10 +45,10 @@ class GlobalSubchannelPool final : public SubchannelPoolInterface {
static RefCountedPtr<GlobalSubchannelPool> instance();
// Implements interface methods.
grpc_subchannel* RegisterSubchannel(SubchannelKey* key,
grpc_subchannel* constructed) override;
Subchannel* RegisterSubchannel(SubchannelKey* key,
Subchannel* constructed) override;
void UnregisterSubchannel(SubchannelKey* key) override;
grpc_subchannel* FindSubchannel(SubchannelKey* key) override;
Subchannel* FindSubchannel(SubchannelKey* key) override;
private:
// The singleton instance. (It's a pointer to RefCountedPtr so that this

@ -295,7 +295,9 @@ HealthCheckClient::CallState::~CallState() {
gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
health_check_client_.get(), this);
}
if (call_ != nullptr) GRPC_SUBCHANNEL_CALL_UNREF(call_, "call_ended");
// The subchannel call is in the arena, so reset the pointer before we destroy
// the arena.
call_.reset();
for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
if (context_[i].destroy != nullptr) {
context_[i].destroy(context_[i].value);
@ -329,8 +331,8 @@ void HealthCheckClient::CallState::StartCall() {
&call_combiner_,
0, // parent_data_size
};
grpc_error* error =
health_check_client_->connected_subchannel_->CreateCall(args, &call_);
grpc_error* error = GRPC_ERROR_NONE;
call_ = health_check_client_->connected_subchannel_->CreateCall(args, &error);
if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR,
"HealthCheckClient %p CallState %p: error creating health "
@ -423,14 +425,14 @@ void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
grpc_error* error) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
grpc_subchannel_call* call =
static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
grpc_subchannel_call_process_op(call, batch);
SubchannelCall* call =
static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
call->StartTransportStreamOpBatch(batch);
}
void HealthCheckClient::CallState::StartBatch(
grpc_transport_stream_op_batch* batch) {
batch->handler_private.extra_arg = call_;
batch->handler_private.extra_arg = call_.get();
GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
batch, grpc_schedule_on_exec_ctx);
GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
@ -452,7 +454,7 @@ void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
grpc_subchannel_call_process_op(self->call_, batch);
self->call_->StartTransportStreamOpBatch(batch);
}
void HealthCheckClient::CallState::Cancel() {

@ -99,7 +99,7 @@ class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
// The streaming call to the backend. Always non-NULL.
grpc_subchannel_call* call_;
RefCountedPtr<SubchannelCall> call_;
grpc_transport_stream_op_batch_payload payload_;
grpc_transport_stream_op_batch batch_;

@ -79,7 +79,7 @@ class PickFirst : public LoadBalancingPolicy {
PickFirstSubchannelData(
SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}

@ -94,7 +94,7 @@ class RoundRobin : public LoadBalancingPolicy {
RoundRobinSubchannelData(
SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: SubchannelData(subchannel_list, address, subchannel, combiner) {}

@ -88,7 +88,7 @@ class SubchannelData {
}
// Returns a pointer to the subchannel.
grpc_subchannel* subchannel() const { return subchannel_; }
Subchannel* subchannel() const { return subchannel_; }
// Returns the connected subchannel. Will be null if the subchannel
// is not connected.
@ -103,8 +103,8 @@ class SubchannelData {
// ProcessConnectivityChangeLocked()).
grpc_connectivity_state CheckConnectivityStateLocked(grpc_error** error) {
GPR_ASSERT(!connectivity_notification_pending_);
pending_connectivity_state_unsafe_ = grpc_subchannel_check_connectivity(
subchannel(), error, subchannel_list_->inhibit_health_checking());
pending_connectivity_state_unsafe_ = subchannel()->CheckConnectivity(
error, subchannel_list_->inhibit_health_checking());
UpdateConnectedSubchannelLocked();
return pending_connectivity_state_unsafe_;
}
@ -142,7 +142,7 @@ class SubchannelData {
protected:
SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner);
virtual ~SubchannelData();
@ -170,7 +170,7 @@ class SubchannelData {
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
// The subchannel and connected subchannel.
grpc_subchannel* subchannel_;
Subchannel* subchannel_;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// Notification that connectivity has changed on subchannel.
@ -203,7 +203,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
for (size_t i = 0; i < subchannels_.size(); ++i) {
if (subchannels_[i].subchannel() != nullptr) {
grpc_core::channelz::SubchannelNode* subchannel_node =
grpc_subchannel_get_channelz_node(subchannels_[i].subchannel());
subchannels_[i].subchannel()->channelz_node();
if (subchannel_node != nullptr) {
refs_list->push_back(subchannel_node->uuid());
}
@ -276,7 +276,7 @@ class SubchannelList : public InternallyRefCounted<SubchannelListType> {
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
const ServerAddress& address, grpc_subchannel* subchannel,
const ServerAddress& address, Subchannel* subchannel,
grpc_combiner* combiner)
: subchannel_list_(subchannel_list),
subchannel_(subchannel),
@ -317,7 +317,7 @@ template <typename SubchannelListType, typename SubchannelDataType>
void SubchannelData<SubchannelListType,
SubchannelDataType>::ResetBackoffLocked() {
if (subchannel_ != nullptr) {
grpc_subchannel_reset_backoff(subchannel_);
subchannel_->ResetBackoff();
}
}
@ -337,8 +337,8 @@ void SubchannelData<SubchannelListType,
GPR_ASSERT(!connectivity_notification_pending_);
connectivity_notification_pending_ = true;
subchannel_list()->Ref(DEBUG_LOCATION, "connectivity_watch").release();
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
subchannel_->NotifyOnStateChange(
subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
@ -357,8 +357,8 @@ void SubchannelData<SubchannelListType,
grpc_connectivity_state_name(pending_connectivity_state_unsafe_));
}
GPR_ASSERT(connectivity_notification_pending_);
grpc_subchannel_notify_on_state_change(
subchannel_, subchannel_list_->policy()->interested_parties(),
subchannel_->NotifyOnStateChange(
subchannel_list_->policy()->interested_parties(),
&pending_connectivity_state_unsafe_, &connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
@ -391,8 +391,8 @@ void SubchannelData<SubchannelListType, SubchannelDataType>::
subchannel_, reason);
}
GPR_ASSERT(connectivity_notification_pending_);
grpc_subchannel_notify_on_state_change(
subchannel_, nullptr, nullptr, &connectivity_changed_closure_,
subchannel_->NotifyOnStateChange(nullptr, nullptr,
&connectivity_changed_closure_,
subchannel_list_->inhibit_health_checking());
}
@ -401,8 +401,7 @@ bool SubchannelData<SubchannelListType,
SubchannelDataType>::UpdateConnectedSubchannelLocked() {
// If the subchannel is READY, take a ref to the connected subchannel.
if (pending_connectivity_state_unsafe_ == GRPC_CHANNEL_READY) {
connected_subchannel_ =
grpc_subchannel_get_connected_subchannel(subchannel_);
connected_subchannel_ = subchannel_->connected_subchannel();
// If the subchannel became disconnected between the time that READY
// was reported and the time we got here (e.g., between when a
// notification callback is scheduled and when it was actually run in
@ -518,7 +517,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
SubchannelPoolInterface::CreateChannelArg(policy_->subchannel_pool()));
const size_t subchannel_address_arg_index = args_to_add.size();
args_to_add.emplace_back(
grpc_create_subchannel_address_arg(&addresses[i].address()));
Subchannel::CreateSubchannelAddressArg(&addresses[i].address()));
if (addresses[i].args() != nullptr) {
for (size_t j = 0; j < addresses[i].args()->num_args; ++j) {
args_to_add.emplace_back(addresses[i].args()->args[j]);
@ -528,7 +527,7 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
&args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove),
args_to_add.data(), args_to_add.size());
gpr_free(args_to_add[subchannel_address_arg_index].value.string);
grpc_subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
Subchannel* subchannel = grpc_client_channel_factory_create_subchannel(
client_channel_factory, new_args);
grpc_channel_args_destroy(new_args);
if (subchannel == nullptr) {

@ -32,11 +32,11 @@ LocalSubchannelPool::~LocalSubchannelPool() {
grpc_avl_unref(subchannel_map_, nullptr);
}
grpc_subchannel* LocalSubchannelPool::RegisterSubchannel(
SubchannelKey* key, grpc_subchannel* constructed) {
Subchannel* LocalSubchannelPool::RegisterSubchannel(SubchannelKey* key,
Subchannel* constructed) {
// Check to see if a subchannel already exists.
grpc_subchannel* c = static_cast<grpc_subchannel*>(
grpc_avl_get(subchannel_map_, key, nullptr));
Subchannel* c =
static_cast<Subchannel*>(grpc_avl_get(subchannel_map_, key, nullptr));
if (c != nullptr) {
// The subchannel already exists. Reuse it.
c = GRPC_SUBCHANNEL_REF(c, "subchannel_register+reuse");
@ -54,9 +54,9 @@ void LocalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) {
subchannel_map_ = grpc_avl_remove(subchannel_map_, key, nullptr);
}
grpc_subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) {
grpc_subchannel* c = static_cast<grpc_subchannel*>(
grpc_avl_get(subchannel_map_, key, nullptr));
Subchannel* LocalSubchannelPool::FindSubchannel(SubchannelKey* key) {
Subchannel* c =
static_cast<Subchannel*>(grpc_avl_get(subchannel_map_, key, nullptr));
return c == nullptr ? c : GRPC_SUBCHANNEL_REF(c, "found_from_pool");
}

@ -39,10 +39,10 @@ class LocalSubchannelPool final : public SubchannelPoolInterface {
// Implements interface methods.
// Thread-unsafe. Intended to be invoked within the client_channel combiner.
grpc_subchannel* RegisterSubchannel(SubchannelKey* key,
grpc_subchannel* constructed) override;
Subchannel* RegisterSubchannel(SubchannelKey* key,
Subchannel* constructed) override;
void UnregisterSubchannel(SubchannelKey* key) override;
grpc_subchannel* FindSubchannel(SubchannelKey* key) override;
Subchannel* FindSubchannel(SubchannelKey* key) override;
private:
// The vtable for subchannel operations in an AVL tree.

File diff suppressed because it is too large Load Diff

@ -24,53 +24,49 @@
#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
// Channel arg containing a grpc_resolved_address to connect to.
#define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
/** A (sub-)channel that knows how to connect to exactly one target
address. Provides a target for load balancing. */
typedef struct grpc_subchannel grpc_subchannel;
typedef struct grpc_subchannel_call grpc_subchannel_call;
// For debugging refcounting.
#ifndef NDEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
grpc_subchannel_ref_from_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) \
grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) \
grpc_subchannel_weak_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) \
grpc_subchannel_weak_unref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r))
(p)->RefFromWeakRef(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
, const char *file, int line, const char *reason
const char *file, int line, const char *reason
#define GRPC_SUBCHANNEL_REF_REASON reason
#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \
, GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) \
grpc_subchannel_ref_from_weak_ref((p))
#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) grpc_subchannel_weak_ref((p))
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) grpc_subchannel_weak_unref((p))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
#define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref()
#define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef()
#define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref()
#define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef()
#define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref()
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#define GRPC_SUBCHANNEL_REF_REASON ""
#define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS
#define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x)
#endif
namespace grpc_core {
class SubchannelCall;
class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
public:
struct CallArgs {
@ -86,8 +82,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
ConnectedSubchannel(
grpc_channel_stack* channel_stack, const grpc_channel_args* args,
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
channelz_subchannel,
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel,
intptr_t socket_uuid);
~ConnectedSubchannel();
@ -95,7 +90,8 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_connectivity_state* state,
grpc_closure* closure);
void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
grpc_error* CreateCall(const CallArgs& args, grpc_subchannel_call** call);
RefCountedPtr<SubchannelCall> CreateCall(const CallArgs& args,
grpc_error** error);
grpc_channel_stack* channel_stack() const { return channel_stack_; }
const grpc_channel_args* args() const { return args_; }
@ -111,91 +107,204 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
grpc_channel_args* args_;
// ref counted pointer to the channelz node in this connected subchannel's
// owning subchannel.
grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
channelz_subchannel_;
RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
// uuid of this subchannel's socket. 0 if this subchannel is not connected.
const intptr_t socket_uuid_;
};
} // namespace grpc_core
// Implements the interface of RefCounted<>.
class SubchannelCall {
public:
SubchannelCall(RefCountedPtr<ConnectedSubchannel> connected_subchannel,
const ConnectedSubchannel::CallArgs& args)
: connected_subchannel_(std::move(connected_subchannel)),
deadline_(args.deadline) {}
// Continues processing a transport stream op batch.
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Returns a pointer to the parent data associated with the subchannel call.
// The data will be of the size specified in \a parent_data_size field of
// the args passed to \a ConnectedSubchannel::CreateCall().
void* GetParentData();
// Returns the call stack of the subchannel call.
grpc_call_stack* GetCallStack();
grpc_closure* after_call_stack_destroy() const {
return after_call_stack_destroy_;
}
// Sets the 'then_schedule_closure' argument for call stack destruction.
// Must be called once per call.
void SetAfterCallStackDestroy(grpc_closure* closure);
// Interface of RefCounted<>.
RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
const char* reason) GRPC_MUST_USE_RESULT;
// When refcount drops to 0, destroys itself and the associated call stack,
// but does NOT free the memory because it's in the call arena.
void Unref();
void Unref(const DebugLocation& location, const char* reason);
grpc_subchannel* grpc_subchannel_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel* grpc_subchannel_weak_ref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_weak_unref(
grpc_subchannel* channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_subchannel_call* grpc_subchannel_call_ref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_unref(
grpc_subchannel_call* call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
grpc_subchannel* subchannel);
intptr_t grpc_subchannel_get_child_socket_uuid(grpc_subchannel* subchannel);
/** Returns a pointer to the parent data associated with \a subchannel_call.
The data will be of the size specified in \a parent_data_size
field of the args passed to \a grpc_connected_subchannel_create_call(). */
void* grpc_connected_subchannel_call_get_parent_data(
grpc_subchannel_call* subchannel_call);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
grpc_subchannel* channel, grpc_error** error, bool inhibit_health_checking);
/** Calls notify when the connectivity state of a channel becomes different
from *state. Updates *state with the new state of the channel. */
void grpc_subchannel_notify_on_state_change(
grpc_subchannel* channel, grpc_pollset_set* interested_parties,
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
template <typename T>
friend class RefCountedPtr;
// If channelz is enabled, intercepts recv_trailing so that we may check the
// status and associate it to a subchannel.
void MaybeInterceptRecvTrailingMetadata(
grpc_transport_stream_op_batch* batch);
static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
// Interface of RefCounted<>.
void IncrementRefCount();
void IncrementRefCount(const DebugLocation& location, const char* reason);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
// State needed to support channelz interception of recv trailing metadata.
grpc_closure recv_trailing_metadata_ready_;
grpc_closure* original_recv_trailing_metadata_ = nullptr;
grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
grpc_millis deadline_;
};
// A subchannel that knows how to connect to exactly one target address. It
// provides a target for load balancing.
class Subchannel {
public:
// The ctor and dtor are not intended to use directly.
Subchannel(SubchannelKey* key, grpc_connector* connector,
const grpc_channel_args* args);
~Subchannel();
// Creates a subchannel given \a connector and \a args.
static Subchannel* Create(grpc_connector* connector,
const grpc_channel_args* args);
// Strong and weak refcounting.
Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
Subchannel* RefFromWeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
intptr_t GetChildSocketUuid();
// Gets the string representing the subchannel address.
// Caller doesn't take ownership.
const char* GetTargetAddress();
// Gets the connected subchannel - or nullptr if not connected (which may
// happen before it initially connects or during transient failures).
RefCountedPtr<ConnectedSubchannel> connected_subchannel();
channelz::SubchannelNode* channelz_node();
// Polls the current connectivity state of the subchannel.
grpc_connectivity_state CheckConnectivity(grpc_error** error,
bool inhibit_health_checking);
// When the connectivity state of the subchannel changes from \a *state,
// invokes \a notify and updates \a *state with the new state.
void NotifyOnStateChange(grpc_pollset_set* interested_parties,
grpc_connectivity_state* state, grpc_closure* notify,
bool inhibit_health_checks);
/** retrieve the grpc_core::ConnectedSubchannel - or nullptr if not connected
* (which may happen before it initially connects or during transient failures)
* */
grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
grpc_subchannel_get_connected_subchannel(grpc_subchannel* c);
// Resets the connection backoff of the subchannel.
// TODO(roth): Move connection backoff out of subchannels and up into LB
// policy code (probably by adding a SubchannelGroup between
// SubchannelList and SubchannelData), at which point this method can
// go away.
void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call* subchannel_call,
grpc_transport_stream_op_batch* op);
/** Must be called once per call. Sets the 'then_schedule_closure' argument for
call stack destruction. */
void grpc_subchannel_call_set_cleanup_closure(
grpc_subchannel_call* subchannel_call, grpc_closure* closure);
grpc_call_stack* grpc_subchannel_call_get_call_stack(
grpc_subchannel_call* subchannel_call);
/** create a subchannel given a connector */
grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
// Resets the connection backoff of the subchannel.
// TODO(roth): Move connection backoff out of subchannels and up into LB
// policy code (probably by adding a SubchannelGroup between
// SubchannelList and SubchannelData), at which point this method can
// go away.
void ResetBackoff();
// Returns a new channel arg encoding the subchannel address as a URI
// string. Caller is responsible for freeing the string.
static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
// Returns the URI string from the subchannel address arg in \a args.
static const char* GetUriFromSubchannelAddressArg(
const grpc_channel_args* args);
/// Sets \a addr from \a args.
void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
// Sets \a addr from the subchannel address arg in \a args.
static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
grpc_resolved_address* addr);
const char* grpc_subchannel_get_target(grpc_subchannel* subchannel);
private:
struct ExternalStateWatcher;
class ConnectedSubchannelStateWatcher;
// Sets the subchannel's connectivity state to \a state.
void SetConnectivityStateLocked(grpc_connectivity_state state,
grpc_error* error, const char* reason);
// Methods for connection.
void MaybeStartConnectingLocked();
static void OnRetryAlarm(void* arg, grpc_error* error);
void ContinueConnectingLocked();
static void OnConnectingFinished(void* arg, grpc_error* error);
bool PublishTransportLocked();
void Disconnect();
gpr_atm RefMutate(gpr_atm delta,
int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS);
// The subchannel pool this subchannel is in.
RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
// TODO(juanlishen): Consider using args_ as key_ directly.
// Subchannel key that identifies this subchannel in the subchannel pool.
SubchannelKey* key_;
// Channel args.
grpc_channel_args* args_;
// pollset_set tracking who's interested in a connection being setup.
grpc_pollset_set* pollset_set_;
// Protects the other members.
gpr_mu mu_;
// Refcount
// - lower INTERNAL_REF_BITS bits are for internal references:
// these do not keep the subchannel open.
// - upper remaining bits are for public references: these do
// keep the subchannel open
gpr_atm ref_pair_;
// Connection states.
grpc_connector* connector_ = nullptr;
// Set during connection.
grpc_connect_out_args connecting_result_;
grpc_closure on_connecting_finished_;
// Active connection, or null.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
OrphanablePtr<ConnectedSubchannelStateWatcher> connected_subchannel_watcher_;
bool connecting_ = false;
bool disconnected_ = false;
/// Returns the URI string for the address to connect to.
const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args);
// Connectivity state tracking.
grpc_connectivity_state_tracker state_tracker_;
grpc_connectivity_state_tracker state_and_health_tracker_;
UniquePtr<char> health_check_service_name_;
ExternalStateWatcher* external_state_watcher_list_ = nullptr;
/// Returns a new channel arg encoding the subchannel address as a string.
/// Caller is responsible for freeing the string.
grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr);
// Backoff state.
BackOff backoff_;
grpc_millis next_attempt_deadline_;
grpc_millis min_connect_timeout_ms_;
bool backoff_begun_ = false;
// Retry alarm.
grpc_timer retry_alarm_;
grpc_closure on_retry_alarm_;
bool have_retry_alarm_ = false;
// reset_backoff() was called while alarm was pending.
bool retry_immediately_ = false;
// Channelz tracking.
RefCountedPtr<channelz::SubchannelNode> channelz_node_;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */

@ -26,10 +26,10 @@
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/ref_counted.h"
struct grpc_subchannel;
namespace grpc_core {
class Subchannel;
extern TraceFlag grpc_subchannel_pool_trace;
// A key that can uniquely identify a subchannel.
@ -69,15 +69,15 @@ class SubchannelPoolInterface : public RefCounted<SubchannelPoolInterface> {
// Registers a subchannel against a key. Returns the subchannel registered
// with \a key, which may be different from \a constructed because we reuse
// (instead of update) any existing subchannel already registered with \a key.
virtual grpc_subchannel* RegisterSubchannel(
SubchannelKey* key, grpc_subchannel* constructed) GRPC_ABSTRACT;
virtual Subchannel* RegisterSubchannel(SubchannelKey* key,
Subchannel* constructed) GRPC_ABSTRACT;
// Removes the registered subchannel found by \a key.
virtual void UnregisterSubchannel(SubchannelKey* key) GRPC_ABSTRACT;
// Finds the subchannel registered for the given subchannel key. Returns NULL
// if no such channel exists. Thread-safe.
virtual grpc_subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT;
virtual Subchannel* FindSubchannel(SubchannelKey* key) GRPC_ABSTRACT;
// Creates a channel arg from \a subchannel pool.
static grpc_arg CreateChannelArg(SubchannelPoolInterface* subchannel_pool);

@ -202,7 +202,8 @@ static void chttp2_connector_connect(grpc_connector* con,
grpc_closure* notify) {
chttp2_connector* c = reinterpret_cast<chttp2_connector*>(con);
grpc_resolved_address addr;
grpc_get_subchannel_address_arg(args->channel_args, &addr);
grpc_core::Subchannel::GetAddressFromSubchannelAddressArg(args->channel_args,
&addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == nullptr);
c->notify = notify;

@ -39,11 +39,11 @@ static void client_channel_factory_ref(
static void client_channel_factory_unref(
grpc_client_channel_factory* cc_factory) {}
static grpc_subchannel* client_channel_factory_create_subchannel(
static grpc_core::Subchannel* client_channel_factory_create_subchannel(
grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) {
grpc_channel_args* new_args = grpc_default_authority_add_if_not_present(args);
grpc_connector* connector = grpc_chttp2_connector_create();
grpc_subchannel* s = grpc_subchannel_create(connector, new_args);
grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
grpc_channel_args_destroy(new_args);
return s;

@ -76,7 +76,8 @@ static grpc_channel_args* get_secure_naming_channel_args(
grpc_core::UniquePtr<char> authority;
if (target_authority_table != nullptr) {
// Find the authority for the target.
const char* target_uri_str = grpc_get_subchannel_address_uri_arg(args);
const char* target_uri_str =
grpc_core::Subchannel::GetUriFromSubchannelAddressArg(args);
grpc_uri* target_uri =
grpc_uri_parse(target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != nullptr);
@ -138,7 +139,7 @@ static grpc_channel_args* get_secure_naming_channel_args(
return new_args;
}
static grpc_subchannel* client_channel_factory_create_subchannel(
static grpc_core::Subchannel* client_channel_factory_create_subchannel(
grpc_client_channel_factory* cc_factory, const grpc_channel_args* args) {
grpc_channel_args* new_args = get_secure_naming_channel_args(args);
if (new_args == nullptr) {
@ -147,7 +148,7 @@ static grpc_subchannel* client_channel_factory_create_subchannel(
return nullptr;
}
grpc_connector* connector = grpc_chttp2_connector_create();
grpc_subchannel* s = grpc_subchannel_create(connector, new_args);
grpc_core::Subchannel* s = grpc_core::Subchannel::Create(connector, new_args);
grpc_connector_unref(connector);
grpc_channel_args_destroy(new_args);
return s;

@ -36,13 +36,14 @@ grpc_stream* grpc_transport_stream_from_call(grpc_call* call) {
for (;;) {
grpc_call_element* el = grpc_call_stack_element(cs, cs->count - 1);
if (el->filter == &grpc_client_channel_filter) {
grpc_subchannel_call* scc = grpc_client_channel_get_subchannel_call(el);
grpc_core::RefCountedPtr<grpc_core::SubchannelCall> scc =
grpc_client_channel_get_subchannel_call(el);
if (scc == nullptr) {
fprintf(stderr, "No subchannel-call");
fflush(stderr);
return nullptr;
}
cs = grpc_subchannel_call_get_call_stack(scc);
cs = scc->GetCallStack();
} else if (el->filter == &grpc_connected_filter) {
return grpc_connected_channel_get_stream(el);
} else {

@ -325,8 +325,8 @@ class FakeClientChannelFactory : public grpc_client_channel_factory {
private:
static void NoRef(grpc_client_channel_factory* factory) {}
static void NoUnref(grpc_client_channel_factory* factory) {}
static grpc_subchannel* CreateSubchannel(grpc_client_channel_factory* factory,
const grpc_channel_args* args) {
static grpc_core::Subchannel* CreateSubchannel(
grpc_client_channel_factory* factory, const grpc_channel_args* args) {
return nullptr;
}
static grpc_channel* CreateClientChannel(grpc_client_channel_factory* factory,

Loading…
Cancel
Save