mirror of https://github.com/grpc/grpc.git
commit
916f6db913
67 changed files with 2826 additions and 1689 deletions
@ -0,0 +1,833 @@ |
|||||||
|
//
|
||||||
|
// Copyright 2018 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <inttypes.h> |
||||||
|
#include <limits.h> |
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include "absl/strings/str_cat.h" |
||||||
|
#include "absl/strings/str_split.h" |
||||||
|
#include "absl/strings/string_view.h" |
||||||
|
|
||||||
|
#include <grpc/grpc.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||||
|
#include "src/core/ext/filters/client_channel/lb_policy_registry.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/gpr/string.h" |
||||||
|
#include "src/core/lib/gprpp/orphanable.h" |
||||||
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||||
|
#include "src/core/lib/iomgr/combiner.h" |
||||||
|
#include "src/core/lib/iomgr/timer.h" |
||||||
|
|
||||||
|
#define GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000) |
||||||
|
|
||||||
|
namespace grpc_core { |
||||||
|
|
||||||
|
TraceFlag grpc_xds_routing_lb_trace(false, "xds_routing_lb"); |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
constexpr char kXdsRouting[] = "xds_routing_experimental"; |
||||||
|
|
||||||
|
// Config for xds_routing LB policy.
|
||||||
|
class XdsRoutingLbConfig : public LoadBalancingPolicy::Config { |
||||||
|
public: |
||||||
|
struct Matcher { |
||||||
|
std::string service; |
||||||
|
std::string method; |
||||||
|
}; |
||||||
|
struct Route { |
||||||
|
Matcher matcher; |
||||||
|
std::string action; |
||||||
|
}; |
||||||
|
using RouteTable = std::vector<Route>; |
||||||
|
using ActionMap = |
||||||
|
std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>; |
||||||
|
|
||||||
|
XdsRoutingLbConfig(ActionMap action_map, RouteTable route_table) |
||||||
|
: action_map_(std::move(action_map)), |
||||||
|
route_table_(std::move(route_table)) {} |
||||||
|
|
||||||
|
const char* name() const override { return kXdsRouting; } |
||||||
|
|
||||||
|
const ActionMap& action_map() const { return action_map_; } |
||||||
|
|
||||||
|
const RouteTable& route_table() const { return route_table_; } |
||||||
|
|
||||||
|
private: |
||||||
|
ActionMap action_map_; |
||||||
|
RouteTable route_table_; |
||||||
|
}; |
||||||
|
|
||||||
|
// xds_routing LB policy.
|
||||||
|
class XdsRoutingLb : public LoadBalancingPolicy { |
||||||
|
public: |
||||||
|
explicit XdsRoutingLb(Args args); |
||||||
|
|
||||||
|
const char* name() const override { return kXdsRouting; } |
||||||
|
|
||||||
|
void UpdateLocked(UpdateArgs args) override; |
||||||
|
void ExitIdleLocked() override; |
||||||
|
void ResetBackoffLocked() override; |
||||||
|
|
||||||
|
private: |
||||||
|
// A simple wrapper for ref-counting a picker from the child policy.
|
||||||
|
class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> { |
||||||
|
public: |
||||||
|
ChildPickerWrapper(std::string name, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) |
||||||
|
: name_(std::move(name)), picker_(std::move(picker)) {} |
||||||
|
PickResult Pick(PickArgs args) { return picker_->Pick(args); } |
||||||
|
|
||||||
|
const std::string& name() const { return name_; } |
||||||
|
|
||||||
|
private: |
||||||
|
std::string name_; |
||||||
|
std::unique_ptr<SubchannelPicker> picker_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Picks a child using prefix or path matching and then delegates to that
|
||||||
|
// child's picker.
|
||||||
|
class RoutePicker : public SubchannelPicker { |
||||||
|
public: |
||||||
|
struct Route { |
||||||
|
XdsRoutingLbConfig::Matcher matcher; |
||||||
|
RefCountedPtr<ChildPickerWrapper> picker; |
||||||
|
}; |
||||||
|
|
||||||
|
// Maintains an ordered xds route table as provided by RDS response.
|
||||||
|
using RouteTable = std::vector<Route>; |
||||||
|
|
||||||
|
explicit RoutePicker(RouteTable route_table) |
||||||
|
: route_table_(std::move(route_table)) {} |
||||||
|
|
||||||
|
PickResult Pick(PickArgs args) override; |
||||||
|
|
||||||
|
private: |
||||||
|
RouteTable route_table_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Each XdsRoutingChild holds a ref to its parent XdsRoutingLb.
|
||||||
|
class XdsRoutingChild : public InternallyRefCounted<XdsRoutingChild> { |
||||||
|
public: |
||||||
|
XdsRoutingChild(RefCountedPtr<XdsRoutingLb> xds_routing_policy, |
||||||
|
const std::string& name); |
||||||
|
~XdsRoutingChild(); |
||||||
|
|
||||||
|
void Orphan() override; |
||||||
|
|
||||||
|
void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config, |
||||||
|
const ServerAddressList& addresses, |
||||||
|
const grpc_channel_args* args); |
||||||
|
void ExitIdleLocked(); |
||||||
|
void ResetBackoffLocked(); |
||||||
|
void DeactivateLocked(); |
||||||
|
|
||||||
|
grpc_connectivity_state connectivity_state() const { |
||||||
|
return connectivity_state_; |
||||||
|
} |
||||||
|
RefCountedPtr<ChildPickerWrapper> picker_wrapper() const { |
||||||
|
return picker_wrapper_; |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
class Helper : public ChannelControlHelper { |
||||||
|
public: |
||||||
|
explicit Helper(RefCountedPtr<XdsRoutingChild> xds_routing_child) |
||||||
|
: xds_routing_child_(std::move(xds_routing_child)) {} |
||||||
|
|
||||||
|
~Helper() { xds_routing_child_.reset(DEBUG_LOCATION, "Helper"); } |
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> CreateSubchannel( |
||||||
|
const grpc_channel_args& args) override; |
||||||
|
void UpdateState(grpc_connectivity_state state, |
||||||
|
std::unique_ptr<SubchannelPicker> picker) override; |
||||||
|
void RequestReresolution() override; |
||||||
|
void AddTraceEvent(TraceSeverity severity, StringView message) override; |
||||||
|
|
||||||
|
private: |
||||||
|
RefCountedPtr<XdsRoutingChild> xds_routing_child_; |
||||||
|
}; |
||||||
|
|
||||||
|
// Methods for dealing with the child policy.
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args); |
||||||
|
|
||||||
|
static void OnDelayedRemovalTimer(void* arg, grpc_error* error); |
||||||
|
static void OnDelayedRemovalTimerLocked(void* arg, grpc_error* error); |
||||||
|
|
||||||
|
// The owning LB policy.
|
||||||
|
RefCountedPtr<XdsRoutingLb> xds_routing_policy_; |
||||||
|
|
||||||
|
// Points to the corresponding key in XdsRoutingLb::actions_.
|
||||||
|
const std::string& name_; |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> child_policy_; |
||||||
|
|
||||||
|
RefCountedPtr<ChildPickerWrapper> picker_wrapper_; |
||||||
|
grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; |
||||||
|
bool seen_failure_since_ready_ = false; |
||||||
|
|
||||||
|
// States for delayed removal.
|
||||||
|
grpc_timer delayed_removal_timer_; |
||||||
|
grpc_closure on_delayed_removal_timer_; |
||||||
|
bool delayed_removal_timer_callback_pending_ = false; |
||||||
|
bool shutdown_ = false; |
||||||
|
}; |
||||||
|
|
||||||
|
~XdsRoutingLb(); |
||||||
|
|
||||||
|
void ShutdownLocked() override; |
||||||
|
|
||||||
|
void UpdateStateLocked(); |
||||||
|
|
||||||
|
// Current config from the resolver.
|
||||||
|
RefCountedPtr<XdsRoutingLbConfig> config_; |
||||||
|
|
||||||
|
// Internal state.
|
||||||
|
bool shutting_down_ = false; |
||||||
|
|
||||||
|
// Children.
|
||||||
|
std::map<std::string, OrphanablePtr<XdsRoutingChild>> actions_; |
||||||
|
}; |
||||||
|
|
||||||
|
//
|
||||||
|
// XdsRoutingLb::RoutePicker
|
||||||
|
//
|
||||||
|
|
||||||
|
XdsRoutingLb::PickResult XdsRoutingLb::RoutePicker::Pick(PickArgs args) { |
||||||
|
absl::string_view path; |
||||||
|
// TODO(roth): Using const auto& here trigger a warning in a macos or windows
|
||||||
|
// build:
|
||||||
|
//*(args.initial_metadata) is returning values not references.
|
||||||
|
for (const auto p : *(args.initial_metadata)) { |
||||||
|
if (p.first == ":path") { |
||||||
|
path = p.second; |
||||||
|
break; |
||||||
|
} |
||||||
|
} |
||||||
|
std::vector<absl::string_view> path_elements = |
||||||
|
absl::StrSplit(path.substr(1), '/'); |
||||||
|
for (const Route& route : route_table_) { |
||||||
|
if ((path_elements[0] == route.matcher.service && |
||||||
|
(path_elements[1] == route.matcher.method || |
||||||
|
route.matcher.method.empty())) || |
||||||
|
(route.matcher.service.empty() && route.matcher.method.empty())) { |
||||||
|
return route.picker.get()->Pick(args); |
||||||
|
} |
||||||
|
} |
||||||
|
PickResult result; |
||||||
|
result.type = PickResult::PICK_FAILED; |
||||||
|
result.error = |
||||||
|
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"xds routing picker: no matching route"), |
||||||
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); |
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// XdsRoutingLb
|
||||||
|
//
|
||||||
|
|
||||||
|
XdsRoutingLb::XdsRoutingLb(Args args) : LoadBalancingPolicy(std::move(args)) {} |
||||||
|
|
||||||
|
XdsRoutingLb::~XdsRoutingLb() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] destroying xds_routing LB policy", |
||||||
|
this); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::ShutdownLocked() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] shutting down", this); |
||||||
|
} |
||||||
|
shutting_down_ = true; |
||||||
|
actions_.clear(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::ExitIdleLocked() { |
||||||
|
for (auto& p : actions_) p.second->ExitIdleLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::ResetBackoffLocked() { |
||||||
|
for (auto& p : actions_) p.second->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::UpdateLocked(UpdateArgs args) { |
||||||
|
if (shutting_down_) return; |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] Received update", this); |
||||||
|
} |
||||||
|
// Update config.
|
||||||
|
config_ = std::move(args.config); |
||||||
|
// Deactivate the actions not in the new config.
|
||||||
|
for (const auto& p : actions_) { |
||||||
|
const std::string& name = p.first; |
||||||
|
XdsRoutingChild* child = p.second.get(); |
||||||
|
if (config_->action_map().find(name) == config_->action_map().end()) { |
||||||
|
child->DeactivateLocked(); |
||||||
|
} |
||||||
|
} |
||||||
|
// Add or update the actions in the new config.
|
||||||
|
for (const auto& p : config_->action_map()) { |
||||||
|
const std::string& name = p.first; |
||||||
|
const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second; |
||||||
|
auto it = actions_.find(name); |
||||||
|
if (it == actions_.end()) { |
||||||
|
it = actions_.emplace(std::make_pair(name, nullptr)).first; |
||||||
|
it->second = MakeOrphanable<XdsRoutingChild>( |
||||||
|
Ref(DEBUG_LOCATION, "XdsRoutingChild"), it->first); |
||||||
|
} |
||||||
|
it->second->UpdateLocked(config, args.addresses, args.args); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::UpdateStateLocked() { |
||||||
|
// Also count the number of children in each state, to determine the
|
||||||
|
// overall state.
|
||||||
|
size_t num_ready = 0; |
||||||
|
size_t num_connecting = 0; |
||||||
|
size_t num_idle = 0; |
||||||
|
size_t num_transient_failures = 0; |
||||||
|
for (const auto& p : actions_) { |
||||||
|
const auto& child_name = p.first; |
||||||
|
const XdsRoutingChild* child = p.second.get(); |
||||||
|
// Skip the actions that are not in the latest update.
|
||||||
|
if (config_->action_map().find(child_name) == config_->action_map().end()) { |
||||||
|
continue; |
||||||
|
} |
||||||
|
switch (child->connectivity_state()) { |
||||||
|
case GRPC_CHANNEL_READY: { |
||||||
|
++num_ready; |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_CONNECTING: { |
||||||
|
++num_connecting; |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_IDLE: { |
||||||
|
++num_idle; |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_TRANSIENT_FAILURE: { |
||||||
|
++num_transient_failures; |
||||||
|
break; |
||||||
|
} |
||||||
|
default: |
||||||
|
GPR_UNREACHABLE_CODE(return ); |
||||||
|
} |
||||||
|
} |
||||||
|
// Determine aggregated connectivity state.
|
||||||
|
grpc_connectivity_state connectivity_state; |
||||||
|
if (num_ready > 0) { |
||||||
|
connectivity_state = GRPC_CHANNEL_READY; |
||||||
|
} else if (num_connecting > 0) { |
||||||
|
connectivity_state = GRPC_CHANNEL_CONNECTING; |
||||||
|
} else if (num_idle > 0) { |
||||||
|
connectivity_state = GRPC_CHANNEL_IDLE; |
||||||
|
} else { |
||||||
|
connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; |
||||||
|
} |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] connectivity changed to %s", this, |
||||||
|
ConnectivityStateName(connectivity_state)); |
||||||
|
} |
||||||
|
std::unique_ptr<SubchannelPicker> picker; |
||||||
|
switch (connectivity_state) { |
||||||
|
case GRPC_CHANNEL_READY: { |
||||||
|
RoutePicker::RouteTable route_table; |
||||||
|
for (const auto& config_route : config_->route_table()) { |
||||||
|
RoutePicker::Route route; |
||||||
|
route.matcher = config_route.matcher; |
||||||
|
route.picker = actions_[config_route.action]->picker_wrapper(); |
||||||
|
if (route.picker == nullptr) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] child %s has not yet returned a " |
||||||
|
"picker; creating a QueuePicker.", |
||||||
|
this, config_route.action.c_str()); |
||||||
|
} |
||||||
|
route.picker = MakeRefCounted<ChildPickerWrapper>( |
||||||
|
config_route.action, absl::make_unique<QueuePicker>( |
||||||
|
Ref(DEBUG_LOCATION, "QueuePicker"))); |
||||||
|
} |
||||||
|
route_table.push_back(std::move(route)); |
||||||
|
} |
||||||
|
picker = absl::make_unique<RoutePicker>(std::move(route_table)); |
||||||
|
break; |
||||||
|
} |
||||||
|
case GRPC_CHANNEL_CONNECTING: |
||||||
|
case GRPC_CHANNEL_IDLE: |
||||||
|
picker = |
||||||
|
absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")); |
||||||
|
break; |
||||||
|
default: |
||||||
|
picker = absl::make_unique<TransientFailurePicker>(grpc_error_set_int( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"TRANSIENT_FAILURE from XdsRoutingLb"), |
||||||
|
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); |
||||||
|
} |
||||||
|
channel_control_helper()->UpdateState(connectivity_state, std::move(picker)); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// XdsRoutingLb::XdsRoutingChild
|
||||||
|
//
|
||||||
|
|
||||||
|
XdsRoutingLb::XdsRoutingChild::XdsRoutingChild( |
||||||
|
RefCountedPtr<XdsRoutingLb> xds_routing_policy, const std::string& name) |
||||||
|
: xds_routing_policy_(std::move(xds_routing_policy)), name_(name) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, "[xds_routing_lb %p] created XdsRoutingChild %p for %s", |
||||||
|
xds_routing_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
XdsRoutingLb::XdsRoutingChild::~XdsRoutingChild() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] XdsRoutingChild %p: destroying child", |
||||||
|
xds_routing_policy_.get(), this); |
||||||
|
} |
||||||
|
xds_routing_policy_.reset(DEBUG_LOCATION, "XdsRoutingChild"); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::Orphan() { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] XdsRoutingChild %p %s: shutting down child", |
||||||
|
xds_routing_policy_.get(), this, name_.c_str()); |
||||||
|
} |
||||||
|
// Remove the child policy's interested_parties pollset_set from the
|
||||||
|
// xDS policy.
|
||||||
|
grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), |
||||||
|
xds_routing_policy_->interested_parties()); |
||||||
|
child_policy_.reset(); |
||||||
|
// Drop our ref to the child's picker, in case it's holding a ref to
|
||||||
|
// the child.
|
||||||
|
picker_wrapper_.reset(); |
||||||
|
if (delayed_removal_timer_callback_pending_) { |
||||||
|
grpc_timer_cancel(&delayed_removal_timer_); |
||||||
|
} |
||||||
|
shutdown_ = true; |
||||||
|
Unref(); |
||||||
|
} |
||||||
|
|
||||||
|
OrphanablePtr<LoadBalancingPolicy> |
||||||
|
XdsRoutingLb::XdsRoutingChild::CreateChildPolicyLocked( |
||||||
|
const grpc_channel_args* args) { |
||||||
|
LoadBalancingPolicy::Args lb_policy_args; |
||||||
|
lb_policy_args.combiner = xds_routing_policy_->combiner(); |
||||||
|
lb_policy_args.args = args; |
||||||
|
lb_policy_args.channel_control_helper = |
||||||
|
absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper")); |
||||||
|
OrphanablePtr<LoadBalancingPolicy> lb_policy = |
||||||
|
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args), |
||||||
|
&grpc_xds_routing_lb_trace); |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] XdsRoutingChild %p %s: Created new child " |
||||||
|
"policy handler %p", |
||||||
|
xds_routing_policy_.get(), this, name_.c_str(), lb_policy.get()); |
||||||
|
} |
||||||
|
// Add the xDS's interested_parties pollset_set to that of the newly created
|
||||||
|
// child policy. This will make the child policy progress upon activity on
|
||||||
|
// xDS LB, which in turn is tied to the application's call.
|
||||||
|
grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), |
||||||
|
xds_routing_policy_->interested_parties()); |
||||||
|
return lb_policy; |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::UpdateLocked( |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> config, |
||||||
|
const ServerAddressList& addresses, const grpc_channel_args* args) { |
||||||
|
if (xds_routing_policy_->shutting_down_) return; |
||||||
|
// Update child weight.
|
||||||
|
// Reactivate if needed.
|
||||||
|
if (delayed_removal_timer_callback_pending_) { |
||||||
|
delayed_removal_timer_callback_pending_ = false; |
||||||
|
grpc_timer_cancel(&delayed_removal_timer_); |
||||||
|
} |
||||||
|
// Create child policy if needed.
|
||||||
|
if (child_policy_ == nullptr) { |
||||||
|
child_policy_ = CreateChildPolicyLocked(args); |
||||||
|
} |
||||||
|
// Construct update args.
|
||||||
|
UpdateArgs update_args; |
||||||
|
update_args.config = std::move(config); |
||||||
|
update_args.addresses = addresses; |
||||||
|
update_args.args = grpc_channel_args_copy(args); |
||||||
|
// Update the policy.
|
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] XdsRoutingChild %p %s: Updating child " |
||||||
|
"policy handler %p", |
||||||
|
xds_routing_policy_.get(), this, name_.c_str(), |
||||||
|
child_policy_.get()); |
||||||
|
} |
||||||
|
child_policy_->UpdateLocked(std::move(update_args)); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::ExitIdleLocked() { |
||||||
|
child_policy_->ExitIdleLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::ResetBackoffLocked() { |
||||||
|
child_policy_->ResetBackoffLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::DeactivateLocked() { |
||||||
|
// If already deactivated, don't do that again.
|
||||||
|
if (delayed_removal_timer_callback_pending_ == true) return; |
||||||
|
// Set the child weight to 0 so that future picker won't contain this child.
|
||||||
|
// Start a timer to delete the child.
|
||||||
|
Ref(DEBUG_LOCATION, "XdsRoutingChild+timer").release(); |
||||||
|
GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
grpc_timer_init( |
||||||
|
&delayed_removal_timer_, |
||||||
|
ExecCtx::Get()->Now() + GRPC_XDS_ROUTING_CHILD_RETENTION_INTERVAL_MS, |
||||||
|
&on_delayed_removal_timer_); |
||||||
|
delayed_removal_timer_callback_pending_ = true; |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimer(void* arg, |
||||||
|
grpc_error* error) { |
||||||
|
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg); |
||||||
|
self->xds_routing_policy_->combiner()->Run( |
||||||
|
GRPC_CLOSURE_INIT(&self->on_delayed_removal_timer_, |
||||||
|
OnDelayedRemovalTimerLocked, self, nullptr), |
||||||
|
GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::OnDelayedRemovalTimerLocked( |
||||||
|
void* arg, grpc_error* error) { |
||||||
|
XdsRoutingChild* self = static_cast<XdsRoutingChild*>(arg); |
||||||
|
self->delayed_removal_timer_callback_pending_ = false; |
||||||
|
if (error == GRPC_ERROR_NONE && !self->shutdown_) { |
||||||
|
self->xds_routing_policy_->actions_.erase(self->name_); |
||||||
|
} |
||||||
|
self->Unref(DEBUG_LOCATION, "XdsRoutingChild+timer"); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// XdsRoutingLb::XdsRoutingChild::Helper
|
||||||
|
//
|
||||||
|
|
||||||
|
RefCountedPtr<SubchannelInterface> |
||||||
|
XdsRoutingLb::XdsRoutingChild::Helper::CreateSubchannel( |
||||||
|
const grpc_channel_args& args) { |
||||||
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return nullptr; |
||||||
|
return xds_routing_child_->xds_routing_policy_->channel_control_helper() |
||||||
|
->CreateSubchannel(args); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::Helper::UpdateState( |
||||||
|
grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) { |
||||||
|
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_routing_lb_trace)) { |
||||||
|
gpr_log(GPR_INFO, |
||||||
|
"[xds_routing_lb %p] child %s: received update: state=%s picker=%p", |
||||||
|
xds_routing_child_->xds_routing_policy_.get(), |
||||||
|
xds_routing_child_->name_.c_str(), ConnectivityStateName(state), |
||||||
|
picker.get()); |
||||||
|
} |
||||||
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
||||||
|
// Cache the picker in the XdsRoutingChild.
|
||||||
|
xds_routing_child_->picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>( |
||||||
|
xds_routing_child_->name_, std::move(picker)); |
||||||
|
// Decide what state to report for aggregation purposes.
|
||||||
|
// If we haven't seen a failure since the last time we were in state
|
||||||
|
// READY, then we report the state change as-is. However, once we do see
|
||||||
|
// a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
|
||||||
|
// changes until we go back into state READY.
|
||||||
|
if (!xds_routing_child_->seen_failure_since_ready_) { |
||||||
|
if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { |
||||||
|
xds_routing_child_->seen_failure_since_ready_ = true; |
||||||
|
} |
||||||
|
} else { |
||||||
|
if (state != GRPC_CHANNEL_READY) return; |
||||||
|
xds_routing_child_->seen_failure_since_ready_ = false; |
||||||
|
} |
||||||
|
xds_routing_child_->connectivity_state_ = state; |
||||||
|
// Notify the LB policy.
|
||||||
|
xds_routing_child_->xds_routing_policy_->UpdateStateLocked(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::Helper::RequestReresolution() { |
||||||
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
||||||
|
xds_routing_child_->xds_routing_policy_->channel_control_helper() |
||||||
|
->RequestReresolution(); |
||||||
|
} |
||||||
|
|
||||||
|
void XdsRoutingLb::XdsRoutingChild::Helper::AddTraceEvent( |
||||||
|
TraceSeverity severity, StringView message) { |
||||||
|
if (xds_routing_child_->xds_routing_policy_->shutting_down_) return; |
||||||
|
xds_routing_child_->xds_routing_policy_->channel_control_helper() |
||||||
|
->AddTraceEvent(severity, message); |
||||||
|
} |
||||||
|
|
||||||
|
//
|
||||||
|
// factory
|
||||||
|
//
|
||||||
|
|
||||||
|
class XdsRoutingLbFactory : public LoadBalancingPolicyFactory { |
||||||
|
public: |
||||||
|
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( |
||||||
|
LoadBalancingPolicy::Args args) const override { |
||||||
|
return MakeOrphanable<XdsRoutingLb>(std::move(args)); |
||||||
|
} |
||||||
|
|
||||||
|
const char* name() const override { return kXdsRouting; } |
||||||
|
|
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig( |
||||||
|
const Json& json, grpc_error** error) const override { |
||||||
|
GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); |
||||||
|
if (json.type() == Json::Type::JSON_NULL) { |
||||||
|
// xds_routing was mentioned as a policy in the deprecated
|
||||||
|
// loadBalancingPolicy field or in the client API.
|
||||||
|
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:loadBalancingPolicy error:xds_routing policy requires " |
||||||
|
"configuration. Please use loadBalancingConfig field of service " |
||||||
|
"config instead."); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
// action map.
|
||||||
|
XdsRoutingLbConfig::ActionMap action_map; |
||||||
|
std::set<std::string /*action_name*/> actions_to_be_used; |
||||||
|
auto it = json.object_value().find("actions"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:actions error:required field not present")); |
||||||
|
} else if (it->second.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:actions error:type should be object")); |
||||||
|
} else { |
||||||
|
for (const auto& p : it->second.object_value()) { |
||||||
|
if (p.first.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:actions element error: name cannot be empty")); |
||||||
|
continue; |
||||||
|
} |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config> child_config; |
||||||
|
std::vector<grpc_error*> child_errors = |
||||||
|
ParseChildConfig(p.second, &child_config); |
||||||
|
if (!child_errors.empty()) { |
||||||
|
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
|
||||||
|
// string is not static in this case.
|
||||||
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:actions name:", p.first).c_str()); |
||||||
|
for (grpc_error* child_error : child_errors) { |
||||||
|
error = grpc_error_add_child(error, child_error); |
||||||
|
} |
||||||
|
error_list.push_back(error); |
||||||
|
} else { |
||||||
|
action_map[p.first] = std::move(child_config); |
||||||
|
actions_to_be_used.insert(p.first); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
if (action_map.empty()) { |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid actions configured")); |
||||||
|
} |
||||||
|
XdsRoutingLbConfig::RouteTable route_table; |
||||||
|
it = json.object_value().find("routes"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:routes error:required field not present")); |
||||||
|
} else if (it->second.type() != Json::Type::ARRAY) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:routes error:type should be array")); |
||||||
|
} else { |
||||||
|
const Json::Array& array = it->second.array_value(); |
||||||
|
for (size_t i = 0; i < array.size(); ++i) { |
||||||
|
XdsRoutingLbConfig::Route route; |
||||||
|
std::vector<grpc_error*> route_errors = |
||||||
|
ParseRoute(array[i], action_map, &route, &actions_to_be_used); |
||||||
|
if (!route_errors.empty()) { |
||||||
|
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
|
||||||
|
// string is not static in this case.
|
||||||
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( |
||||||
|
absl::StrCat("field:routes element: ", i, " error").c_str()); |
||||||
|
for (grpc_error* route_error : route_errors) { |
||||||
|
error = grpc_error_add_child(error, route_error); |
||||||
|
} |
||||||
|
error_list.push_back(error); |
||||||
|
} |
||||||
|
route_table.emplace_back(std::move(route)); |
||||||
|
} |
||||||
|
} |
||||||
|
if (route_table.empty()) { |
||||||
|
grpc_error* error = |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid routes configured"); |
||||||
|
error_list.push_back(error); |
||||||
|
} |
||||||
|
if (!route_table.back().matcher.service.empty() || |
||||||
|
!route_table.back().matcher.method.empty()) { |
||||||
|
grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"default route must not contain service or method"); |
||||||
|
error_list.push_back(error); |
||||||
|
} |
||||||
|
if (!actions_to_be_used.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"some actions were not referenced by any route")); |
||||||
|
} |
||||||
|
if (!error_list.empty()) { |
||||||
|
*error = GRPC_ERROR_CREATE_FROM_VECTOR( |
||||||
|
"xds_routing_experimental LB policy config", &error_list); |
||||||
|
return nullptr; |
||||||
|
} |
||||||
|
return MakeRefCounted<XdsRoutingLbConfig>(std::move(action_map), |
||||||
|
std::move(route_table)); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
static std::vector<grpc_error*> ParseChildConfig( |
||||||
|
const Json& json, |
||||||
|
RefCountedPtr<LoadBalancingPolicy::Config>* child_config) { |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
if (json.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"value should be of type object")); |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
auto it = json.object_value().find("child_policy"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy")); |
||||||
|
} else { |
||||||
|
grpc_error* parse_error = GRPC_ERROR_NONE; |
||||||
|
*child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( |
||||||
|
it->second, &parse_error); |
||||||
|
if (*child_config == nullptr) { |
||||||
|
GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); |
||||||
|
std::vector<grpc_error*> child_errors; |
||||||
|
child_errors.push_back(parse_error); |
||||||
|
error_list.push_back( |
||||||
|
GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); |
||||||
|
} |
||||||
|
} |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
|
||||||
|
static std::vector<grpc_error*> ParseMethodName( |
||||||
|
const Json& json, XdsRoutingLbConfig::Matcher* route_config) { |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
if (json.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"value should be of type object")); |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
// Parse service
|
||||||
|
auto it = json.object_value().find("service"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:service error: should be string")); |
||||||
|
} else { |
||||||
|
route_config->service = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
// Parse method
|
||||||
|
it = json.object_value().find("method"); |
||||||
|
if (it != json.object_value().end()) { |
||||||
|
if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:method error: should be string")); |
||||||
|
} else { |
||||||
|
route_config->method = it->second.string_value(); |
||||||
|
} |
||||||
|
} |
||||||
|
if (route_config->service.empty() && !route_config->method.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"service is empty when method is not")); |
||||||
|
} |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
|
||||||
|
static std::vector<grpc_error*> ParseRoute( |
||||||
|
const Json& json, const XdsRoutingLbConfig::ActionMap& action_map, |
||||||
|
XdsRoutingLbConfig::Route* route, |
||||||
|
std::set<std::string /*action_name*/>* actions_to_be_used) { |
||||||
|
std::vector<grpc_error*> error_list; |
||||||
|
if (json.type() != Json::Type::OBJECT) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"value should be of type object")); |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
// Parse MethodName.
|
||||||
|
auto it = json.object_value().find("methodName"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:methodName error:required field missing")); |
||||||
|
} else { |
||||||
|
std::vector<grpc_error*> method_name_errors = |
||||||
|
ParseMethodName(it->second, &route->matcher); |
||||||
|
if (!method_name_errors.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR( |
||||||
|
"field:methodName", &method_name_errors)); |
||||||
|
} |
||||||
|
} |
||||||
|
// Parse action.
|
||||||
|
it = json.object_value().find("action"); |
||||||
|
if (it == json.object_value().end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:action error:required field missing")); |
||||||
|
} else if (it->second.type() != Json::Type::STRING) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:action error:should be of type string")); |
||||||
|
} else { |
||||||
|
route->action = it->second.string_value(); |
||||||
|
if (route->action.empty()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
"field:action error:cannot be empty")); |
||||||
|
} else { |
||||||
|
// Validate action exists and mark it as used.
|
||||||
|
if (action_map.find(route->action) == action_map.end()) { |
||||||
|
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
||||||
|
absl::StrCat("field:action error:", route->action, |
||||||
|
" does not exist") |
||||||
|
.c_str())); |
||||||
|
} |
||||||
|
actions_to_be_used->erase(route->action); |
||||||
|
} |
||||||
|
} |
||||||
|
return error_list; |
||||||
|
} |
||||||
|
}; |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
} // namespace grpc_core
|
||||||
|
|
||||||
|
//
|
||||||
|
// Plugin registration
|
||||||
|
//
|
||||||
|
|
||||||
|
void grpc_lb_policy_xds_routing_init() { |
||||||
|
grpc_core::LoadBalancingPolicyRegistry::Builder:: |
||||||
|
RegisterLoadBalancingPolicyFactory( |
||||||
|
absl::make_unique<grpc_core::XdsRoutingLbFactory>()); |
||||||
|
} |
||||||
|
|
||||||
|
void grpc_lb_policy_xds_routing_shutdown() {} |
@ -0,0 +1,358 @@ |
|||||||
|
//
|
||||||
|
//
|
||||||
|
// Copyright 2020 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include <assert.h> |
||||||
|
#include <string.h> |
||||||
|
|
||||||
|
#include <grpc/compression.h> |
||||||
|
#include <grpc/slice_buffer.h> |
||||||
|
#include <grpc/support/alloc.h> |
||||||
|
#include <grpc/support/log.h> |
||||||
|
#include <grpc/support/string_util.h> |
||||||
|
|
||||||
|
#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h" |
||||||
|
#include "src/core/lib/channel/channel_args.h" |
||||||
|
#include "src/core/lib/compression/algorithm_metadata.h" |
||||||
|
#include "src/core/lib/compression/compression_args.h" |
||||||
|
#include "src/core/lib/compression/compression_internal.h" |
||||||
|
#include "src/core/lib/compression/message_compress.h" |
||||||
|
#include "src/core/lib/gpr/string.h" |
||||||
|
#include "src/core/lib/slice/slice_internal.h" |
||||||
|
#include "src/core/lib/slice/slice_string_helpers.h" |
||||||
|
|
||||||
|
namespace { |
||||||
|
|
||||||
|
class ChannelData {}; |
||||||
|
|
||||||
|
class CallData { |
||||||
|
public: |
||||||
|
explicit CallData(const grpc_call_element_args& args) |
||||||
|
: call_combiner_(args.call_combiner) { |
||||||
|
// Initialize state for recv_initial_metadata_ready callback
|
||||||
|
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_, |
||||||
|
OnRecvInitialMetadataReady, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Initialize state for recv_message_ready callback
|
||||||
|
grpc_slice_buffer_init(&recv_slices_); |
||||||
|
GRPC_CLOSURE_INIT(&on_recv_message_next_done_, OnRecvMessageNextDone, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
// Initialize state for recv_trailing_metadata_ready callback
|
||||||
|
GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_, |
||||||
|
OnRecvTrailingMetadataReady, this, |
||||||
|
grpc_schedule_on_exec_ctx); |
||||||
|
} |
||||||
|
|
||||||
|
~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); } |
||||||
|
|
||||||
|
void DecompressStartTransportStreamOpBatch( |
||||||
|
grpc_call_element* elem, grpc_transport_stream_op_batch* batch); |
||||||
|
|
||||||
|
private: |
||||||
|
static void OnRecvInitialMetadataReady(void* arg, grpc_error* error); |
||||||
|
|
||||||
|
// Methods for processing a receive message event
|
||||||
|
void MaybeResumeOnRecvMessageReady(); |
||||||
|
static void OnRecvMessageReady(void* arg, grpc_error* error); |
||||||
|
static void OnRecvMessageNextDone(void* arg, grpc_error* error); |
||||||
|
grpc_error* PullSliceFromRecvMessage(); |
||||||
|
void ContinueReadingRecvMessage(); |
||||||
|
void FinishRecvMessage(); |
||||||
|
void ContinueRecvMessageReadyCallback(grpc_error* error); |
||||||
|
|
||||||
|
// Methods for processing a recv_trailing_metadata event
|
||||||
|
void MaybeResumeOnRecvTrailingMetadataReady(); |
||||||
|
static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error); |
||||||
|
|
||||||
|
grpc_core::CallCombiner* call_combiner_; |
||||||
|
// Overall error for the call
|
||||||
|
grpc_error* error_ = GRPC_ERROR_NONE; |
||||||
|
// Fields for handling recv_initial_metadata_ready callback
|
||||||
|
grpc_closure on_recv_initial_metadata_ready_; |
||||||
|
grpc_closure* original_recv_initial_metadata_ready_ = nullptr; |
||||||
|
grpc_metadata_batch* recv_initial_metadata_ = nullptr; |
||||||
|
// Fields for handling recv_message_ready callback
|
||||||
|
bool seen_recv_message_ready_ = false; |
||||||
|
grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE; |
||||||
|
grpc_closure on_recv_message_ready_; |
||||||
|
grpc_closure* original_recv_message_ready_ = nullptr; |
||||||
|
grpc_closure on_recv_message_next_done_; |
||||||
|
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_ = nullptr; |
||||||
|
// recv_slices_ holds the slices read from the original recv_message stream.
|
||||||
|
// It is initialized during construction and reset when a new stream is
|
||||||
|
// created using it.
|
||||||
|
grpc_slice_buffer recv_slices_; |
||||||
|
std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream), |
||||||
|
alignof(grpc_core::SliceBufferByteStream)>::type |
||||||
|
recv_replacement_stream_; |
||||||
|
// Fields for handling recv_trailing_metadata_ready callback
|
||||||
|
bool seen_recv_trailing_metadata_ready_ = false; |
||||||
|
grpc_closure on_recv_trailing_metadata_ready_; |
||||||
|
grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; |
||||||
|
grpc_error* on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; |
||||||
|
}; |
||||||
|
|
||||||
|
grpc_message_compression_algorithm DecodeMessageCompressionAlgorithm( |
||||||
|
grpc_mdelem md) { |
||||||
|
grpc_message_compression_algorithm algorithm = |
||||||
|
grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md)); |
||||||
|
if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) { |
||||||
|
char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md)); |
||||||
|
gpr_log(GPR_ERROR, |
||||||
|
"Invalid incoming message compression algorithm: '%s'. " |
||||||
|
"Interpreting incoming data as uncompressed.", |
||||||
|
md_c_str); |
||||||
|
gpr_free(md_c_str); |
||||||
|
return GRPC_MESSAGE_COMPRESS_NONE; |
||||||
|
} |
||||||
|
return algorithm; |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) { |
||||||
|
CallData* calld = static_cast<CallData*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
grpc_linked_mdelem* grpc_encoding = |
||||||
|
calld->recv_initial_metadata_->idx.named.grpc_encoding; |
||||||
|
if (grpc_encoding != nullptr) { |
||||||
|
calld->algorithm_ = DecodeMessageCompressionAlgorithm(grpc_encoding->md); |
||||||
|
} |
||||||
|
} |
||||||
|
calld->MaybeResumeOnRecvMessageReady(); |
||||||
|
calld->MaybeResumeOnRecvTrailingMetadataReady(); |
||||||
|
grpc_closure* closure = calld->original_recv_initial_metadata_ready_; |
||||||
|
calld->original_recv_initial_metadata_ready_ = nullptr; |
||||||
|
grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::MaybeResumeOnRecvMessageReady() { |
||||||
|
if (seen_recv_message_ready_) { |
||||||
|
seen_recv_message_ready_ = false; |
||||||
|
GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_, |
||||||
|
GRPC_ERROR_NONE, |
||||||
|
"continue recv_message_ready callback"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::OnRecvMessageReady(void* arg, grpc_error* error) { |
||||||
|
CallData* calld = static_cast<CallData*>(arg); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
if (calld->original_recv_initial_metadata_ready_ != nullptr) { |
||||||
|
calld->seen_recv_message_ready_ = true; |
||||||
|
GRPC_CALL_COMBINER_STOP(calld->call_combiner_, |
||||||
|
"Deferring OnRecvMessageReady until after " |
||||||
|
"OnRecvInitialMetadataReady"); |
||||||
|
return; |
||||||
|
} |
||||||
|
if (calld->algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) { |
||||||
|
// recv_message can be NULL if trailing metadata is received instead of
|
||||||
|
// message, or it's possible that the message was not compressed.
|
||||||
|
if (*calld->recv_message_ == nullptr || |
||||||
|
(*calld->recv_message_)->length() == 0 || |
||||||
|
((*calld->recv_message_)->flags() & GRPC_WRITE_INTERNAL_COMPRESS) == |
||||||
|
0) { |
||||||
|
return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE); |
||||||
|
} |
||||||
|
grpc_slice_buffer_destroy_internal(&calld->recv_slices_); |
||||||
|
grpc_slice_buffer_init(&calld->recv_slices_); |
||||||
|
return calld->ContinueReadingRecvMessage(); |
||||||
|
} |
||||||
|
} |
||||||
|
calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::ContinueReadingRecvMessage() { |
||||||
|
while ((*recv_message_) |
||||||
|
->Next((*recv_message_)->length() - recv_slices_.length, |
||||||
|
&on_recv_message_next_done_)) { |
||||||
|
grpc_error* error = PullSliceFromRecvMessage(); |
||||||
|
if (error != GRPC_ERROR_NONE) { |
||||||
|
return ContinueRecvMessageReadyCallback(error); |
||||||
|
} |
||||||
|
// We have read the entire message.
|
||||||
|
if (recv_slices_.length == (*recv_message_)->length()) { |
||||||
|
return FinishRecvMessage(); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
grpc_error* CallData::PullSliceFromRecvMessage() { |
||||||
|
grpc_slice incoming_slice; |
||||||
|
grpc_error* error = (*recv_message_)->Pull(&incoming_slice); |
||||||
|
if (error == GRPC_ERROR_NONE) { |
||||||
|
grpc_slice_buffer_add(&recv_slices_, incoming_slice); |
||||||
|
} |
||||||
|
return error; |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::OnRecvMessageNextDone(void* arg, grpc_error* error) { |
||||||
|
CallData* calld = static_cast<CallData*>(arg); |
||||||
|
if (error != GRPC_ERROR_NONE) { |
||||||
|
return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error)); |
||||||
|
} |
||||||
|
error = calld->PullSliceFromRecvMessage(); |
||||||
|
if (error != GRPC_ERROR_NONE) { |
||||||
|
return calld->ContinueRecvMessageReadyCallback(error); |
||||||
|
} |
||||||
|
if (calld->recv_slices_.length == (*calld->recv_message_)->length()) { |
||||||
|
calld->FinishRecvMessage(); |
||||||
|
} else { |
||||||
|
calld->ContinueReadingRecvMessage(); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::FinishRecvMessage() { |
||||||
|
grpc_slice_buffer decompressed_slices; |
||||||
|
grpc_slice_buffer_init(&decompressed_slices); |
||||||
|
if (grpc_msg_decompress(algorithm_, &recv_slices_, &decompressed_slices) == |
||||||
|
0) { |
||||||
|
char* msg; |
||||||
|
gpr_asprintf( |
||||||
|
&msg, |
||||||
|
"Unexpected error decompressing data for algorithm with enum value %d", |
||||||
|
algorithm_); |
||||||
|
GPR_DEBUG_ASSERT(error_ == GRPC_ERROR_NONE); |
||||||
|
error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); |
||||||
|
gpr_free(msg); |
||||||
|
grpc_slice_buffer_destroy_internal(&decompressed_slices); |
||||||
|
} else { |
||||||
|
uint32_t recv_flags = |
||||||
|
((*recv_message_)->flags() & (~GRPC_WRITE_INTERNAL_COMPRESS)) | |
||||||
|
GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED; |
||||||
|
// Swap out the original receive byte stream with our new one and send the
|
||||||
|
// batch down.
|
||||||
|
// Initializing recv_replacement_stream_ with decompressed_slices removes
|
||||||
|
// all the slices from decompressed_slices leaving it empty.
|
||||||
|
new (&recv_replacement_stream_) |
||||||
|
grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags); |
||||||
|
recv_message_->reset(reinterpret_cast<grpc_core::SliceBufferByteStream*>( |
||||||
|
&recv_replacement_stream_)); |
||||||
|
recv_message_ = nullptr; |
||||||
|
} |
||||||
|
ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_)); |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) { |
||||||
|
MaybeResumeOnRecvTrailingMetadataReady(); |
||||||
|
// The surface will clean up the receiving stream if there is an error.
|
||||||
|
grpc_closure* closure = original_recv_message_ready_; |
||||||
|
original_recv_message_ready_ = nullptr; |
||||||
|
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::MaybeResumeOnRecvTrailingMetadataReady() { |
||||||
|
if (seen_recv_trailing_metadata_ready_) { |
||||||
|
seen_recv_trailing_metadata_ready_ = false; |
||||||
|
grpc_error* error = on_recv_trailing_metadata_ready_error_; |
||||||
|
on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE; |
||||||
|
GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_, |
||||||
|
error, "Continuing OnRecvTrailingMetadataReady"); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) { |
||||||
|
CallData* calld = static_cast<CallData*>(arg); |
||||||
|
if (calld->original_recv_initial_metadata_ready_ != nullptr || |
||||||
|
calld->original_recv_message_ready_ != nullptr) { |
||||||
|
calld->seen_recv_trailing_metadata_ready_ = true; |
||||||
|
calld->on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error); |
||||||
|
GRPC_CALL_COMBINER_STOP( |
||||||
|
calld->call_combiner_, |
||||||
|
"Deferring OnRecvTrailingMetadataReady until after " |
||||||
|
"OnRecvInitialMetadataReady and OnRecvMessageReady"); |
||||||
|
return; |
||||||
|
} |
||||||
|
error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_); |
||||||
|
calld->error_ = GRPC_ERROR_NONE; |
||||||
|
grpc_closure* closure = calld->original_recv_trailing_metadata_ready_; |
||||||
|
calld->original_recv_trailing_metadata_ready_ = nullptr; |
||||||
|
grpc_core::Closure::Run(DEBUG_LOCATION, closure, error); |
||||||
|
} |
||||||
|
|
||||||
|
void CallData::DecompressStartTransportStreamOpBatch( |
||||||
|
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { |
||||||
|
// Handle recv_initial_metadata.
|
||||||
|
if (batch->recv_initial_metadata) { |
||||||
|
recv_initial_metadata_ = |
||||||
|
batch->payload->recv_initial_metadata.recv_initial_metadata; |
||||||
|
original_recv_initial_metadata_ready_ = |
||||||
|
batch->payload->recv_initial_metadata.recv_initial_metadata_ready; |
||||||
|
batch->payload->recv_initial_metadata.recv_initial_metadata_ready = |
||||||
|
&on_recv_initial_metadata_ready_; |
||||||
|
} |
||||||
|
// Handle recv_message
|
||||||
|
if (batch->recv_message) { |
||||||
|
recv_message_ = batch->payload->recv_message.recv_message; |
||||||
|
original_recv_message_ready_ = |
||||||
|
batch->payload->recv_message.recv_message_ready; |
||||||
|
batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_; |
||||||
|
} |
||||||
|
// Handle recv_trailing_metadata
|
||||||
|
if (batch->recv_trailing_metadata) { |
||||||
|
original_recv_trailing_metadata_ready_ = |
||||||
|
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
||||||
|
batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready = |
||||||
|
&on_recv_trailing_metadata_ready_; |
||||||
|
} |
||||||
|
// Pass control down the stack.
|
||||||
|
grpc_call_next_op(elem, batch); |
||||||
|
} |
||||||
|
|
||||||
|
void DecompressStartTransportStreamOpBatch( |
||||||
|
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { |
||||||
|
GPR_TIMER_SCOPE("decompress_start_transport_stream_op_batch", 0); |
||||||
|
CallData* calld = static_cast<CallData*>(elem->call_data); |
||||||
|
calld->DecompressStartTransportStreamOpBatch(elem, batch); |
||||||
|
} |
||||||
|
|
||||||
|
static grpc_error* DecompressInitCallElem(grpc_call_element* elem, |
||||||
|
const grpc_call_element_args* args) { |
||||||
|
new (elem->call_data) CallData(*args); |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
static void DecompressDestroyCallElem( |
||||||
|
grpc_call_element* elem, const grpc_call_final_info* /*final_info*/, |
||||||
|
grpc_closure* /*ignored*/) { |
||||||
|
CallData* calld = static_cast<CallData*>(elem->call_data); |
||||||
|
calld->~CallData(); |
||||||
|
} |
||||||
|
|
||||||
|
static grpc_error* DecompressInitChannelElem( |
||||||
|
grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) { |
||||||
|
return GRPC_ERROR_NONE; |
||||||
|
} |
||||||
|
|
||||||
|
void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {} |
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
const grpc_channel_filter grpc_message_decompress_filter = { |
||||||
|
DecompressStartTransportStreamOpBatch, |
||||||
|
grpc_channel_next_op, |
||||||
|
sizeof(CallData), |
||||||
|
DecompressInitCallElem, |
||||||
|
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
||||||
|
DecompressDestroyCallElem, |
||||||
|
0, // sizeof(ChannelData)
|
||||||
|
DecompressInitChannelElem, |
||||||
|
DecompressDestroyChannelElem, |
||||||
|
grpc_channel_next_get_info, |
||||||
|
"message_decompress"}; |
@ -0,0 +1,29 @@ |
|||||||
|
//
|
||||||
|
//
|
||||||
|
// Copyright 2020 gRPC authors.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H |
||||||
|
#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H |
||||||
|
|
||||||
|
#include <grpc/support/port_platform.h> |
||||||
|
|
||||||
|
#include "src/core/lib/channel/channel_stack.h" |
||||||
|
|
||||||
|
extern const grpc_channel_filter grpc_message_decompress_filter; |
||||||
|
|
||||||
|
#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \ |
||||||
|
*/ |
@ -1,356 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2020 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
/// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
|
|
||||||
/// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
|
|
||||||
/// handle and trigger all the CFStream events. The CFStream streams register
|
|
||||||
/// themselves with the run loop with functions grpc_apple_register_read_stream
|
|
||||||
/// and grpc_apple_register_read_stream. Pollsets are dummy and block on a
|
|
||||||
/// condition variable in pollset_work().
|
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#include "src/core/lib/iomgr/port.h" |
|
||||||
|
|
||||||
#ifdef GRPC_APPLE_EV |
|
||||||
|
|
||||||
#include <CoreFoundation/CoreFoundation.h> |
|
||||||
|
|
||||||
#include <list> |
|
||||||
|
|
||||||
#include "src/core/lib/gprpp/thd.h" |
|
||||||
#include "src/core/lib/iomgr/ev_apple.h" |
|
||||||
|
|
||||||
grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling"); |
|
||||||
|
|
||||||
#ifndef NDEBUG |
|
||||||
#define GRPC_POLLING_TRACE(format, ...) \ |
|
||||||
if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
|
|
||||||
gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__); \
|
|
||||||
} |
|
||||||
#else |
|
||||||
#define GRPC_POLLING_TRACE(...) |
|
||||||
#endif // NDEBUG
|
|
||||||
|
|
||||||
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) |
|
||||||
|
|
||||||
struct GlobalRunLoopContext { |
|
||||||
grpc_core::CondVar init_cv; |
|
||||||
grpc_core::CondVar input_source_cv; |
|
||||||
|
|
||||||
grpc_core::Mutex mu; |
|
||||||
|
|
||||||
// Whether an input source registration is pending. Protected by mu.
|
|
||||||
bool input_source_registered = false; |
|
||||||
|
|
||||||
// The reference to the global run loop object. Protected by mu.
|
|
||||||
CFRunLoopRef run_loop; |
|
||||||
|
|
||||||
// Whether the pollset has been globally shut down. Protected by mu.
|
|
||||||
bool is_shutdown = false; |
|
||||||
}; |
|
||||||
|
|
||||||
struct GrpcAppleWorker { |
|
||||||
// The condition varible to kick the worker. Works with the pollset's lock
|
|
||||||
// (GrpcApplePollset.mu).
|
|
||||||
grpc_core::CondVar cv; |
|
||||||
|
|
||||||
// Whether the worker is kicked. Protected by the pollset's lock
|
|
||||||
// (GrpcApplePollset.mu).
|
|
||||||
bool kicked = false; |
|
||||||
}; |
|
||||||
|
|
||||||
struct GrpcApplePollset { |
|
||||||
grpc_core::Mutex mu; |
|
||||||
|
|
||||||
// Tracks the current workers in the pollset. Protected by mu.
|
|
||||||
std::list<GrpcAppleWorker*> workers; |
|
||||||
|
|
||||||
// Whether the pollset is shut down. Protected by mu.
|
|
||||||
bool is_shutdown = false; |
|
||||||
|
|
||||||
// Closure to call when shutdown is done. Protected by mu.
|
|
||||||
grpc_closure* shutdown_closure; |
|
||||||
|
|
||||||
// Whether there's an outstanding kick that was not processed. Protected by
|
|
||||||
// mu.
|
|
||||||
bool kicked_without_poller = false; |
|
||||||
}; |
|
||||||
|
|
||||||
static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr; |
|
||||||
static grpc_core::Thread* gGlobalRunLoopThread = nullptr; |
|
||||||
|
|
||||||
/// Register the stream with the dispatch queue. Callbacks of the stream will be
|
|
||||||
/// issued to the dispatch queue when a network event happens and will be
|
|
||||||
/// managed by Grand Central Dispatch.
|
|
||||||
static void grpc_apple_register_read_stream_queue( |
|
||||||
CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { |
|
||||||
CFReadStreamSetDispatchQueue(read_stream, dispatch_queue); |
|
||||||
} |
|
||||||
|
|
||||||
/// Register the stream with the dispatch queue. Callbacks of the stream will be
|
|
||||||
/// issued to the dispatch queue when a network event happens and will be
|
|
||||||
/// managed by Grand Central Dispatch.
|
|
||||||
static void grpc_apple_register_write_stream_queue( |
|
||||||
CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { |
|
||||||
CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue); |
|
||||||
} |
|
||||||
|
|
||||||
/// Register the stream with the global run loop. Callbacks of the stream will
|
|
||||||
/// be issued to the run loop when a network event happens and will be driven by
|
|
||||||
/// the global run loop thread gGlobalRunLoopThread.
|
|
||||||
static void grpc_apple_register_read_stream_run_loop( |
|
||||||
CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { |
|
||||||
GRPC_POLLING_TRACE("Register read stream: %p", read_stream); |
|
||||||
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); |
|
||||||
CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop, |
|
||||||
kCFRunLoopDefaultMode); |
|
||||||
gGlobalRunLoopContext->input_source_registered = true; |
|
||||||
gGlobalRunLoopContext->input_source_cv.Signal(); |
|
||||||
} |
|
||||||
|
|
||||||
/// Register the stream with the global run loop. Callbacks of the stream will
|
|
||||||
/// be issued to the run loop when a network event happens, and will be driven
|
|
||||||
/// by the global run loop thread gGlobalRunLoopThread.
|
|
||||||
static void grpc_apple_register_write_stream_run_loop( |
|
||||||
CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { |
|
||||||
GRPC_POLLING_TRACE("Register write stream: %p", write_stream); |
|
||||||
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); |
|
||||||
CFWriteStreamScheduleWithRunLoop( |
|
||||||
write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode); |
|
||||||
gGlobalRunLoopContext->input_source_registered = true; |
|
||||||
gGlobalRunLoopContext->input_source_cv.Signal(); |
|
||||||
} |
|
||||||
|
|
||||||
/// The default implementation of stream registration is to register the stream
|
|
||||||
/// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
|
|
||||||
/// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
|
|
||||||
/// CFStream streams are registered with the global run loop instead (see
|
|
||||||
/// pollset_global_init below).
|
|
||||||
static void (*grpc_apple_register_read_stream_impl)( |
|
||||||
CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue; |
|
||||||
static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef, |
|
||||||
dispatch_queue_t) = |
|
||||||
grpc_apple_register_write_stream_queue; |
|
||||||
|
|
||||||
void grpc_apple_register_read_stream(CFReadStreamRef read_stream, |
|
||||||
dispatch_queue_t dispatch_queue) { |
|
||||||
grpc_apple_register_read_stream_impl(read_stream, dispatch_queue); |
|
||||||
} |
|
||||||
|
|
||||||
void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, |
|
||||||
dispatch_queue_t dispatch_queue) { |
|
||||||
grpc_apple_register_write_stream_impl(write_stream, dispatch_queue); |
|
||||||
} |
|
||||||
|
|
||||||
/// Drive the run loop in a global singleton thread until the global run loop is
|
|
||||||
/// shutdown.
|
|
||||||
static void GlobalRunLoopFunc(void* arg) { |
|
||||||
grpc_core::ReleasableMutexLock lock(&gGlobalRunLoopContext->mu); |
|
||||||
gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent(); |
|
||||||
gGlobalRunLoopContext->init_cv.Signal(); |
|
||||||
|
|
||||||
while (!gGlobalRunLoopContext->is_shutdown) { |
|
||||||
// CFRunLoopRun() will return immediately if no stream is registered on it.
|
|
||||||
// So we wait on a conditional variable until a stream is registered;
|
|
||||||
// otherwise we'll be running a spinning loop.
|
|
||||||
while (!gGlobalRunLoopContext->input_source_registered) { |
|
||||||
gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu); |
|
||||||
} |
|
||||||
gGlobalRunLoopContext->input_source_registered = false; |
|
||||||
lock.Unlock(); |
|
||||||
CFRunLoopRun(); |
|
||||||
lock.Lock(); |
|
||||||
} |
|
||||||
lock.Unlock(); |
|
||||||
} |
|
||||||
|
|
||||||
// pollset implementation
|
|
||||||
|
|
||||||
static void pollset_global_init(void) { |
|
||||||
gGlobalRunLoopContext = new GlobalRunLoopContext; |
|
||||||
|
|
||||||
grpc_apple_register_read_stream_impl = |
|
||||||
grpc_apple_register_read_stream_run_loop; |
|
||||||
grpc_apple_register_write_stream_impl = |
|
||||||
grpc_apple_register_write_stream_run_loop; |
|
||||||
|
|
||||||
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); |
|
||||||
gGlobalRunLoopThread = |
|
||||||
new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr); |
|
||||||
gGlobalRunLoopThread->Start(); |
|
||||||
while (gGlobalRunLoopContext->run_loop == NULL) |
|
||||||
gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu); |
|
||||||
} |
|
||||||
|
|
||||||
static void pollset_global_shutdown(void) { |
|
||||||
{ |
|
||||||
grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); |
|
||||||
gGlobalRunLoopContext->is_shutdown = true; |
|
||||||
CFRunLoopStop(gGlobalRunLoopContext->run_loop); |
|
||||||
} |
|
||||||
gGlobalRunLoopThread->Join(); |
|
||||||
delete gGlobalRunLoopThread; |
|
||||||
delete gGlobalRunLoopContext; |
|
||||||
} |
|
||||||
|
|
||||||
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
|
|
||||||
/// function. The lock may be temporarily released when waiting on the condition
|
|
||||||
/// variable but will be re-acquired before the function returns.
|
|
||||||
///
|
|
||||||
/// The Apple pollset simply waits on a condition variable until it is kicked.
|
|
||||||
/// The network events are handled in the global run loop thread. Processing of
|
|
||||||
/// these events will eventually trigger the kick.
|
|
||||||
static grpc_error* pollset_work(grpc_pollset* pollset, |
|
||||||
grpc_pollset_worker** worker, |
|
||||||
grpc_millis deadline) { |
|
||||||
GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64, |
|
||||||
pollset, worker, deadline); |
|
||||||
GrpcApplePollset* apple_pollset = |
|
||||||
reinterpret_cast<GrpcApplePollset*>(pollset); |
|
||||||
GrpcAppleWorker actual_worker; |
|
||||||
if (worker) { |
|
||||||
*worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker); |
|
||||||
} |
|
||||||
|
|
||||||
if (apple_pollset->kicked_without_poller) { |
|
||||||
// Process the outstanding kick and reset the flag. Do not block.
|
|
||||||
apple_pollset->kicked_without_poller = false; |
|
||||||
} else { |
|
||||||
// Block until kicked, timed out, or the pollset shuts down.
|
|
||||||
apple_pollset->workers.push_front(&actual_worker); |
|
||||||
auto it = apple_pollset->workers.begin(); |
|
||||||
|
|
||||||
while (!actual_worker.kicked && !apple_pollset->is_shutdown) { |
|
||||||
if (actual_worker.cv.Wait( |
|
||||||
&apple_pollset->mu, |
|
||||||
grpc_millis_to_timespec(deadline, GPR_CLOCK_REALTIME))) { |
|
||||||
// timed out
|
|
||||||
break; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
apple_pollset->workers.erase(it); |
|
||||||
|
|
||||||
// If the pollset is shut down asynchronously and this is the last pending
|
|
||||||
// worker, the shutdown process is complete at this moment and the shutdown
|
|
||||||
// callback will be called.
|
|
||||||
if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) { |
|
||||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure, |
|
||||||
GRPC_ERROR_NONE); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
return GRPC_ERROR_NONE; |
|
||||||
} |
|
||||||
|
|
||||||
/// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
|
|
||||||
/// before calling this function.
|
|
||||||
static void kick_worker(GrpcAppleWorker* worker) { |
|
||||||
worker->kicked = true; |
|
||||||
worker->cv.Signal(); |
|
||||||
} |
|
||||||
|
|
||||||
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
|
|
||||||
/// function. The kick action simply signals the condition variable of the
|
|
||||||
/// worker.
|
|
||||||
static grpc_error* pollset_kick(grpc_pollset* pollset, |
|
||||||
grpc_pollset_worker* specific_worker) { |
|
||||||
GrpcApplePollset* apple_pollset = |
|
||||||
reinterpret_cast<GrpcApplePollset*>(pollset); |
|
||||||
|
|
||||||
GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker); |
|
||||||
|
|
||||||
if (specific_worker == nullptr) { |
|
||||||
if (apple_pollset->workers.empty()) { |
|
||||||
apple_pollset->kicked_without_poller = true; |
|
||||||
} else { |
|
||||||
GrpcAppleWorker* actual_worker = apple_pollset->workers.front(); |
|
||||||
kick_worker(actual_worker); |
|
||||||
} |
|
||||||
} else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { |
|
||||||
for (auto& actual_worker : apple_pollset->workers) { |
|
||||||
kick_worker(actual_worker); |
|
||||||
} |
|
||||||
} else { |
|
||||||
GrpcAppleWorker* actual_worker = |
|
||||||
reinterpret_cast<GrpcAppleWorker*>(specific_worker); |
|
||||||
kick_worker(actual_worker); |
|
||||||
} |
|
||||||
|
|
||||||
return GRPC_ERROR_NONE; |
|
||||||
} |
|
||||||
|
|
||||||
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { |
|
||||||
GRPC_POLLING_TRACE("pollset init: %p", pollset); |
|
||||||
GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset(); |
|
||||||
*mu = apple_pollset->mu.get(); |
|
||||||
} |
|
||||||
|
|
||||||
/// The caller must acquire the lock GrpcApplePollset.mu before calling this
|
|
||||||
/// function.
|
|
||||||
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { |
|
||||||
GRPC_POLLING_TRACE("pollset shutdown: %p", pollset); |
|
||||||
|
|
||||||
GrpcApplePollset* apple_pollset = |
|
||||||
reinterpret_cast<GrpcApplePollset*>(pollset); |
|
||||||
apple_pollset->is_shutdown = true; |
|
||||||
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); |
|
||||||
|
|
||||||
// If there is any worker blocked, shutdown will be done asynchronously.
|
|
||||||
if (apple_pollset->workers.empty()) { |
|
||||||
grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); |
|
||||||
} else { |
|
||||||
apple_pollset->shutdown_closure = closure; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
static void pollset_destroy(grpc_pollset* pollset) { |
|
||||||
GRPC_POLLING_TRACE("pollset destroy: %p", pollset); |
|
||||||
GrpcApplePollset* apple_pollset = |
|
||||||
reinterpret_cast<GrpcApplePollset*>(pollset); |
|
||||||
apple_pollset->~GrpcApplePollset(); |
|
||||||
} |
|
||||||
|
|
||||||
size_t pollset_size(void) { return sizeof(GrpcApplePollset); } |
|
||||||
|
|
||||||
grpc_pollset_vtable grpc_apple_pollset_vtable = { |
|
||||||
pollset_global_init, pollset_global_shutdown, |
|
||||||
pollset_init, pollset_shutdown, |
|
||||||
pollset_destroy, pollset_work, |
|
||||||
pollset_kick, pollset_size}; |
|
||||||
|
|
||||||
// pollset_set implementation
|
|
||||||
|
|
||||||
grpc_pollset_set* pollset_set_create(void) { return nullptr; } |
|
||||||
void pollset_set_destroy(grpc_pollset_set* pollset_set) {} |
|
||||||
void pollset_set_add_pollset(grpc_pollset_set* pollset_set, |
|
||||||
grpc_pollset* pollset) {} |
|
||||||
void pollset_set_del_pollset(grpc_pollset_set* pollset_set, |
|
||||||
grpc_pollset* pollset) {} |
|
||||||
void pollset_set_add_pollset_set(grpc_pollset_set* bag, |
|
||||||
grpc_pollset_set* item) {} |
|
||||||
void pollset_set_del_pollset_set(grpc_pollset_set* bag, |
|
||||||
grpc_pollset_set* item) {} |
|
||||||
|
|
||||||
grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = { |
|
||||||
pollset_set_create, pollset_set_destroy, |
|
||||||
pollset_set_add_pollset, pollset_set_del_pollset, |
|
||||||
pollset_set_add_pollset_set, pollset_set_del_pollset_set}; |
|
||||||
|
|
||||||
#endif |
|
@ -1,43 +0,0 @@ |
|||||||
/*
|
|
||||||
* |
|
||||||
* Copyright 2020 gRPC authors. |
|
||||||
* |
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
* you may not use this file except in compliance with the License. |
|
||||||
* You may obtain a copy of the License at |
|
||||||
* |
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* |
|
||||||
* Unless required by applicable law or agreed to in writing, software |
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
* See the License for the specific language governing permissions and |
|
||||||
* limitations under the License. |
|
||||||
* |
|
||||||
*/ |
|
||||||
|
|
||||||
#ifndef GRPC_CORE_LIB_IOMGR_EV_APPLE_H |
|
||||||
#define GRPC_CORE_LIB_IOMGR_EV_APPLE_H |
|
||||||
|
|
||||||
#include <grpc/support/port_platform.h> |
|
||||||
|
|
||||||
#ifdef GRPC_APPLE_EV |
|
||||||
|
|
||||||
#include <CoreFoundation/CoreFoundation.h> |
|
||||||
|
|
||||||
#include "src/core/lib/iomgr/pollset.h" |
|
||||||
#include "src/core/lib/iomgr/pollset_set.h" |
|
||||||
|
|
||||||
void grpc_apple_register_read_stream(CFReadStreamRef read_stream, |
|
||||||
dispatch_queue_t dispatch_queue); |
|
||||||
|
|
||||||
void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, |
|
||||||
dispatch_queue_t dispatch_queue); |
|
||||||
|
|
||||||
extern grpc_pollset_vtable grpc_apple_pollset_vtable; |
|
||||||
|
|
||||||
extern grpc_pollset_set_vtable grpc_apple_pollset_set_vtable; |
|
||||||
|
|
||||||
#endif |
|
||||||
|
|
||||||
#endif |
|
@ -1,57 +0,0 @@ |
|||||||
# Copyright 2020 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
||||||
"""The AsyncIO version of the reflection servicer.""" |
|
||||||
|
|
||||||
from typing import AsyncIterable |
|
||||||
|
|
||||||
import grpc |
|
||||||
|
|
||||||
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2 |
|
||||||
from grpc_reflection.v1alpha._base import BaseReflectionServicer |
|
||||||
|
|
||||||
|
|
||||||
class ReflectionServicer(BaseReflectionServicer): |
|
||||||
"""Servicer handling RPCs for service statuses.""" |
|
||||||
|
|
||||||
async def ServerReflectionInfo( |
|
||||||
self, request_iterator: AsyncIterable[ |
|
||||||
_reflection_pb2.ServerReflectionRequest], unused_context |
|
||||||
) -> AsyncIterable[_reflection_pb2.ServerReflectionResponse]: |
|
||||||
async for request in request_iterator: |
|
||||||
if request.HasField('file_by_filename'): |
|
||||||
yield self._file_by_filename(request.file_by_filename) |
|
||||||
elif request.HasField('file_containing_symbol'): |
|
||||||
yield self._file_containing_symbol( |
|
||||||
request.file_containing_symbol) |
|
||||||
elif request.HasField('file_containing_extension'): |
|
||||||
yield self._file_containing_extension( |
|
||||||
request.file_containing_extension.containing_type, |
|
||||||
request.file_containing_extension.extension_number) |
|
||||||
elif request.HasField('all_extension_numbers_of_type'): |
|
||||||
yield self._all_extension_numbers_of_type( |
|
||||||
request.all_extension_numbers_of_type) |
|
||||||
elif request.HasField('list_services'): |
|
||||||
yield self._list_services() |
|
||||||
else: |
|
||||||
yield _reflection_pb2.ServerReflectionResponse( |
|
||||||
error_response=_reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.INVALID_ARGUMENT.value[0], |
|
||||||
error_message=grpc.StatusCode.INVALID_ARGUMENT.value[1]. |
|
||||||
encode(), |
|
||||||
)) |
|
||||||
|
|
||||||
|
|
||||||
__all__ = [ |
|
||||||
"ReflectionServicer", |
|
||||||
] |
|
@ -1,110 +0,0 @@ |
|||||||
# Copyright 2020 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
||||||
"""Base implementation of reflection servicer.""" |
|
||||||
|
|
||||||
import grpc |
|
||||||
from google.protobuf import descriptor_pb2 |
|
||||||
from google.protobuf import descriptor_pool |
|
||||||
|
|
||||||
from grpc_reflection.v1alpha import reflection_pb2 as _reflection_pb2 |
|
||||||
from grpc_reflection.v1alpha import reflection_pb2_grpc as _reflection_pb2_grpc |
|
||||||
|
|
||||||
_POOL = descriptor_pool.Default() |
|
||||||
|
|
||||||
|
|
||||||
def _not_found_error(): |
|
||||||
return _reflection_pb2.ServerReflectionResponse( |
|
||||||
error_response=_reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.NOT_FOUND.value[0], |
|
||||||
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), |
|
||||||
)) |
|
||||||
|
|
||||||
|
|
||||||
def _file_descriptor_response(descriptor): |
|
||||||
proto = descriptor_pb2.FileDescriptorProto() |
|
||||||
descriptor.CopyToProto(proto) |
|
||||||
serialized_proto = proto.SerializeToString() |
|
||||||
return _reflection_pb2.ServerReflectionResponse( |
|
||||||
file_descriptor_response=_reflection_pb2.FileDescriptorResponse( |
|
||||||
file_descriptor_proto=(serialized_proto,)),) |
|
||||||
|
|
||||||
|
|
||||||
class BaseReflectionServicer(_reflection_pb2_grpc.ServerReflectionServicer): |
|
||||||
"""Base class for reflection servicer.""" |
|
||||||
|
|
||||||
def __init__(self, service_names, pool=None): |
|
||||||
"""Constructor. |
|
||||||
|
|
||||||
Args: |
|
||||||
service_names: Iterable of fully-qualified service names available. |
|
||||||
pool: An optional DescriptorPool instance. |
|
||||||
""" |
|
||||||
self._service_names = tuple(sorted(service_names)) |
|
||||||
self._pool = _POOL if pool is None else pool |
|
||||||
|
|
||||||
def _file_by_filename(self, filename): |
|
||||||
try: |
|
||||||
descriptor = self._pool.FindFileByName(filename) |
|
||||||
except KeyError: |
|
||||||
return _not_found_error() |
|
||||||
else: |
|
||||||
return _file_descriptor_response(descriptor) |
|
||||||
|
|
||||||
def _file_containing_symbol(self, fully_qualified_name): |
|
||||||
try: |
|
||||||
descriptor = self._pool.FindFileContainingSymbol( |
|
||||||
fully_qualified_name) |
|
||||||
except KeyError: |
|
||||||
return _not_found_error() |
|
||||||
else: |
|
||||||
return _file_descriptor_response(descriptor) |
|
||||||
|
|
||||||
def _file_containing_extension(self, containing_type, extension_number): |
|
||||||
try: |
|
||||||
message_descriptor = self._pool.FindMessageTypeByName( |
|
||||||
containing_type) |
|
||||||
extension_descriptor = self._pool.FindExtensionByNumber( |
|
||||||
message_descriptor, extension_number) |
|
||||||
descriptor = self._pool.FindFileContainingSymbol( |
|
||||||
extension_descriptor.full_name) |
|
||||||
except KeyError: |
|
||||||
return _not_found_error() |
|
||||||
else: |
|
||||||
return _file_descriptor_response(descriptor) |
|
||||||
|
|
||||||
def _all_extension_numbers_of_type(self, containing_type): |
|
||||||
try: |
|
||||||
message_descriptor = self._pool.FindMessageTypeByName( |
|
||||||
containing_type) |
|
||||||
extension_numbers = tuple( |
|
||||||
sorted(extension.number for extension in |
|
||||||
self._pool.FindAllExtensions(message_descriptor))) |
|
||||||
except KeyError: |
|
||||||
return _not_found_error() |
|
||||||
else: |
|
||||||
return _reflection_pb2.ServerReflectionResponse( |
|
||||||
all_extension_numbers_response=_reflection_pb2. |
|
||||||
ExtensionNumberResponse( |
|
||||||
base_type_name=message_descriptor.full_name, |
|
||||||
extension_number=extension_numbers)) |
|
||||||
|
|
||||||
def _list_services(self): |
|
||||||
return _reflection_pb2.ServerReflectionResponse( |
|
||||||
list_services_response=_reflection_pb2.ListServiceResponse(service=[ |
|
||||||
_reflection_pb2.ServiceResponse(name=service_name) |
|
||||||
for service_name in self._service_names |
|
||||||
])) |
|
||||||
|
|
||||||
|
|
||||||
__all__ = ['BaseReflectionServicer'] |
|
@ -1,30 +0,0 @@ |
|||||||
# Copyright 2020 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
||||||
|
|
||||||
package(default_testonly = 1) |
|
||||||
|
|
||||||
py_test( |
|
||||||
name = "reflection_servicer_test", |
|
||||||
srcs = ["reflection_servicer_test.py"], |
|
||||||
imports = ["../../"], |
|
||||||
python_version = "PY3", |
|
||||||
deps = [ |
|
||||||
"//src/proto/grpc/testing:empty_py_pb2", |
|
||||||
"//src/proto/grpc/testing/proto2:empty2_extensions_proto", |
|
||||||
"//src/proto/grpc/testing/proto2:empty2_proto", |
|
||||||
"//src/python/grpcio/grpc:grpcio", |
|
||||||
"//src/python/grpcio_reflection/grpc_reflection/v1alpha:grpc_reflection", |
|
||||||
"//src/python/grpcio_tests/tests_aio/unit:_test_base", |
|
||||||
], |
|
||||||
) |
|
@ -1,13 +0,0 @@ |
|||||||
# Copyright 2016 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
@ -1,193 +0,0 @@ |
|||||||
# Copyright 2016 gRPC authors. |
|
||||||
# |
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
||||||
# you may not use this file except in compliance with the License. |
|
||||||
# You may obtain a copy of the License at |
|
||||||
# |
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0 |
|
||||||
# |
|
||||||
# Unless required by applicable law or agreed to in writing, software |
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, |
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
||||||
# See the License for the specific language governing permissions and |
|
||||||
# limitations under the License. |
|
||||||
"""Tests of grpc_reflection.v1alpha.reflection.""" |
|
||||||
|
|
||||||
import logging |
|
||||||
import unittest |
|
||||||
|
|
||||||
import grpc |
|
||||||
from google.protobuf import descriptor_pb2, descriptor_pool |
|
||||||
from grpc.experimental import aio |
|
||||||
|
|
||||||
from grpc_reflection.v1alpha import (reflection, reflection_pb2, |
|
||||||
reflection_pb2_grpc) |
|
||||||
from src.proto.grpc.testing import empty_pb2 |
|
||||||
from src.proto.grpc.testing.proto2 import empty2_extensions_pb2 |
|
||||||
from tests_aio.unit._test_base import AioTestBase |
|
||||||
|
|
||||||
_EMPTY_PROTO_FILE_NAME = 'src/proto/grpc/testing/empty.proto' |
|
||||||
_EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty' |
|
||||||
_SERVICE_NAMES = ('Angstrom', 'Bohr', 'Curie', 'Dyson', 'Einstein', 'Feynman', |
|
||||||
'Galilei') |
|
||||||
_EMPTY_EXTENSIONS_SYMBOL_NAME = 'grpc.testing.proto2.EmptyWithExtensions' |
|
||||||
_EMPTY_EXTENSIONS_NUMBERS = ( |
|
||||||
124, |
|
||||||
125, |
|
||||||
126, |
|
||||||
127, |
|
||||||
128, |
|
||||||
) |
|
||||||
|
|
||||||
|
|
||||||
def _file_descriptor_to_proto(descriptor): |
|
||||||
proto = descriptor_pb2.FileDescriptorProto() |
|
||||||
descriptor.CopyToProto(proto) |
|
||||||
return proto.SerializeToString() |
|
||||||
|
|
||||||
|
|
||||||
class ReflectionServicerTest(AioTestBase): |
|
||||||
|
|
||||||
async def setUp(self): |
|
||||||
self._server = aio.server() |
|
||||||
reflection.enable_server_reflection(_SERVICE_NAMES, self._server) |
|
||||||
port = self._server.add_insecure_port('[::]:0') |
|
||||||
await self._server.start() |
|
||||||
|
|
||||||
self._channel = aio.insecure_channel('localhost:%d' % port) |
|
||||||
self._stub = reflection_pb2_grpc.ServerReflectionStub(self._channel) |
|
||||||
|
|
||||||
async def tearDown(self): |
|
||||||
await self._server.stop(None) |
|
||||||
await self._channel.close() |
|
||||||
|
|
||||||
async def test_file_by_name(self): |
|
||||||
requests = ( |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_by_filename=_EMPTY_PROTO_FILE_NAME), |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_by_filename='i-donut-exist'), |
|
||||||
) |
|
||||||
responses = [] |
|
||||||
async for response in self._stub.ServerReflectionInfo(iter(requests)): |
|
||||||
responses.append(response) |
|
||||||
expected_responses = ( |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
file_descriptor_response=reflection_pb2.FileDescriptorResponse( |
|
||||||
file_descriptor_proto=( |
|
||||||
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),))), |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
error_response=reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.NOT_FOUND.value[0], |
|
||||||
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), |
|
||||||
)), |
|
||||||
) |
|
||||||
self.assertSequenceEqual(expected_responses, responses) |
|
||||||
|
|
||||||
async def test_file_by_symbol(self): |
|
||||||
requests = ( |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_containing_symbol=_EMPTY_PROTO_SYMBOL_NAME), |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_containing_symbol='i.donut.exist.co.uk.org.net.me.name.foo' |
|
||||||
), |
|
||||||
) |
|
||||||
responses = [] |
|
||||||
async for response in self._stub.ServerReflectionInfo(iter(requests)): |
|
||||||
responses.append(response) |
|
||||||
expected_responses = ( |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
file_descriptor_response=reflection_pb2.FileDescriptorResponse( |
|
||||||
file_descriptor_proto=( |
|
||||||
_file_descriptor_to_proto(empty_pb2.DESCRIPTOR),))), |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
error_response=reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.NOT_FOUND.value[0], |
|
||||||
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), |
|
||||||
)), |
|
||||||
) |
|
||||||
self.assertSequenceEqual(expected_responses, responses) |
|
||||||
|
|
||||||
async def test_file_containing_extension(self): |
|
||||||
requests = ( |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_containing_extension=reflection_pb2.ExtensionRequest( |
|
||||||
containing_type=_EMPTY_EXTENSIONS_SYMBOL_NAME, |
|
||||||
extension_number=125, |
|
||||||
),), |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
file_containing_extension=reflection_pb2.ExtensionRequest( |
|
||||||
containing_type='i.donut.exist.co.uk.org.net.me.name.foo', |
|
||||||
extension_number=55, |
|
||||||
),), |
|
||||||
) |
|
||||||
responses = [] |
|
||||||
async for response in self._stub.ServerReflectionInfo(iter(requests)): |
|
||||||
responses.append(response) |
|
||||||
expected_responses = ( |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
file_descriptor_response=reflection_pb2.FileDescriptorResponse( |
|
||||||
file_descriptor_proto=(_file_descriptor_to_proto( |
|
||||||
empty2_extensions_pb2.DESCRIPTOR),))), |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
error_response=reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.NOT_FOUND.value[0], |
|
||||||
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), |
|
||||||
)), |
|
||||||
) |
|
||||||
self.assertSequenceEqual(expected_responses, responses) |
|
||||||
|
|
||||||
async def test_extension_numbers_of_type(self): |
|
||||||
requests = ( |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
all_extension_numbers_of_type=_EMPTY_EXTENSIONS_SYMBOL_NAME), |
|
||||||
reflection_pb2.ServerReflectionRequest( |
|
||||||
all_extension_numbers_of_type='i.donut.exist.co.uk.net.name.foo' |
|
||||||
), |
|
||||||
) |
|
||||||
responses = [] |
|
||||||
async for response in self._stub.ServerReflectionInfo(iter(requests)): |
|
||||||
responses.append(response) |
|
||||||
expected_responses = ( |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
all_extension_numbers_response=reflection_pb2. |
|
||||||
ExtensionNumberResponse( |
|
||||||
base_type_name=_EMPTY_EXTENSIONS_SYMBOL_NAME, |
|
||||||
extension_number=_EMPTY_EXTENSIONS_NUMBERS)), |
|
||||||
reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
error_response=reflection_pb2.ErrorResponse( |
|
||||||
error_code=grpc.StatusCode.NOT_FOUND.value[0], |
|
||||||
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(), |
|
||||||
)), |
|
||||||
) |
|
||||||
self.assertSequenceEqual(expected_responses, responses) |
|
||||||
|
|
||||||
async def test_list_services(self): |
|
||||||
requests = (reflection_pb2.ServerReflectionRequest(list_services='',),) |
|
||||||
responses = [] |
|
||||||
async for response in self._stub.ServerReflectionInfo(iter(requests)): |
|
||||||
responses.append(response) |
|
||||||
expected_responses = (reflection_pb2.ServerReflectionResponse( |
|
||||||
valid_host='', |
|
||||||
list_services_response=reflection_pb2.ListServiceResponse( |
|
||||||
service=tuple( |
|
||||||
reflection_pb2.ServiceResponse(name=name) |
|
||||||
for name in _SERVICE_NAMES))),) |
|
||||||
self.assertSequenceEqual(expected_responses, responses) |
|
||||||
|
|
||||||
def test_reflection_service_name(self): |
|
||||||
self.assertEqual(reflection.SERVICE_NAME, |
|
||||||
'grpc.reflection.v1alpha.ServerReflection') |
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__': |
|
||||||
logging.basicConfig(level=logging.DEBUG) |
|
||||||
unittest.main(verbosity=2) |
|
Loading…
Reference in new issue