Split out LB code into its own call object

pull/24895/head
Mark D. Roth 4 years ago
parent fdb651e927
commit d797cbee47
  1. 1246
      src/core/ext/filters/client_channel/client_channel.cc
  2. 3
      src/core/ext/filters/client_channel/health/health_check_client.cc
  3. 20
      src/core/ext/filters/client_channel/service_config_call_data.h
  4. 25
      src/core/ext/filters/client_channel/subchannel.cc
  5. 12
      src/core/ext/filters/client_channel/subchannel.h

File diff suppressed because it is too large Load Diff

@ -252,7 +252,7 @@ HealthCheckClient::CallState::CallState(
: health_check_client_(std::move(health_check_client)),
pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
arena_(Arena::Create(health_check_client_->connected_subchannel_
->GetInitialCallSizeEstimate(0))),
->GetInitialCallSizeEstimate())),
payload_(context_) {}
HealthCheckClient::CallState::~CallState() {
@ -291,7 +291,6 @@ void HealthCheckClient::CallState::StartCall() {
arena_,
context_,
&call_combiner_,
0, // parent_data_size
};
grpc_error* error = GRPC_ERROR_NONE;
call_ = SubchannelCall::Create(std::move(args), &error).release();

@ -19,6 +19,10 @@
#include <grpc/support/port_platform.h>
#include <map>
#include "absl/strings/string_view.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_parser.h"
#include "src/core/lib/channel/context.h"
@ -35,13 +39,22 @@ class ServiceConfigCallData {
ServiceConfigCallData(
RefCountedPtr<ServiceConfig> service_config,
const ServiceConfigParser::ParsedConfigVector* method_configs,
std::map<const char*, absl::string_view> call_attributes,
grpc_call_context_element* call_context)
: service_config_(std::move(service_config)),
method_configs_(method_configs) {
method_configs_(method_configs),
call_attributes_(std::move(call_attributes)) {
call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value = this;
call_context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].destroy = Destroy;
}
ServiceConfigCallData(
RefCountedPtr<ServiceConfig> service_config,
const ServiceConfigParser::ParsedConfigVector* method_configs,
grpc_call_context_element* call_context)
: ServiceConfigCallData(std::move(service_config), method_configs, {},
call_context) {}
ServiceConfig* service_config() { return service_config_.get(); }
ServiceConfigParser::ParsedConfig* GetMethodParsedConfig(size_t index) const {
@ -53,6 +66,10 @@ class ServiceConfigCallData {
return service_config_->GetGlobalParsedConfig(index);
}
const std::map<const char*, absl::string_view>& call_attributes() const {
return call_attributes_;
}
private:
static void Destroy(void* ptr) {
ServiceConfigCallData* self = static_cast<ServiceConfigCallData*>(ptr);
@ -61,6 +78,7 @@ class ServiceConfigCallData {
RefCountedPtr<ServiceConfig> service_config_;
const ServiceConfigParser::ParsedConfigVector* method_configs_ = nullptr;
std::map<const char*, absl::string_view> call_attributes_;
};
} // namespace grpc_core

@ -121,18 +121,9 @@ void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
elem->filter->start_transport_op(elem, op);
}
size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
size_t parent_data_size) const {
size_t allocation_size =
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall));
if (parent_data_size > 0) {
allocation_size +=
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
parent_data_size;
} else {
allocation_size += channel_stack_->call_stack_size;
}
return allocation_size;
size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
channel_stack_->call_stack_size;
}
//
@ -142,8 +133,7 @@ size_t ConnectedSubchannel::GetInitialCallSizeEstimate(
RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
grpc_error** error) {
const size_t allocation_size =
args.connected_subchannel->GetInitialCallSizeEstimate(
args.parent_data_size);
args.connected_subchannel->GetInitialCallSizeEstimate();
Arena* arena = args.arena;
return RefCountedPtr<SubchannelCall>(new (
arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
@ -187,13 +177,6 @@ void SubchannelCall::StartTransportStreamOpBatch(
top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
}
void* SubchannelCall::GetParentData() {
grpc_channel_stack* chanstk = connected_subchannel_->channel_stack();
return reinterpret_cast<char*>(this) +
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
GPR_ROUND_UP_TO_ALIGNMENT_SIZE(chanstk->call_stack_size);
}
grpc_call_stack* SubchannelCall::GetCallStack() {
return SUBCHANNEL_CALL_TO_CALL_STACK(this);
}

@ -89,7 +89,7 @@ class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
return channelz_subchannel_.get();
}
size_t GetInitialCallSizeEstimate(size_t parent_data_size) const;
size_t GetInitialCallSizeEstimate() const;
private:
grpc_channel_stack* channel_stack_;
@ -111,18 +111,12 @@ class SubchannelCall {
Arena* arena;
grpc_call_context_element* context;
CallCombiner* call_combiner;
size_t parent_data_size;
};
static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error);
// Continues processing a transport stream op batch.
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Returns a pointer to the parent data associated with the subchannel call.
// The data will be of the size specified in \a parent_data_size field of
// the args passed to \a ConnectedSubchannel::CreateCall().
void* GetParentData();
// Returns the call stack of the subchannel call.
grpc_call_stack* GetCallStack();
@ -139,8 +133,6 @@ class SubchannelCall {
void Unref();
void Unref(const DebugLocation& location, const char* reason);
static void Destroy(void* arg, grpc_error* error);
private:
// Allow RefCountedPtr<> to access IncrementRefCount().
template <typename T>
@ -159,6 +151,8 @@ class SubchannelCall {
void IncrementRefCount();
void IncrementRefCount(const DebugLocation& location, const char* reason);
static void Destroy(void* arg, grpc_error* error);
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
grpc_closure* after_call_stack_destroy_ = nullptr;
// State needed to support channelz interception of recv trailing metadata.

Loading…
Cancel
Save