Merge remote-tracking branch 'upstream/master' into fix-dns-resolver-cooldown-test

pull/35251/head
yijiem 12 months ago
commit 857835200a
  1. 12
      doc/service_config.md
  2. 2
      gRPC.podspec
  3. 1
      requirements.bazel.txt
  4. 1
      src/core/ext/filters/http/client/http_client_filter.cc
  5. 1
      src/core/ext/filters/http/client/http_client_filter.h
  6. 1
      src/core/ext/filters/http/server/http_server_filter.cc
  7. 1
      src/core/ext/filters/http/server/http_server_filter.h
  8. 124
      src/core/ext/filters/message_size/message_size_filter.cc
  9. 58
      src/core/ext/filters/message_size/message_size_filter.h
  10. 179
      src/core/lib/channel/promise_based_filter.h
  11. 74
      src/core/lib/channel/server_call_tracer_filter.cc
  12. 167
      src/core/lib/surface/server.cc
  13. 68
      src/core/lib/surface/server.h
  14. 23
      src/objective-c/PrivacyInfo.xcprivacy
  15. 2
      src/python/grpcio/grpc/_server.py
  16. 2
      templates/gRPC.podspec.template
  17. 10
      tools/run_tests/run_performance_tests.py
  18. 4
      tools/run_tests/run_tests.py

@ -62,12 +62,12 @@ DNS](https://github.com/grpc/proposal/blob/master/A2-service-configs-in-dns.md).
Here is an example service config in protobuf form:
```
```textproto
{
// Use round_robin LB policy.
# Use round_robin LB policy.
load_balancing_config: { round_robin: {} }
// This method config applies to method "foo/bar" and to all methods
// of service "baz".
# This method config applies to method "foo/bar" and to all methods
# of service "baz".
method_config: {
name: {
service: "foo"
@ -76,7 +76,7 @@ Here is an example service config in protobuf form:
name: {
service: "baz"
}
// Default timeout for matching methods.
# Default timeout for matching methods.
timeout: {
seconds: 1
nanos: 1
@ -87,7 +87,7 @@ Here is an example service config in protobuf form:
Here is the same example service config in JSON form:
```
```json
{
"loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [

2
gRPC.podspec generated

@ -32,6 +32,8 @@ Pod::Spec.new do |s|
:tag => "v#{version}",
}
s.resource = 'src/objective-c/PrivacyInfo.xcprivacy'
name = 'GRPCClient'
s.module_name = name
s.header_dir = name

@ -16,3 +16,4 @@ xds-protos==0.0.11
opencensus==0.10.0
opencensus-ext-stackdriver==0.8.0
absl-py==1.4.0
googleapis-common-protos==1.61.0

@ -53,6 +53,7 @@ namespace grpc_core {
const NoInterceptor HttpClientFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpClientFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpClientFilter::Call::OnFinalize;
const grpc_channel_filter HttpClientFilter::kFilter =
MakePromiseBasedFilter<HttpClientFilter, FilterEndpoint::kClient,

@ -45,6 +45,7 @@ class HttpClientFilter : public ImplementChannelFilter<HttpClientFilter> {
absl::Status OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
private:

@ -51,6 +51,7 @@ namespace grpc_core {
const NoInterceptor HttpServerFilter::Call::OnClientToServerMessage;
const NoInterceptor HttpServerFilter::Call::OnServerToClientMessage;
const NoInterceptor HttpServerFilter::Call::OnFinalize;
const grpc_channel_filter HttpServerFilter::kFilter =
MakePromiseBasedFilter<HttpServerFilter, FilterEndpoint::kServer,

@ -47,6 +47,7 @@ class HttpServerFilter : public ImplementChannelFilter<HttpServerFilter> {
void OnServerTrailingMetadata(ServerMetadata& md);
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
static const NoInterceptor OnFinalize;
};
private:

@ -50,6 +50,15 @@
namespace grpc_core {
const NoInterceptor ClientMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ClientMessageSizeFilter::Call::OnFinalize;
const NoInterceptor ServerMessageSizeFilter::Call::OnClientInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerInitialMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnServerTrailingMetadata;
const NoInterceptor ServerMessageSizeFilter::Call::OnFinalize;
//
// MessageSizeParsedConfig
//
@ -138,60 +147,6 @@ const grpc_channel_filter ServerMessageSizeFilter::kFilter =
kFilterExaminesOutboundMessages |
kFilterExaminesInboundMessages>("message_size");
class MessageSizeFilter::CallBuilder {
private:
auto Interceptor(uint32_t max_length, bool is_send) {
return [max_length, is_send,
err = err_](MessageHandle msg) -> absl::optional<MessageHandle> {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
Activity::current()->DebugTag().c_str(),
is_send ? "send" : "recv", msg->payload()->Length(),
max_length);
}
if (msg->payload()->Length() > max_length) {
if (err->is_set()) return std::move(msg);
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(
GetContext<Arena>());
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(),
Slice::FromCopiedString(
absl::StrFormat("%s message larger than max (%u vs. %d)",
is_send ? "Sent" : "Received",
msg->payload()->Length(), max_length)));
err->Set(std::move(r));
return absl::nullopt;
}
return std::move(msg);
};
}
public:
explicit CallBuilder(const MessageSizeParsedConfig& limits)
: limits_(limits) {}
template <typename T>
void AddSend(T* pipe_end) {
if (!limits_.max_send_size().has_value()) return;
pipe_end->InterceptAndMap(Interceptor(*limits_.max_send_size(), true));
}
template <typename T>
void AddRecv(T* pipe_end) {
if (!limits_.max_recv_size().has_value()) return;
pipe_end->InterceptAndMap(Interceptor(*limits_.max_recv_size(), false));
}
ArenaPromise<ServerMetadataHandle> Run(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
return Race(err_->Wait(), next_promise_factory(std::move(call_args)));
}
private:
Latch<ServerMetadataHandle>* const err_ =
GetContext<Arena>()->ManagedNew<Latch<ServerMetadataHandle>>();
MessageSizeParsedConfig limits_;
};
absl::StatusOr<ClientMessageSizeFilter> ClientMessageSizeFilter::Create(
const ChannelArgs& args, ChannelFilter::Args) {
return ClientMessageSizeFilter(args);
@ -202,20 +157,40 @@ absl::StatusOr<ServerMessageSizeFilter> ServerMessageSizeFilter::Create(
return ServerMessageSizeFilter(args);
}
ArenaPromise<ServerMetadataHandle> ClientMessageSizeFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
namespace {
ServerMetadataHandle CheckPayload(const Message& msg,
absl::optional<uint32_t> max_length,
bool is_send) {
if (!max_length.has_value()) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(grpc_call_trace)) {
gpr_log(GPR_INFO, "%s[message_size] %s len:%" PRIdPTR " max:%d",
Activity::current()->DebugTag().c_str(), is_send ? "send" : "recv",
msg.payload()->Length(), *max_length);
}
if (msg.payload()->Length() <= *max_length) return nullptr;
auto r = GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>());
r->Set(GrpcStatusMetadata(), GRPC_STATUS_RESOURCE_EXHAUSTED);
r->Set(GrpcMessageMetadata(), Slice::FromCopiedString(absl::StrFormat(
"%s message larger than max (%u vs. %d)",
is_send ? "Sent" : "Received",
msg.payload()->Length(), *max_length)));
return r;
}
} // namespace
ClientMessageSizeFilter::Call::Call(ClientMessageSizeFilter* filter)
: limits_(filter->parsed_config_) {
// Get max sizes from channel data, then merge in per-method config values.
// Note: Per-method config is only available on the client, so we
// apply the max request size to the send limit and the max response
// size to the receive limit.
MessageSizeParsedConfig limits = this->limits();
const MessageSizeParsedConfig* config_from_call_context =
MessageSizeParsedConfig::GetFromCallContext(
GetContext<grpc_call_context_element>(),
service_config_parser_index_);
filter->service_config_parser_index_);
if (config_from_call_context != nullptr) {
absl::optional<uint32_t> max_send_size = limits.max_send_size();
absl::optional<uint32_t> max_recv_size = limits.max_recv_size();
absl::optional<uint32_t> max_send_size = limits_.max_send_size();
absl::optional<uint32_t> max_recv_size = limits_.max_recv_size();
if (config_from_call_context->max_send_size().has_value() &&
(!max_send_size.has_value() ||
*config_from_call_context->max_send_size() < *max_send_size)) {
@ -226,21 +201,28 @@ ArenaPromise<ServerMetadataHandle> ClientMessageSizeFilter::MakeCallPromise(
*config_from_call_context->max_recv_size() < *max_recv_size)) {
max_recv_size = *config_from_call_context->max_recv_size();
}
limits = MessageSizeParsedConfig(max_send_size, max_recv_size);
limits_ = MessageSizeParsedConfig(max_send_size, max_recv_size);
}
}
ServerMetadataHandle ServerMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter) {
return CheckPayload(message, filter->parsed_config_.max_recv_size(), false);
}
ServerMetadataHandle ServerMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message, ServerMessageSizeFilter* filter) {
return CheckPayload(message, filter->parsed_config_.max_send_size(), true);
}
CallBuilder b(limits);
b.AddSend(call_args.client_to_server_messages);
b.AddRecv(call_args.server_to_client_messages);
return b.Run(std::move(call_args), std::move(next_promise_factory));
ServerMetadataHandle ClientMessageSizeFilter::Call::OnClientToServerMessage(
const Message& message) {
return CheckPayload(message, limits_.max_send_size(), true);
}
ArenaPromise<ServerMetadataHandle> ServerMessageSizeFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
CallBuilder b(limits());
b.AddSend(call_args.server_to_client_messages);
b.AddRecv(call_args.client_to_server_messages);
return b.Run(std::move(call_args), std::move(next_promise_factory));
ServerMetadataHandle ClientMessageSizeFilter::Call::OnServerToClientMessage(
const Message& message) {
return CheckPayload(message, limits_.max_recv_size(), false);
}
namespace {

@ -86,48 +86,60 @@ class MessageSizeParser : public ServiceConfigParser::Parser {
absl::optional<uint32_t> GetMaxRecvSizeFromChannelArgs(const ChannelArgs& args);
absl::optional<uint32_t> GetMaxSendSizeFromChannelArgs(const ChannelArgs& args);
class MessageSizeFilter : public ChannelFilter {
protected:
explicit MessageSizeFilter(const ChannelArgs& args)
: limits_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}
class CallBuilder;
const MessageSizeParsedConfig& limits() const { return limits_; }
private:
MessageSizeParsedConfig limits_;
};
class ServerMessageSizeFilter final : public MessageSizeFilter {
class ServerMessageSizeFilter final
: public ImplementChannelFilter<ServerMessageSizeFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerMessageSizeFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(
const Message& message, ServerMessageSizeFilter* filter);
ServerMetadataHandle OnServerToClientMessage(
const Message& message, ServerMessageSizeFilter* filter);
};
private:
using MessageSizeFilter::MessageSizeFilter;
explicit ServerMessageSizeFilter(const ChannelArgs& args)
: parsed_config_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}
const MessageSizeParsedConfig parsed_config_;
};
class ClientMessageSizeFilter final : public MessageSizeFilter {
class ClientMessageSizeFilter final
: public ImplementChannelFilter<ClientMessageSizeFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientMessageSizeFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
explicit Call(ClientMessageSizeFilter* filter);
static const NoInterceptor OnClientInitialMetadata;
static const NoInterceptor OnServerInitialMetadata;
static const NoInterceptor OnServerTrailingMetadata;
static const NoInterceptor OnFinalize;
ServerMetadataHandle OnClientToServerMessage(const Message& message);
ServerMetadataHandle OnServerToClientMessage(const Message& message);
private:
MessageSizeParsedConfig limits_;
};
private:
explicit ClientMessageSizeFilter(const ChannelArgs& args)
: parsed_config_(MessageSizeParsedConfig::GetFromChannelArgs(args)) {}
const size_t service_config_parser_index_{MessageSizeParser::ParserIndex()};
using MessageSizeFilter::MessageSizeFilter;
const MessageSizeParsedConfig parsed_config_;
};
} // namespace grpc_core

@ -186,6 +186,11 @@ inline constexpr bool HasChannelAccess(R (T::*)(A)) {
return false;
}
template <typename T, typename R, typename A>
inline constexpr bool HasChannelAccess(R (T::*)()) {
return false;
}
template <typename T, typename R, typename A, typename C>
inline constexpr bool HasChannelAccess(R (T::*)(A, C)) {
return true;
@ -208,7 +213,8 @@ inline constexpr bool CallHasChannelAccess() {
&Derived::Call::OnClientToServerMessage,
&Derived::Call::OnServerInitialMetadata,
&Derived::Call::OnServerToClientMessage,
&Derived::Call::OnServerTrailingMetadata);
&Derived::Call::OnServerTrailingMetadata,
&Derived::Call::OnFinalize);
}
// Given a boolean X export a type:
@ -254,11 +260,31 @@ struct RaceAsyncCompletion<true> {
}
};
// Zero-member wrapper to make sure that Call always has a constructor
// that takes a channel pointer (even if it's thrown away)
template <typename Derived, typename SfinaeVoid = void>
class CallWrapper;
template <typename Derived>
class CallWrapper<Derived, absl::void_t<decltype(typename Derived::Call(
std::declval<Derived*>()))>>
: public Derived::Call {
public:
explicit CallWrapper(Derived* channel) : Derived::Call(channel) {}
};
template <typename Derived>
class CallWrapper<Derived, absl::void_t<decltype(typename Derived::Call())>>
: public Derived::Call {
public:
explicit CallWrapper(Derived*) : Derived::Call() {}
};
// For the original promise scheme polyfill: data associated with once call.
template <typename Derived>
struct FilterCallData {
explicit FilterCallData(Derived* channel) : channel(channel) {}
GPR_NO_UNIQUE_ADDRESS typename Derived::Call call;
explicit FilterCallData(Derived* channel) : call(channel), channel(channel) {}
GPR_NO_UNIQUE_ADDRESS CallWrapper<Derived> call;
GPR_NO_UNIQUE_ADDRESS
typename TypeIfNeeded<Latch<ServerMetadataHandle>,
CallHasAsyncErrorInterceptor<Derived>()>::Type
@ -347,9 +373,68 @@ inline auto RunCall(void (Derived::Call::*fn)(ClientMetadata& md,
inline void InterceptClientToServerMessage(const NoInterceptor*, void*,
const CallArgs&) {}
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call_data->call.OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_args.client_to_server_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md =
call_data->call.OnClientToServerMessage(*msg, call_data->channel);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
}
inline void InterceptClientToServerMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
});
}
template <typename Derived>
inline void InterceptClientToServerMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnClientToServerMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnClientToServerMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
});
}
inline void InterceptClientInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -441,7 +526,6 @@ inline void InterceptServerInitialMetadata(
});
}
template <typename CallArgs>
inline void InterceptServerInitialMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -474,9 +558,68 @@ inline void InterceptServerInitialMetadata(
inline void InterceptServerToClientMessage(const NoInterceptor*, void*,
const CallArgs&) {}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call_data->call.OnServerToClientMessage(*msg);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
FilterCallData<Derived>* call_data, const CallArgs& call_args) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_args.server_to_client_messages->InterceptAndMap(
[call_data](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md =
call_data->call.OnServerToClientMessage(*msg, call_data->channel);
if (return_md == nullptr) return std::move(msg);
if (call_data->error_latch.is_set()) return absl::nullopt;
call_data->error_latch.Set(std::move(return_md));
return absl::nullopt;
});
}
inline void InterceptServerToClientMessage(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&),
typename Derived::Call* call, Derived*, CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
});
}
template <typename Derived>
inline void InterceptServerToClientMessage(
ServerMetadataHandle (Derived::Call::*fn)(const Message&, Derived*),
typename Derived::Call* call, Derived* channel,
CallSpineInterface* call_spine) {
GPR_DEBUG_ASSERT(fn == &Derived::Call::OnServerToClientMessage);
call_spine->server_to_client_messages().sender.InterceptAndMap(
[call, call_spine,
channel](MessageHandle msg) -> absl::optional<MessageHandle> {
auto return_md = call->OnServerToClientMessage(*msg, channel);
if (return_md == nullptr) return std::move(msg);
return call_spine->Cancel(std::move(return_md));
});
}
inline void InterceptServerTrailingMetadata(const NoInterceptor*, void*, void*,
CallSpineInterface*) {}
@ -505,6 +648,18 @@ inline void InterceptServerTrailingMetadata(
});
}
inline void InterceptFinalize(const NoInterceptor*, void*) {}
template <class Call>
inline void InterceptFinalize(void (Call::*fn)(const grpc_call_final_info*),
Call* call) {
GPR_DEBUG_ASSERT(fn == &Call::OnFinalize);
GetContext<CallFinalization>()->Add(
[call](const grpc_call_final_info* final_info) {
call->OnFinalize(final_info);
});
}
template <typename Derived>
absl::enable_if_t<std::is_empty<FilterCallData<Derived>>::value,
FilterCallData<Derived>*>
@ -537,6 +692,7 @@ MakeFilterCall(Derived* derived) {
// - OnServerToClientMessage - $VALUE_TYPE = Message
// - OnClientToServerMessage - $VALUE_TYPE = Message
// - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata
// - OnFinalize - special, see below
// These members define an interception point for a particular event in
// the call lifecycle.
// The type of these members matters, and is selectable by the class
@ -569,12 +725,21 @@ MakeFilterCall(Derived* derived) {
// the filter can return nullptr for success, or a metadata handle for
// failure (in which case the call will be aborted).
// useful for cases where the exact metadata returned needs to be customized.
// Finally, OnFinalize can be added to intecept call finalization.
// It must have one of the signatures:
// - static const NoInterceptor OnFinalize:
// the filter does not intercept call finalization.
// - void OnFinalize(const grpc_call_final_info*):
// the filter intercepts call finalization.
template <typename Derived>
class ImplementChannelFilter : public ChannelFilter {
public:
// Natively construct a v3 call.
void InitCall(CallSpineInterface* call_spine) {
auto* call = GetContext<Arena>()->ManagedNew<typename Derived::Call>();
typename Derived::Call* call =
GetContext<Arena>()
->ManagedNew<promise_filter_detail::CallWrapper<Derived>>(
static_cast<Derived*>(this));
promise_filter_detail::InterceptClientInitialMetadata(
&Derived::Call::OnClientInitialMetadata, call,
static_cast<Derived*>(this), call_spine);
@ -590,6 +755,7 @@ class ImplementChannelFilter : public ChannelFilter {
promise_filter_detail::InterceptServerTrailingMetadata(
&Derived::Call::OnServerTrailingMetadata, call,
static_cast<Derived*>(this), call_spine);
promise_filter_detail::InterceptFinalize(&Derived::Call::OnFinalize, call);
}
// Polyfill for the original promise scheme.
@ -605,6 +771,9 @@ class ImplementChannelFilter : public ChannelFilter {
&Derived::Call::OnServerInitialMetadata, call, call_args);
promise_filter_detail::InterceptServerToClientMessage(
&Derived::Call::OnServerToClientMessage, call, call_args);
promise_filter_detail::InterceptFinalize(
&Derived::Call::OnFinalize,
static_cast<typename Derived::Call*>(&call->call));
return promise_filter_detail::MapResult(
&Derived::Call::OnServerTrailingMetadata,
promise_filter_detail::RaceAsyncCompletion<

@ -42,19 +42,55 @@ namespace grpc_core {
namespace {
// TODO(yashykt): This filter is not really needed. We should be able to move
// this to the connected filter.
class ServerCallTracerFilter : public ChannelFilter {
class ServerCallTracerFilter
: public ImplementChannelFilter<ServerCallTracerFilter> {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ServerCallTracerFilter> Create(
const ChannelArgs& /*args*/, ChannelFilter::Args /*filter_args*/);
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
class Call {
public:
void OnClientInitialMetadata(ClientMetadata& client_initial_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordReceivedInitialMetadata(&client_initial_metadata);
}
void OnServerInitialMetadata(ServerMetadata& server_initial_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordSendInitialMetadata(&server_initial_metadata);
}
void OnFinalize(const grpc_call_final_info* final_info) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordEnd(final_info);
}
void OnServerTrailingMetadata(ServerMetadata& server_trailing_metadata) {
auto* call_tracer = CallTracer();
if (call_tracer == nullptr) return;
call_tracer->RecordSendTrailingMetadata(&server_trailing_metadata);
}
static const NoInterceptor OnClientToServerMessage;
static const NoInterceptor OnServerToClientMessage;
private:
static ServerCallTracer* CallTracer() {
auto* call_context = GetContext<grpc_call_context_element>();
return static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
}
};
};
const NoInterceptor ServerCallTracerFilter::Call::OnClientToServerMessage;
const NoInterceptor ServerCallTracerFilter::Call::OnServerToClientMessage;
const grpc_channel_filter ServerCallTracerFilter::kFilter =
MakePromiseBasedFilter<ServerCallTracerFilter, FilterEndpoint::kServer,
kFilterExaminesServerInitialMetadata>(
@ -65,34 +101,6 @@ absl::StatusOr<ServerCallTracerFilter> ServerCallTracerFilter::Create(
return ServerCallTracerFilter();
}
ArenaPromise<ServerMetadataHandle> ServerCallTracerFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
auto* call_context = GetContext<grpc_call_context_element>();
auto* call_tracer = static_cast<ServerCallTracer*>(
call_context[GRPC_CONTEXT_CALL_TRACER].value);
if (call_tracer == nullptr) {
return next_promise_factory(std::move(call_args));
}
call_tracer->RecordReceivedInitialMetadata(
call_args.client_initial_metadata.get());
call_args.server_initial_metadata->InterceptAndMap(
[call_tracer](ServerMetadataHandle metadata) {
call_tracer->RecordSendInitialMetadata(metadata.get());
return metadata;
});
GetContext<CallFinalization>()->Add(
[call_tracer](const grpc_call_final_info* final_info) {
call_tracer->RecordEnd(final_info);
});
return OnCancel(
Map(next_promise_factory(std::move(call_args)),
[call_tracer](ServerMetadataHandle md) {
call_tracer->RecordSendTrailingMetadata(md.get());
return md;
}),
[call_tracer]() { call_tracer->RecordCancel(absl::CancelledError()); });
}
} // namespace
void RegisterServerCallTracerFilter(CoreConfiguration::Builder* builder) {

@ -839,9 +839,9 @@ void Server::Start() {
if (unregistered_request_matcher_ == nullptr) {
unregistered_request_matcher_ = make_real_request_matcher();
}
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) {
if (rm->matcher == nullptr) {
rm->matcher = make_real_request_matcher();
for (auto& rm : registered_methods_) {
if (rm.second->matcher == nullptr) {
rm.second->matcher = make_real_request_matcher();
}
}
{
@ -928,20 +928,11 @@ void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
cqs_.push_back(cq);
}
namespace {
bool streq(const std::string& a, const char* b) {
return (a.empty() && b == nullptr) ||
((b != nullptr) && !strcmp(a.c_str(), b));
}
} // namespace
Server::RegisteredMethod* Server::RegisterMethod(
const char* method, const char* host,
grpc_server_register_method_payload_handling payload_handling,
uint32_t flags) {
if (IsRegisteredMethodsMapEnabled() && started_) {
if (started_) {
Crash("Attempting to register method after server started");
}
@ -950,21 +941,21 @@ Server::RegisteredMethod* Server::RegisterMethod(
"grpc_server_register_method method string cannot be NULL");
return nullptr;
}
for (std::unique_ptr<RegisteredMethod>& m : registered_methods_) {
if (streq(m->method, method) && streq(m->host, host)) {
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
host ? host : "*");
return nullptr;
}
auto key = std::make_pair(host ? host : "", method);
if (registered_methods_.find(key) != registered_methods_.end()) {
gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
host ? host : "*");
return nullptr;
}
if (flags != 0) {
gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
flags);
return nullptr;
}
registered_methods_.emplace_back(std::make_unique<RegisteredMethod>(
method, host, payload_handling, flags));
return registered_methods_.back().get();
auto it = registered_methods_.emplace(
key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
flags));
return it.first->second.get();
}
void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
@ -1015,9 +1006,9 @@ void Server::KillPendingWorkLocked(grpc_error_handle error) {
if (started_) {
unregistered_request_matcher_->KillRequests(error);
unregistered_request_matcher_->ZombifyPending();
for (std::unique_ptr<RegisteredMethod>& rm : registered_methods_) {
rm->matcher->KillRequests(error);
rm->matcher->ZombifyPending();
for (auto& rm : registered_methods_) {
rm.second->matcher->KillRequests(error);
rm.second->matcher->ZombifyPending();
}
}
}
@ -1252,7 +1243,6 @@ class Server::ChannelData::ConnectivityWatcher
//
Server::ChannelData::~ChannelData() {
old_registered_methods_.reset();
if (server_ != nullptr) {
if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
@ -1276,50 +1266,6 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
channel_ = channel;
cq_idx_ = cq_idx;
channelz_socket_uuid_ = channelz_socket_uuid;
// Build a lookup table phrased in terms of mdstr's in this channels context
// to quickly find registered methods.
size_t num_registered_methods = server_->registered_methods_.size();
if (!IsRegisteredMethodsMapEnabled() && num_registered_methods > 0) {
uint32_t max_probes = 0;
size_t slots = 2 * num_registered_methods;
old_registered_methods_ =
std::make_unique<std::vector<ChannelRegisteredMethod>>(slots);
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) {
Slice host;
Slice method = Slice::FromExternalString(rm->method);
const bool has_host = !rm->host.empty();
if (has_host) {
host = Slice::FromExternalString(rm->host);
}
uint32_t hash = MixHash32(has_host ? host.Hash() : 0, method.Hash());
uint32_t probes = 0;
for (probes = 0; (*old_registered_methods_)[(hash + probes) % slots]
.server_registered_method != nullptr;
probes++) {
}
if (probes > max_probes) max_probes = probes;
ChannelRegisteredMethod* crm =
&(*old_registered_methods_)[(hash + probes) % slots];
crm->server_registered_method = rm.get();
crm->flags = rm->flags;
crm->has_host = has_host;
if (has_host) {
crm->host = std::move(host);
}
crm->method = std::move(method);
}
GPR_ASSERT(slots <= UINT32_MAX);
registered_method_max_probes_ = max_probes;
} else if (IsRegisteredMethodsMapEnabled()) {
for (std::unique_ptr<RegisteredMethod>& rm : server_->registered_methods_) {
auto key = std::make_pair(!rm->host.empty() ? rm->host : "", rm->method);
registered_methods_.emplace(
key, std::make_unique<ChannelRegisteredMethod>(
rm.get(), rm->flags, /*has_host=*/!rm->host.empty(),
Slice::FromExternalString(rm->method),
Slice::FromExternalString(rm->host)));
}
}
// Publish channel.
{
MutexLock lock(&server_->mu_global_);
@ -1345,45 +1291,17 @@ void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
transport->PerformOp(op);
}
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
const grpc_slice& host, const grpc_slice& path) {
if (old_registered_methods_ == nullptr) return nullptr;
// TODO(ctiller): unify these two searches
// check for an exact match with host
uint32_t hash = MixHash32(grpc_slice_hash(host), grpc_slice_hash(path));
for (size_t i = 0; i <= registered_method_max_probes_; i++) {
ChannelRegisteredMethod* rm = &(
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()];
if (rm->server_registered_method == nullptr) break;
if (!rm->has_host) continue;
if (rm->host != host) continue;
if (rm->method != path) continue;
return rm;
}
// check for a wildcard method definition (no host set)
hash = MixHash32(0, grpc_slice_hash(path));
for (size_t i = 0; i <= registered_method_max_probes_; i++) {
ChannelRegisteredMethod* rm = &(
*old_registered_methods_)[(hash + i) % old_registered_methods_->size()];
if (rm->server_registered_method == nullptr) break;
if (rm->has_host) continue;
if (rm->method != path) continue;
return rm;
}
return nullptr;
}
Server::ChannelRegisteredMethod* Server::ChannelData::GetRegisteredMethod(
Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod(
const absl::string_view& host, const absl::string_view& path) {
if (registered_methods_.empty()) return nullptr;
if (server_->registered_methods_.empty()) return nullptr;
// check for an exact match with host
auto it = registered_methods_.find(std::make_pair(host, path));
if (it != registered_methods_.end()) {
auto it = server_->registered_methods_.find(std::make_pair(host, path));
if (it != server_->registered_methods_.end()) {
return it->second.get();
}
// check for wildcard method definition (no host set)
it = registered_methods_.find(std::make_pair("", path));
if (it != registered_methods_.end()) {
it = server_->registered_methods_.find(std::make_pair("", path));
if (it != server_->registered_methods_.end()) {
return it->second.get();
}
return nullptr;
@ -1404,13 +1322,8 @@ void Server::ChannelData::SetRegisteredMethodOnMetadata(
// Path not being set would result in an RPC error.
return;
}
ChannelRegisteredMethod* method;
if (!IsRegisteredMethodsMapEnabled()) {
method = GetRegisteredMethod(authority->c_slice(), path->c_slice());
} else {
method = GetRegisteredMethod(authority->as_string_view(),
path->as_string_view());
}
RegisteredMethod* method =
GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
// insert in metadata
metadata.Set(GrpcRegisteredMethod(), method);
}
@ -1481,24 +1394,20 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
Timestamp deadline = GetContext<CallContext>()->deadline();
// Find request matcher.
RequestMatcherInterface* matcher;
ChannelRegisteredMethod* rm = nullptr;
RegisteredMethod* rm = nullptr;
if (IsRegisteredMethodLookupInTransportEnabled()) {
rm = static_cast<ChannelRegisteredMethod*>(
rm = static_cast<RegisteredMethod*>(
call_args.client_initial_metadata->get(GrpcRegisteredMethod())
.value_or(nullptr));
} else {
if (!IsRegisteredMethodsMapEnabled()) {
rm = chand->GetRegisteredMethod(host_ptr->c_slice(), path->c_slice());
} else {
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(),
path->as_string_view());
}
rm = chand->GetRegisteredMethod(host_ptr->as_string_view(),
path->as_string_view());
}
ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>>
maybe_read_first_message([] { return NextResult<MessageHandle>(); });
if (rm != nullptr) {
matcher = rm->server_registered_method->matcher.get();
switch (rm->server_registered_method->payload_handling) {
matcher = rm->matcher.get();
switch (rm->payload_handling) {
case GRPC_SRM_PAYLOAD_NONE:
break;
case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER:
@ -1752,22 +1661,18 @@ void Server::CallData::StartNewRpc(grpc_call_element* elem) {
grpc_server_register_method_payload_handling payload_handling =
GRPC_SRM_PAYLOAD_NONE;
if (path_.has_value() && host_.has_value()) {
ChannelRegisteredMethod* rm;
RegisteredMethod* rm;
if (IsRegisteredMethodLookupInTransportEnabled()) {
rm = static_cast<ChannelRegisteredMethod*>(
rm = static_cast<RegisteredMethod*>(
recv_initial_metadata_->get(GrpcRegisteredMethod())
.value_or(nullptr));
} else {
if (!IsRegisteredMethodsMapEnabled()) {
rm = chand->GetRegisteredMethod(host_->c_slice(), path_->c_slice());
} else {
rm = chand->GetRegisteredMethod(host_->as_string_view(),
path_->as_string_view());
}
rm = chand->GetRegisteredMethod(host_->as_string_view(),
path_->as_string_view());
}
if (rm != nullptr) {
matcher_ = rm->server_registered_method->matcher.get();
payload_handling = rm->server_registered_method->payload_handling;
matcher_ = rm->matcher.get();
payload_handling = rm->payload_handling;
}
}
// Start recv_message op if needed.

@ -211,26 +211,6 @@ class Server : public InternallyRefCounted<Server>,
private:
struct RequestedCall;
struct ChannelRegisteredMethod {
ChannelRegisteredMethod() = default;
ChannelRegisteredMethod(RegisteredMethod* server_registered_method_arg,
uint32_t flags_arg, bool has_host_arg,
Slice method_arg, Slice host_arg)
: server_registered_method(server_registered_method_arg),
flags(flags_arg),
has_host(has_host_arg),
method(std::move(method_arg)),
host(std::move(host_arg)) {}
~ChannelRegisteredMethod() = default;
RegisteredMethod* server_registered_method = nullptr;
uint32_t flags;
bool has_host;
Slice method;
Slice host;
};
class RequestMatcherInterface;
class RealRequestMatcherFilterStack;
class RealRequestMatcherPromises;
@ -251,11 +231,8 @@ class Server : public InternallyRefCounted<Server>,
Channel* channel() const { return channel_.get(); }
size_t cq_idx() const { return cq_idx_; }
ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
const grpc_slice& path);
ChannelRegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
const absl::string_view& path);
// Filter vtable functions.
static grpc_error_handle InitChannelElement(
grpc_channel_element* elem, grpc_channel_element_args* args);
@ -274,36 +251,12 @@ class Server : public InternallyRefCounted<Server>,
static void FinishDestroy(void* arg, grpc_error_handle error);
struct StringViewStringViewPairHash
: absl::flat_hash_set<
std::pair<absl::string_view, absl::string_view>>::hasher {
using is_transparent = void;
};
struct StringViewStringViewPairEq
: std::equal_to<std::pair<absl::string_view, absl::string_view>> {
using is_transparent = void;
};
RefCountedPtr<Server> server_;
RefCountedPtr<Channel> channel_;
// The index into Server::cqs_ of the CQ used as a starting point for
// where to publish new incoming calls.
size_t cq_idx_;
absl::optional<std::list<ChannelData*>::iterator> list_position_;
// A hash-table of the methods and hosts of the registered methods.
// TODO(vjpai): Convert this to an STL map type as opposed to a direct
// bucket implementation. (Consider performance impact, hash function to
// use, etc.)
std::unique_ptr<std::vector<ChannelRegisteredMethod>>
old_registered_methods_;
// Map of registered methods.
absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
std::unique_ptr<ChannelRegisteredMethod>,
StringViewStringViewPairHash,
StringViewStringViewPairEq>
registered_methods_;
uint32_t registered_method_max_probes_;
grpc_closure finish_destroy_channel_closure_;
intptr_t channelz_socket_uuid_;
};
@ -412,6 +365,17 @@ class Server : public InternallyRefCounted<Server>,
grpc_cq_completion completion;
};
struct StringViewStringViewPairHash
: absl::flat_hash_set<
std::pair<absl::string_view, absl::string_view>>::hasher {
using is_transparent = void;
};
struct StringViewStringViewPairEq
: std::equal_to<std::pair<absl::string_view, absl::string_view>> {
using is_transparent = void;
};
static void ListenerDestroyDone(void* arg, grpc_error_handle error);
static void DoneShutdownEvent(void* server,
@ -497,7 +461,11 @@ class Server : public InternallyRefCounted<Server>,
bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
CondVar starting_cv_;
std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
// Map of registered methods.
absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
std::unique_ptr<RegisteredMethod>,
StringViewStringViewPairHash, StringViewStringViewPairEq>
registered_methods_;
// Request matcher for unregistered methods.
std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>NSPrivacyTracking</key>
<false/>
<key>NSPrivacyCollectedDataTypes</key>
<array/>
<key>NSPrivacyTrackingDomains</key>
<array/>
<key>NSPrivacyAccessedAPITypes</key>
<array>
<dict>
<key>NSPrivacyAccessedAPIType</key>
<string>NSPrivacyAccessedAPICategoryFileTimestamp</string>
<key>NSPrivacyAccessedAPITypeReasons</key>
<array>
<string>C617.1</string>
</array>
</dict>
</array>
</dict>
</plist>

@ -579,7 +579,7 @@ def _call_behavior(
exception.__traceback__,
)
)
traceback.print_exc()
traceback.print_exc()
_LOGGER.exception(details)
_abort(
state,

@ -34,6 +34,8 @@
:tag => "v#{version}",
}
s.resource = 'src/objective-c/PrivacyInfo.xcprivacy'
name = 'GRPCClient'
s.module_name = name
s.header_dir = name

@ -21,8 +21,8 @@ import collections
import itertools
import json
import os
import pipes
import re
import shlex
import sys
import time
@ -121,7 +121,7 @@ def create_scenario_jobspec(
if bq_result_table:
cmd += 'BQ_RESULT_TABLE="%s" ' % bq_result_table
cmd += "tools/run_tests/performance/run_qps_driver.sh "
cmd += "--scenarios_json=%s " % pipes.quote(
cmd += "--scenarios_json=%s " % shlex.quote(
json.dumps({"scenarios": [scenario_json]})
)
cmd += "--scenario_result_file=scenario_result.json "
@ -135,7 +135,7 @@ def create_scenario_jobspec(
user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host,
pipes.quote(cmd),
shlex.quote(cmd),
)
return jobset.JobSpec(
@ -157,7 +157,7 @@ def create_quit_jobspec(workers, remote_host=None):
user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, remote_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host,
pipes.quote(cmd),
shlex.quote(cmd),
)
return jobset.JobSpec(
@ -192,7 +192,7 @@ def create_netperf_jobspec(
user_at_host = "%s@%s" % (_REMOTE_HOST_USERNAME, client_host)
cmd = 'ssh %s "cd ~/performance_workspace/grpc/ && "%s' % (
user_at_host,
pipes.quote(cmd),
shlex.quote(cmd),
)
return jobset.JobSpec(

@ -26,10 +26,10 @@ import logging
import multiprocessing
import os
import os.path
import pipes
import platform
import random
import re
import shlex
import socket
import subprocess
import sys
@ -479,7 +479,7 @@ class CLanguage(object):
cmdline = [binary] + target["args"]
shortname = target.get(
"shortname",
" ".join(pipes.quote(arg) for arg in cmdline),
" ".join(shlex.quote(arg) for arg in cmdline),
)
shortname += shortname_ext
out.append(

Loading…
Cancel
Save