Merge branch 'cg-server-call-tracer' into cg-deadline

pull/35256/head
Craig Tiller 1 year ago
commit 8f7b0bb6c2
  1. 12
      doc/service_config.md
  2. 2
      gRPC.podspec
  3. 1
      requirements.bazel.txt
  4. 167
      src/core/lib/surface/server.cc
  5. 68
      src/core/lib/surface/server.h
  6. 23
      src/objective-c/PrivacyInfo.xcprivacy
  7. 2
      templates/gRPC.podspec.template

@ -62,12 +62,12 @@ DNS](https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md).
Here is an example service config in protobuf form: Here is an example service config in protobuf form:
``` ```textproto
{ {
// Use round_robin LB policy. # Use round_robin LB policy.
load_balancing_config: { round_robin: {} } load_balancing_config: { round_robin: {} }
// This method config applies to method "foo/bar" and to all methods # This method config applies to method "foo/bar" and to all methods
// of service "baz". # of service "baz".
method_config: { method_config: {
name: { name: {
service: "foo" service: "foo"
@ -76,7 +76,7 @@ Here is an example service config in protobuf form:
name: { name: {
service: "baz" service: "baz"
} }
// Default timeout for matching methods. # Default timeout for matching methods.
timeout: { timeout: {
seconds: 1 seconds: 1
nanos: 1 nanos: 1
@ -87,7 +87,7 @@ Here is an example service config in protobuf form:
Here is the same example service config in JSON form: Here is the same example service config in JSON form:
``` ```json
{ {
"loadBalancingConfig": [ { "round_robin": {} } ], "loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [ "methodConfig": [

2
gRPC.podspec generated

@ -32,6 +32,8 @@ Pod::Spec.new do |s|
:tag => "v#{version}", :tag => "v#{version}",
} }
s.resource = 'src/objective-c/PrivacyInfo.xcprivacy'
name = 'GRPCClient' name = 'GRPCClient'
s.module_name = name s.module_name = name
s.header_dir = name s.header_dir = name

@ -16,3 +16,4 @@ xds-protos==0.0.11
opencensus==0.10.0 opencensus==0.10.0
opencensus-ext-stackdriver==0.8.0 opencensus-ext-stackdriver==0.8.0
absl-py==1.4.0 absl-py==1.4.0
googleapis-common-protos==1.61.0

@ -839,9 +839,9 @@ void Server::Start() {
if (unregistered_request_matcher_ == nullptr) { if (unregistered_request_matcher_ == nullptr) {
unregistered_request_matcher_ = make_real_request_matcher(); unregistered_request_matcher_ = make_real_request_matcher();
} }
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { for (auto& rm : registered_methods_) {
if (rm->matcher == nullptr) { if (rm.second->matcher == nullptr) {
rm->matcher = make_real_request_matcher(); rm.second->matcher = make_real_request_matcher();
} }
} }
{ {
@ -928,20 +928,11 @@ void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
cqs_.push_back(cq); cqs_.push_back(cq);
} }
namespace {
bool streq(const std::string& a, const char* b) {
return (a.empty() && b == nullptr) ||
((b != nullptr) && !strcmp(a.c_str(), b));
}
} // namespace
Server::RegisteredMethod* Server::RegisterMethod( Server::RegisteredMethod* Server::RegisterMethod(
const char* method, const char* host, const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling, grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) { uint32_t flags) {
if (IsRegisteredMethodsMapEnabled() && started_) { if (started_) {
Crash("Attempting to register method after server started"); Crash("Attempting to register method after server started");
} }
@ -950,21 +941,21 @@ Server::RegisteredMethod* Server::RegisterMethod(
"grpc_server_register_method method string cannot be NULL"); "grpc_server_register_method method string cannot be NULL");
return nullptr; return nullptr;
} }
for (std::unique_ptr<RegisteredMethod>& m : registered_methods_) { auto key = std::make_pair(host ? host : "", method);
if (streq(m->method, method) && streq(m->host, host)) { if (registered_methods_.find(key) != registered_methods_.end()) {
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
host ? host : "*"); host ? host : "*");
return nullptr; return nullptr;
}
} }
if (flags != 0) { if (flags != 0) {
gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
flags); flags);
return nullptr; return nullptr;
} }
registered_methods_.emplace_back(std::make_unique<RegisteredMethod>( auto it = registered_methods_.emplace(
method, host, payload_handling, flags)); key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
return registered_methods_.back().get(); flags));
return it.first->second.get();
} }
void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) { void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
@ -1015,9 +1006,9 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) {
if (started_) { if (started_) {
unregistered_request_matcher_->KillRequests(error); unregistered_request_matcher_->KillRequests(error);
unregistered_request_matcher_->ZombifyPending(); unregistered_request_matcher_->ZombifyPending();
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { for (auto& rm : registered_methods_) {
rm->matcher->KillRequests(error); rm.second->matcher->KillRequests(error);
rm->matcher->ZombifyPending(); rm.second->matcher->ZombifyPending();
} }
} }
} }
@ -1252,7 +1243,6 @@ class Server::ChannelData::ConnectivityWatcher
// //
Server::ChannelData::~ChannelData() { Server::ChannelData::~ChannelData() {
old_registered_methods_.reset();
if (server_ != nullptr) { if (server_ != nullptr) {
if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) { if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_); server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
@ -1276,50 +1266,6 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
channel_ = channel; channel_ = channel;
cq_idx_ = cq_idx; cq_idx_ = cq_idx;
channelz_socket_uuid_ = channelz_socket_uuid; channelz_socket_uuid_ = channelz_socket_uuid;
// Build a lookup table phrased in terms of mdstr's in this channels context
// to quickly find registered methods.
size_t num_registered_methods = server_->registered_methods_.size();
if (!IsRegisteredMethodsMapEnabled() && num_registered_methods > 0) {
uint32_t max_probes = 0;
size_t slots = 2 * num_registered_methods;
old_registered_methods_ =
std::make_unique<std::vector<ChannelRegisteredMethod>>(slots);
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) {
Slice host;
Slice method = Slice::FromExternalString(rm->method);
const bool has_host = !rm->host.empty();
if (has_host) {
host = Slice::FromExternalString(rm->host);
}
uint32_t hash = MixHash32(has_host ? host.Hash() : 0, method.Hash());
uint32_t probes = 0;
for (probes = 0; (*old_registered_methods_)[(hash + probes) % slots]
.server_registered_method != nullptr;
probes++) {
}
if (probes > max_probes) max_probes = probes;
ChannelRegisteredMethod* crm =
&(*old_registered_methods_)[(hash + probes) % slots];
crm->server_registered_method = rm.get();
crm->flags = rm->flags;
crm->has_host = has_host;
if (has_host) {
crm->host = std::move(host);
}
crm->method = std::move(method);
}
GPR_ASSERT(slots <= UINT32_MAX);
registered_method_max_probes_ = max_probes;
} else if (IsRegisteredMethodsMapEnabled()) {
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) {
auto key = std::make_pair(!rm->host.empty() ? rm->host : "", rm->method);
registered_methods_.emplace(
key, std::make_unique<ChannelRegisteredMethod>(
rm.get(), rm->flags, /*has_host=*/!rm->host.empty(),
Slice::FromExternalString(rm->method),
Slice::FromExternalString(rm->host)));
}
}
// Publish channel. // Publish channel.
{ {
MutexLock lock(&server_->mu_global_); MutexLock lock(&server_->mu_global_);
@ -1345,45 +1291,17 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
transport->PerformOp(op); transport->PerformOp(op);
} }
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod(
const grpc_slice& host, const grpc_slice& path) {
if (old_registered_methods_ == nullptr) return nullptr;
// TODO(ctiller): unify these two searches
// check for an exact match with host
uint32_t hash = MixHash32(grpc_slice_hash(host), grpc_slice_hash(path));
for (size_t i = 0; i <= registered_method_max_probes_; i++) {
ChannelRegisteredMethod* rm = &(
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()];
if (rm->server_registered_method == nullptr) break;
if (!rm->has_host) continue;
if (rm->host != host) continue;
if (rm->method != path) continue;
return rm;
}
// check for a wildcard method definition (no host set)
hash = MixHash32(0, grpc_slice_hash(path));
for (size_t i = 0; i <= registered_method_max_probes_; i++) {
ChannelRegisteredMethod* rm = &(
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()];
if (rm->server_registered_method == nullptr) break;
if (rm->has_host) continue;
if (rm->method != path) continue;
return rm;
}
return nullptr;
}
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
const absl::string_view& host, const absl::string_view& path) { const absl::string_view& host, const absl::string_view& path) {
if (registered_methods_.empty()) return nullptr; if (server_->registered_methods_.empty()) return nullptr;
// check for an exact match with host // check for an exact match with host
auto it = registered_methods_.find(std::make_pair(host, path)); auto it = server_->registered_methods_.find(std::make_pair(host, path));
if (it != registered_methods_.end()) { if (it != server_->registered_methods_.end()) {
return it->second.get(); return it->second.get();
} }
// check for wildcard method definition (no host set) // check for wildcard method definition (no host set)
it = registered_methods_.find(std::make_pair("", path)); it = server_->registered_methods_.find(std::make_pair("", path));
if (it != registered_methods_.end()) { if (it != server_->registered_methods_.end()) {
return it->second.get(); return it->second.get();
} }
return nullptr; return nullptr;
@ -1404,13 +1322,8 @@ void Server::ChannelData::SetRegisteredMethodOnMetadata(
// Path not being set would result in an RPC error. // Path not being set would result in an RPC error.
return; return;
} }
ChannelRegisteredMethod* method; RegisteredMethod* method =
if (!IsRegisteredMethodsMapEnabled()) { GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
method = GetRegisteredMethod(authority->c_slice(), path->c_slice());
} else {
method = GetRegisteredMethod(authority->as_string_view(),
path->as_string_view());
}
// insert in metadata // insert in metadata
metadata.Set(GrpcRegisteredMethod(), method); metadata.Set(GrpcRegisteredMethod(), method);
} }
@ -1481,24 +1394,20 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
Timestamp deadline = GetContext<CallContext>()->deadline(); Timestamp deadline = GetContext<CallContext>()->deadline();
// Find request matcher. // Find request matcher.
RequestMatcherInterface* matcher; RequestMatcherInterface* matcher;
ChannelRegisteredMethod* rm = nullptr; RegisteredMethod* rm = nullptr;
if (IsRegisteredMethodLookupInTransportEnabled()) { if (IsRegisteredMethodLookupInTransportEnabled()) {
rm = static_cast<ChannelRegisteredMethod*>( rm = static_cast<RegisteredMethod*>(
call_args.client_initial_metadata->get(GrpcRegisteredMethod()) call_args.client_initial_metadata->get(GrpcRegisteredMethod())
.value_or(nullptr)); .value_or(nullptr));
} else { } else {
if (!IsRegisteredMethodsMapEnabled()) { rm = chand->GetRegisteredMethod(host_ptr->as_string_view(),
rm = chand->GetRegisteredMethod(host_ptr->c_slice(), path->c_slice()); path->as_string_view());
} else {
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(),
path->as_string_view());
}
} }
ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>> ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>>
maybe_read_first_message([] { return NextResult<MessageHandle>(); }); maybe_read_first_message([] { return NextResult<MessageHandle>(); });
if (rm != nullptr) { if (rm != nullptr) {
matcher = rm->server_registered_method->matcher.get(); matcher = rm->matcher.get();
switch (rm->server_registered_method->payload_handling) { switch (rm->payload_handling) {
case GRPC_SRM_PAYLOAD_NONE: case GRPC_SRM_PAYLOAD_NONE:
break; break;
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER:
@ -1752,22 +1661,18 @@ void Server::CallData::StartNewRpc(grpc_call_element* elem) {
grpc_server_register_method_payload_handling payload_handling = grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE; GRPC_SRM_PAYLOAD_NONE;
if (path_.has_value() && host_.has_value()) { if (path_.has_value() && host_.has_value()) {
ChannelRegisteredMethod* rm; RegisteredMethod* rm;
if (IsRegisteredMethodLookupInTransportEnabled()) { if (IsRegisteredMethodLookupInTransportEnabled()) {
rm = static_cast<ChannelRegisteredMethod*>( rm = static_cast<RegisteredMethod*>(
recv_initial_metadata_->get(GrpcRegisteredMethod()) recv_initial_metadata_->get(GrpcRegisteredMethod())
.value_or(nullptr)); .value_or(nullptr));
} else { } else {
if (!IsRegisteredMethodsMapEnabled()) { rm = chand->GetRegisteredMethod(host_->as_string_view(),
rm = chand->GetRegisteredMethod(host_->c_slice(), path_->c_slice()); path_->as_string_view());
} else {
rm = chand->GetRegisteredMethod(host_->as_string_view(),
path_->as_string_view());
}
} }
if (rm != nullptr) { if (rm != nullptr) {
matcher_ = rm->server_registered_method->matcher.get(); matcher_ = rm->matcher.get();
payload_handling = rm->server_registered_method->payload_handling; payload_handling = rm->payload_handling;
} }
} }
// Start recv_message op if needed. // Start recv_message op if needed.

@ -211,26 +211,6 @@ class Server : public InternallyRefCounted<Server>,
private: private:
struct RequestedCall; struct RequestedCall;
struct ChannelRegisteredMethod {
ChannelRegisteredMethod() = default;
ChannelRegisteredMethod(RegisteredMethod* server_registered_method_arg,
uint32_t flags_arg, bool has_host_arg,
Slice method_arg, Slice host_arg)
: server_registered_method(server_registered_method_arg),
flags(flags_arg),
has_host(has_host_arg),
method(std::move(method_arg)),
host(std::move(host_arg)) {}
~ChannelRegisteredMethod() = default;
RegisteredMethod* server_registered_method = nullptr;
uint32_t flags;
bool has_host;
Slice method;
Slice host;
};
class RequestMatcherInterface; class RequestMatcherInterface;
class RealRequestMatcherFilterStack; class RealRequestMatcherFilterStack;
class RealRequestMatcherPromises; class RealRequestMatcherPromises;
@ -251,11 +231,8 @@ class Server : public InternallyRefCounted<Server>,
Channel* channel() const { return channel_.get(); } Channel* channel() const { return channel_.get(); }
size_t cq_idx() const { return cq_idx_; } size_t cq_idx() const { return cq_idx_; }
ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host, RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const grpc_slice& path); const absl::string_view& path);
ChannelRegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
// Filter vtable functions. // Filter vtable functions.
static grpc_error_handle InitChannelElement( static grpc_error_handle InitChannelElement(
grpc_channel_element* elem, grpc_channel_element_args* args); grpc_channel_element* elem, grpc_channel_element_args* args);
@ -274,36 +251,12 @@ class Server : public InternallyRefCounted<Server>,
static void FinishDestroy(void* arg, grpc_error_handle error); static void FinishDestroy(void* arg, grpc_error_handle error);
struct StringViewStringViewPairHash
: absl::flat_hash_set<
std::pair<absl::string_view, absl::string_view>>::hasher {
using is_transparent = void;
};
struct StringViewStringViewPairEq
: std::equal_to<std::pair<absl::string_view, absl::string_view>> {
using is_transparent = void;
};
RefCountedPtr<Server> server_; RefCountedPtr<Server> server_;
RefCountedPtr<Channel> channel_; RefCountedPtr<Channel> channel_;
// The index into Server::cqs_ of the CQ used as a starting point for // The index into Server::cqs_ of the CQ used as a starting point for
// where to publish new incoming calls. // where to publish new incoming calls.
size_t cq_idx_; size_t cq_idx_;
absl::optional<std::list<ChannelData*>::iterator> list_position_; absl::optional<std::list<ChannelData*>::iterator> list_position_;
// A hash-table of the methods and hosts of the registered methods.
// TODO(vjpai): Convert this to an STL map type as opposed to a direct
// bucket implementation. (Consider performance impact, hash function to
// use, etc.)
std::unique_ptr<std::vector<ChannelRegisteredMethod>>
old_registered_methods_;
// Map of registered methods.
absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
std::unique_ptr<ChannelRegisteredMethod>,
StringViewStringViewPairHash,
StringViewStringViewPairEq>
registered_methods_;
uint32_t registered_method_max_probes_;
grpc_closure finish_destroy_channel_closure_; grpc_closure finish_destroy_channel_closure_;
intptr_t channelz_socket_uuid_; intptr_t channelz_socket_uuid_;
}; };
@ -412,6 +365,17 @@ class Server : public InternallyRefCounted<Server>,
grpc_cq_completion completion; grpc_cq_completion completion;
}; };
struct StringViewStringViewPairHash
: absl::flat_hash_set<
std::pair<absl::string_view, absl::string_view>>::hasher {
using is_transparent = void;
};
struct StringViewStringViewPairEq
: std::equal_to<std::pair<absl::string_view, absl::string_view>> {
using is_transparent = void;
};
static void ListenerDestroyDone(void* arg, grpc_error_handle error); static void ListenerDestroyDone(void* arg, grpc_error_handle error);
static void DoneShutdownEvent(void* server, static void DoneShutdownEvent(void* server,
@ -497,7 +461,11 @@ class Server : public InternallyRefCounted<Server>,
bool starting_ ABSL_GUARDED_BY(mu_global_) = false; bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
CondVar starting_cv_; CondVar starting_cv_;
std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_; // Map of registered methods.
absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
std::unique_ptr<RegisteredMethod>,
StringViewStringViewPairHash, StringViewStringViewPairEq>
registered_methods_;
// Request matcher for unregistered methods. // Request matcher for unregistered methods.
std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_; std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>NSPrivacyTracking</key>
<false/>
<key>NSPrivacyCollectedDataTypes</key>
<array/>
<key>NSPrivacyTrackingDomains</key>
<array/>
<key>NSPrivacyAccessedAPITypes</key>
<array>
<dict>
<key>NSPrivacyAccessedAPIType</key>
<string>NSPrivacyAccessedAPICategoryFileTimestamp</string>
<key>NSPrivacyAccessedAPITypeReasons</key>
<array>
<string>C617.1</string>
</array>
</dict>
</array>
</dict>
</plist>

@ -34,6 +34,8 @@
:tag => "v#{version}", :tag => "v#{version}",
} }
s.resource = 'src/objective-c/PrivacyInfo.xcprivacy'
name = 'GRPCClient' name = 'GRPCClient'
s.module_name = name s.module_name = name
s.header_dir = name s.header_dir = name

Loading…
Cancel
Save