|
|
|
@ -839,9 +839,9 @@ void Server::Start() { |
|
|
|
|
if (unregistered_request_matcher_ == nullptr) { |
|
|
|
|
unregistered_request_matcher_ = make_real_request_matcher(); |
|
|
|
|
} |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { |
|
|
|
|
if (rm->matcher == nullptr) { |
|
|
|
|
rm->matcher = make_real_request_matcher(); |
|
|
|
|
for (auto& rm : registered_methods_) { |
|
|
|
|
if (rm.second->matcher == nullptr) { |
|
|
|
|
rm.second->matcher = make_real_request_matcher(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
{ |
|
|
|
@ -928,20 +928,11 @@ void Server::RegisterCompletionQueue(grpc_completion_queue* cq) { |
|
|
|
|
cqs_.push_back(cq); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
bool streq(const std::string& a, const char* b) { |
|
|
|
|
return (a.empty() && b == nullptr) || |
|
|
|
|
((b != nullptr) && !strcmp(a.c_str(), b)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
Server::RegisteredMethod* Server::RegisterMethod( |
|
|
|
|
const char* method, const char* host, |
|
|
|
|
grpc_server_register_method_payload_handling payload_handling, |
|
|
|
|
uint32_t flags) { |
|
|
|
|
if (IsRegisteredMethodsMapEnabled() && started_) { |
|
|
|
|
if (started_) { |
|
|
|
|
Crash("Attempting to register method after server started"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -950,21 +941,21 @@ Server::RegisteredMethod* Server::RegisterMethod( |
|
|
|
|
"grpc_server_register_method method string cannot be NULL"); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& m : registered_methods_) { |
|
|
|
|
if (streq(m->method, method) && streq(m->host, host)) { |
|
|
|
|
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, |
|
|
|
|
host ? host : "*"); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
auto key = std::make_pair(host ? host : "", method); |
|
|
|
|
if (registered_methods_.find(key) != registered_methods_.end()) { |
|
|
|
|
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method, |
|
|
|
|
host ? host : "*"); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
if (flags != 0) { |
|
|
|
|
gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x", |
|
|
|
|
flags); |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
registered_methods_.emplace_back(std::make_unique<RegisteredMethod>( |
|
|
|
|
method, host, payload_handling, flags)); |
|
|
|
|
return registered_methods_.back().get(); |
|
|
|
|
auto it = registered_methods_.emplace( |
|
|
|
|
key, std::make_unique<RegisteredMethod>(method, host, payload_handling, |
|
|
|
|
flags)); |
|
|
|
|
return it.first->second.get(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) { |
|
|
|
@ -1015,9 +1006,9 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) { |
|
|
|
|
if (started_) { |
|
|
|
|
unregistered_request_matcher_->KillRequests(error); |
|
|
|
|
unregistered_request_matcher_->ZombifyPending(); |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) { |
|
|
|
|
rm->matcher->KillRequests(error); |
|
|
|
|
rm->matcher->ZombifyPending(); |
|
|
|
|
for (auto& rm : registered_methods_) { |
|
|
|
|
rm.second->matcher->KillRequests(error); |
|
|
|
|
rm.second->matcher->ZombifyPending(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1252,7 +1243,6 @@ class Server::ChannelData::ConnectivityWatcher |
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
Server::ChannelData::~ChannelData() { |
|
|
|
|
old_registered_methods_.reset(); |
|
|
|
|
if (server_ != nullptr) { |
|
|
|
|
if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) { |
|
|
|
|
server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_); |
|
|
|
@ -1276,50 +1266,6 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server, |
|
|
|
|
channel_ = channel; |
|
|
|
|
cq_idx_ = cq_idx; |
|
|
|
|
channelz_socket_uuid_ = channelz_socket_uuid; |
|
|
|
|
// Build a lookup table phrased in terms of mdstr's in this channels context
|
|
|
|
|
// to quickly find registered methods.
|
|
|
|
|
size_t num_registered_methods = server_->registered_methods_.size(); |
|
|
|
|
if (!IsRegisteredMethodsMapEnabled() && num_registered_methods > 0) { |
|
|
|
|
uint32_t max_probes = 0; |
|
|
|
|
size_t slots = 2 * num_registered_methods; |
|
|
|
|
old_registered_methods_ = |
|
|
|
|
std::make_unique<std::vector<ChannelRegisteredMethod>>(slots); |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) { |
|
|
|
|
Slice host; |
|
|
|
|
Slice method = Slice::FromExternalString(rm->method); |
|
|
|
|
const bool has_host = !rm->host.empty(); |
|
|
|
|
if (has_host) { |
|
|
|
|
host = Slice::FromExternalString(rm->host); |
|
|
|
|
} |
|
|
|
|
uint32_t hash = MixHash32(has_host ? host.Hash() : 0, method.Hash()); |
|
|
|
|
uint32_t probes = 0; |
|
|
|
|
for (probes = 0; (*old_registered_methods_)[(hash + probes) % slots] |
|
|
|
|
.server_registered_method != nullptr; |
|
|
|
|
probes++) { |
|
|
|
|
} |
|
|
|
|
if (probes > max_probes) max_probes = probes; |
|
|
|
|
ChannelRegisteredMethod* crm = |
|
|
|
|
&(*old_registered_methods_)[(hash + probes) % slots]; |
|
|
|
|
crm->server_registered_method = rm.get(); |
|
|
|
|
crm->flags = rm->flags; |
|
|
|
|
crm->has_host = has_host; |
|
|
|
|
if (has_host) { |
|
|
|
|
crm->host = std::move(host); |
|
|
|
|
} |
|
|
|
|
crm->method = std::move(method); |
|
|
|
|
} |
|
|
|
|
GPR_ASSERT(slots <= UINT32_MAX); |
|
|
|
|
registered_method_max_probes_ = max_probes; |
|
|
|
|
} else if (IsRegisteredMethodsMapEnabled()) { |
|
|
|
|
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) { |
|
|
|
|
auto key = std::make_pair(!rm->host.empty() ? rm->host : "", rm->method); |
|
|
|
|
registered_methods_.emplace( |
|
|
|
|
key, std::make_unique<ChannelRegisteredMethod>( |
|
|
|
|
rm.get(), rm->flags, /*has_host=*/!rm->host.empty(), |
|
|
|
|
Slice::FromExternalString(rm->method), |
|
|
|
|
Slice::FromExternalString(rm->host))); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Publish channel.
|
|
|
|
|
{ |
|
|
|
|
MutexLock lock(&server_->mu_global_); |
|
|
|
@ -1345,45 +1291,17 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server, |
|
|
|
|
transport->PerformOp(op); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( |
|
|
|
|
const grpc_slice& host, const grpc_slice& path) { |
|
|
|
|
if (old_registered_methods_ == nullptr) return nullptr; |
|
|
|
|
// TODO(ctiller): unify these two searches
|
|
|
|
|
// check for an exact match with host
|
|
|
|
|
uint32_t hash = MixHash32(grpc_slice_hash(host), grpc_slice_hash(path)); |
|
|
|
|
for (size_t i = 0; i <= registered_method_max_probes_; i++) { |
|
|
|
|
ChannelRegisteredMethod* rm = &( |
|
|
|
|
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()]; |
|
|
|
|
if (rm->server_registered_method == nullptr) break; |
|
|
|
|
if (!rm->has_host) continue; |
|
|
|
|
if (rm->host != host) continue; |
|
|
|
|
if (rm->method != path) continue; |
|
|
|
|
return rm; |
|
|
|
|
} |
|
|
|
|
// check for a wildcard method definition (no host set)
|
|
|
|
|
hash = MixHash32(0, grpc_slice_hash(path)); |
|
|
|
|
for (size_t i = 0; i <= registered_method_max_probes_; i++) { |
|
|
|
|
ChannelRegisteredMethod* rm = &( |
|
|
|
|
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()]; |
|
|
|
|
if (rm->server_registered_method == nullptr) break; |
|
|
|
|
if (rm->has_host) continue; |
|
|
|
|
if (rm->method != path) continue; |
|
|
|
|
return rm; |
|
|
|
|
} |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod( |
|
|
|
|
Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod( |
|
|
|
|
const absl::string_view& host, const absl::string_view& path) { |
|
|
|
|
if (registered_methods_.empty()) return nullptr; |
|
|
|
|
if (server_->registered_methods_.empty()) return nullptr; |
|
|
|
|
// check for an exact match with host
|
|
|
|
|
auto it = registered_methods_.find(std::make_pair(host, path)); |
|
|
|
|
if (it != registered_methods_.end()) { |
|
|
|
|
auto it = server_->registered_methods_.find(std::make_pair(host, path)); |
|
|
|
|
if (it != server_->registered_methods_.end()) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
// check for wildcard method definition (no host set)
|
|
|
|
|
it = registered_methods_.find(std::make_pair("", path)); |
|
|
|
|
if (it != registered_methods_.end()) { |
|
|
|
|
it = server_->registered_methods_.find(std::make_pair("", path)); |
|
|
|
|
if (it != server_->registered_methods_.end()) { |
|
|
|
|
return it->second.get(); |
|
|
|
|
} |
|
|
|
|
return nullptr; |
|
|
|
@ -1404,13 +1322,8 @@ void Server::ChannelData::SetRegisteredMethodOnMetadata( |
|
|
|
|
// Path not being set would result in an RPC error.
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ChannelRegisteredMethod* method; |
|
|
|
|
if (!IsRegisteredMethodsMapEnabled()) { |
|
|
|
|
method = GetRegisteredMethod(authority->c_slice(), path->c_slice()); |
|
|
|
|
} else { |
|
|
|
|
method = GetRegisteredMethod(authority->as_string_view(), |
|
|
|
|
path->as_string_view()); |
|
|
|
|
} |
|
|
|
|
RegisteredMethod* method = |
|
|
|
|
GetRegisteredMethod(authority->as_string_view(), path->as_string_view()); |
|
|
|
|
// insert in metadata
|
|
|
|
|
metadata.Set(GrpcRegisteredMethod(), method); |
|
|
|
|
} |
|
|
|
@ -1481,24 +1394,20 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise( |
|
|
|
|
Timestamp deadline = GetContext<CallContext>()->deadline(); |
|
|
|
|
// Find request matcher.
|
|
|
|
|
RequestMatcherInterface* matcher; |
|
|
|
|
ChannelRegisteredMethod* rm = nullptr; |
|
|
|
|
RegisteredMethod* rm = nullptr; |
|
|
|
|
if (IsRegisteredMethodLookupInTransportEnabled()) { |
|
|
|
|
rm = static_cast<ChannelRegisteredMethod*>( |
|
|
|
|
rm = static_cast<RegisteredMethod*>( |
|
|
|
|
call_args.client_initial_metadata->get(GrpcRegisteredMethod()) |
|
|
|
|
.value_or(nullptr)); |
|
|
|
|
} else { |
|
|
|
|
if (!IsRegisteredMethodsMapEnabled()) { |
|
|
|
|
rm = chand->GetRegisteredMethod(host_ptr->c_slice(), path->c_slice()); |
|
|
|
|
} else { |
|
|
|
|
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(), |
|
|
|
|
path->as_string_view()); |
|
|
|
|
} |
|
|
|
|
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(), |
|
|
|
|
path->as_string_view()); |
|
|
|
|
} |
|
|
|
|
ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>> |
|
|
|
|
maybe_read_first_message([] { return NextResult<MessageHandle>(); }); |
|
|
|
|
if (rm != nullptr) { |
|
|
|
|
matcher = rm->server_registered_method->matcher.get(); |
|
|
|
|
switch (rm->server_registered_method->payload_handling) { |
|
|
|
|
matcher = rm->matcher.get(); |
|
|
|
|
switch (rm->payload_handling) { |
|
|
|
|
case GRPC_SRM_PAYLOAD_NONE: |
|
|
|
|
break; |
|
|
|
|
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: |
|
|
|
@ -1752,22 +1661,18 @@ void Server::CallData::StartNewRpc(grpc_call_element* elem) { |
|
|
|
|
grpc_server_register_method_payload_handling payload_handling = |
|
|
|
|
GRPC_SRM_PAYLOAD_NONE; |
|
|
|
|
if (path_.has_value() && host_.has_value()) { |
|
|
|
|
ChannelRegisteredMethod* rm; |
|
|
|
|
RegisteredMethod* rm; |
|
|
|
|
if (IsRegisteredMethodLookupInTransportEnabled()) { |
|
|
|
|
rm = static_cast<ChannelRegisteredMethod*>( |
|
|
|
|
rm = static_cast<RegisteredMethod*>( |
|
|
|
|
recv_initial_metadata_->get(GrpcRegisteredMethod()) |
|
|
|
|
.value_or(nullptr)); |
|
|
|
|
} else { |
|
|
|
|
if (!IsRegisteredMethodsMapEnabled()) { |
|
|
|
|
rm = chand->GetRegisteredMethod(host_->c_slice(), path_->c_slice()); |
|
|
|
|
} else { |
|
|
|
|
rm = chand->GetRegisteredMethod(host_->as_string_view(), |
|
|
|
|
path_->as_string_view()); |
|
|
|
|
} |
|
|
|
|
rm = chand->GetRegisteredMethod(host_->as_string_view(), |
|
|
|
|
path_->as_string_view()); |
|
|
|
|
} |
|
|
|
|
if (rm != nullptr) { |
|
|
|
|
matcher_ = rm->server_registered_method->matcher.get(); |
|
|
|
|
payload_handling = rm->server_registered_method->payload_handling; |
|
|
|
|
matcher_ = rm->matcher.get(); |
|
|
|
|
payload_handling = rm->payload_handling; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Start recv_message op if needed.
|
|
|
|
|