[promises] Convert client load reporting to promises (#31854)

* [promises] Convert client load reporting to promises

* fix

* Automated change: Fix sanity tests

* fix

* Automated change: Fix sanity tests

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31873/head
Craig Tiller 2 years ago committed by GitHub
parent 91083659fa
commit a6843d4659
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      build_autogenerated.yaml
  2. 4
      gRPC-C++.podspec
  3. 4
      gRPC-Core.podspec
  4. 2
      grpc.gemspec
  5. 2
      package.xml
  6. 4
      src/core/BUILD
  7. 143
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.cc
  8. 15
      src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h
  9. 7
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  10. 2
      tools/doxygen/Doxyfile.c++.internal
  11. 2
      tools/doxygen/Doxyfile.core.internal

@ -888,6 +888,7 @@ libs:
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -896,6 +897,7 @@ libs:
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h
@ -2169,6 +2171,7 @@ libs:
- src/core/lib/promise/activity.h
- src/core/lib/promise/arena_promise.h
- src/core/lib/promise/context.h
- src/core/lib/promise/detail/basic_join.h
- src/core/lib/promise/detail/basic_seq.h
- src/core/lib/promise/detail/promise_factory.h
- src/core/lib/promise/detail/promise_like.h
@ -2177,6 +2180,7 @@ libs:
- src/core/lib/promise/exec_ctx_wakeup_scheduler.h
- src/core/lib/promise/for_each.h
- src/core/lib/promise/intra_activity_waiter.h
- src/core/lib/promise/join.h
- src/core/lib/promise/latch.h
- src/core/lib/promise/loop.h
- src/core/lib/promise/map.h

4
gRPC-C++.podspec generated

@ -871,6 +871,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -879,6 +880,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/join.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
@ -1766,6 +1768,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -1774,6 +1777,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/join.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',

4
gRPC-Core.podspec generated

@ -1400,6 +1400,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -1408,6 +1409,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/join.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',
@ -2405,6 +2407,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/activity.h',
'src/core/lib/promise/arena_promise.h',
'src/core/lib/promise/context.h',
'src/core/lib/promise/detail/basic_join.h',
'src/core/lib/promise/detail/basic_seq.h',
'src/core/lib/promise/detail/promise_factory.h',
'src/core/lib/promise/detail/promise_like.h',
@ -2413,6 +2416,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/exec_ctx_wakeup_scheduler.h',
'src/core/lib/promise/for_each.h',
'src/core/lib/promise/intra_activity_waiter.h',
'src/core/lib/promise/join.h',
'src/core/lib/promise/latch.h',
'src/core/lib/promise/loop.h',
'src/core/lib/promise/map.h',

2
grpc.gemspec generated

@ -1311,6 +1311,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/activity.h )
s.files += %w( src/core/lib/promise/arena_promise.h )
s.files += %w( src/core/lib/promise/context.h )
s.files += %w( src/core/lib/promise/detail/basic_join.h )
s.files += %w( src/core/lib/promise/detail/basic_seq.h )
s.files += %w( src/core/lib/promise/detail/promise_factory.h )
s.files += %w( src/core/lib/promise/detail/promise_like.h )
@ -1319,6 +1320,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/exec_ctx_wakeup_scheduler.h )
s.files += %w( src/core/lib/promise/for_each.h )
s.files += %w( src/core/lib/promise/intra_activity_waiter.h )
s.files += %w( src/core/lib/promise/join.h )
s.files += %w( src/core/lib/promise/latch.h )
s.files += %w( src/core/lib/promise/loop.h )
s.files += %w( src/core/lib/promise/map.h )

2
package.xml generated

@ -1293,6 +1293,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/activity.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/arena_promise.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_join.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/basic_seq.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_factory.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/detail/promise_like.h" role="src" />
@ -1301,6 +1302,7 @@
<file baseinstalldir="/" name="src/core/lib/promise/exec_ctx_wakeup_scheduler.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/for_each.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/intra_activity_waiter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/join.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/latch.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/loop.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/promise/map.h" role="src" />

@ -3440,6 +3440,7 @@ grpc_cc_library(
"error",
"gpr_atm",
"grpc_sockaddr",
"join",
"json",
"json_args",
"json_object_loader",
@ -3449,11 +3450,13 @@ grpc_cc_library(
"pollset_set",
"ref_counted",
"resolved_address",
"seq",
"slice",
"slice_refcount",
"status_helper",
"subchannel_interface",
"time",
"try_concurrently",
"useful",
"validation_errors",
"//:backoff",
@ -3472,6 +3475,7 @@ grpc_cc_library(
"//:grpc_security_base",
"//:grpc_trace",
"//:orphanable",
"//:promise",
"//:protobuf_duration_upb",
"//:protobuf_timestamp_upb",
"//:ref_counted_ptr",

@ -33,118 +33,49 @@
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_concurrently.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
static grpc_error_handle clr_init_channel_elem(
grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
return absl::OkStatus();
}
static void clr_destroy_channel_elem(grpc_channel_element* /*elem*/) {}
namespace {
struct call_data {
// Stats object to update.
grpc_core::RefCountedPtr<grpc_core::GrpcLbClientStats> client_stats;
// State for intercepting send_initial_metadata.
grpc_closure on_complete_for_send;
grpc_closure* original_on_complete_for_send;
bool send_initial_metadata_succeeded = false;
// State for intercepting recv_initial_metadata.
grpc_closure recv_initial_metadata_ready;
grpc_closure* original_recv_initial_metadata_ready;
bool recv_initial_metadata_succeeded = false;
};
namespace grpc_core {
const grpc_channel_filter ClientLoadReportingFilter::kFilter =
MakePromiseBasedFilter<ClientLoadReportingFilter, FilterEndpoint::kClient,
kFilterExaminesServerInitialMetadata>(
"client_load_reporting");
} // namespace
static void on_complete_for_send(void* arg, grpc_error_handle error) {
call_data* calld = static_cast<call_data*>(arg);
if (error.ok()) {
calld->send_initial_metadata_succeeded = true;
}
grpc_core::Closure::Run(DEBUG_LOCATION, calld->original_on_complete_for_send,
error);
absl::StatusOr<ClientLoadReportingFilter> ClientLoadReportingFilter::Create(
const ChannelArgs&, ChannelFilter::Args) {
return ClientLoadReportingFilter();
}
static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
call_data* calld = static_cast<call_data*>(arg);
if (error.ok()) {
calld->recv_initial_metadata_succeeded = true;
}
grpc_core::Closure::Run(DEBUG_LOCATION,
calld->original_recv_initial_metadata_ready, error);
}
static grpc_error_handle clr_init_call_elem(
grpc_call_element* elem, const grpc_call_element_args* args) {
GPR_ASSERT(args->context != nullptr);
new (elem->call_data) call_data();
return absl::OkStatus();
}
static void clr_destroy_call_elem(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* /*ignored*/) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->client_stats != nullptr) {
// Record call finished, optionally setting client_failed_to_send and
// received.
calld->client_stats->AddCallFinished(
!calld->send_initial_metadata_succeeded /* client_failed_to_send */,
calld->recv_initial_metadata_succeeded /* known_received */);
ArenaPromise<ServerMetadataHandle> ClientLoadReportingFilter::MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) {
// Stats object to update.
RefCountedPtr<GrpcLbClientStats> client_stats;
// Handle client initial metadata.
// Grab client stats object from metadata.
auto client_stats_md =
call_args.client_initial_metadata->Take(GrpcLbClientStatsMetadata());
if (client_stats_md.has_value()) {
client_stats.reset(*client_stats_md);
}
calld->~call_data();
}
static void clr_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
call_data* calld = static_cast<call_data*>(elem->call_data);
// Handle send_initial_metadata.
if (batch->send_initial_metadata) {
// Grab client stats object from metadata.
auto client_stats_md =
batch->payload->send_initial_metadata.send_initial_metadata->Take(
grpc_core::GrpcLbClientStatsMetadata());
if (client_stats_md.has_value()) {
grpc_core::GrpcLbClientStats* client_stats = *client_stats_md;
if (client_stats != nullptr) {
calld->client_stats.reset(client_stats);
// Intercept completion.
calld->original_on_complete_for_send = batch->on_complete;
GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send,
calld, grpc_schedule_on_exec_ctx);
batch->on_complete = &calld->on_complete_for_send;
}
}
}
// Intercept completion of recv_initial_metadata.
if (batch->recv_initial_metadata) {
calld->original_recv_initial_metadata_ready =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, calld,
grpc_schedule_on_exec_ctx);
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->recv_initial_metadata_ready;
}
// Chain to next filter.
grpc_call_next_op(elem, batch);
auto* server_initial_metadata = call_args.server_initial_metadata;
return Seq(next_promise_factory(std::move(call_args)),
[server_initial_metadata, client_stats = std::move(client_stats)](
ServerMetadataHandle trailing_metadata) {
if (client_stats != nullptr) {
client_stats->AddCallFinished(
trailing_metadata->get(GrpcStreamNetworkState()) ==
GrpcStreamNetworkState::kNotSentOnWire,
NowOrNever(server_initial_metadata->Wait()).has_value());
}
return trailing_metadata;
});
}
const grpc_channel_filter grpc_client_load_reporting_filter = {
clr_start_transport_stream_op_batch,
nullptr,
grpc_channel_next_op,
sizeof(call_data),
clr_init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
clr_destroy_call_elem,
0, // sizeof(channel_data)
clr_init_channel_elem,
grpc_channel_stack_no_post_init,
clr_destroy_channel_elem,
grpc_channel_next_get_info,
"client_load_reporting"};
} // namespace grpc_core

@ -23,8 +23,21 @@
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/promise_based_filter.h"
extern const grpc_channel_filter grpc_client_load_reporting_filter;
namespace grpc_core {
class ClientLoadReportingFilter : public ChannelFilter {
public:
static const grpc_channel_filter kFilter;
static absl::StatusOr<ClientLoadReportingFilter> Create(
const ChannelArgs& args, ChannelFilter::Args filter_args);
// Construct a promise for one call.
ArenaPromise<ServerMetadataHandle> MakeCallPromise(
CallArgs call_args, NextPromiseFactory next_promise_factory) override;
};
} // namespace grpc_core
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \
*/

@ -1911,12 +1911,7 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
[](ChannelStackBuilder* builder) {
if (builder->channel_args().GetString(GRPC_ARG_LB_POLICY_NAME) ==
"grpclb") {
// TODO(roth): When we get around to re-attempting
// https://github.com/grpc/grpc/pull/16214, we should try to keep
// this filter at the very top of the subchannel stack, since that
// will minimize the number of metadata elements that the filter
// needs to iterate through to find the ClientStats object.
builder->PrependFilter(&grpc_client_load_reporting_filter);
builder->PrependFilter(&ClientLoadReportingFilter::kFilter);
}
return true;
});

@ -2309,6 +2309,7 @@ src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
@ -2317,6 +2318,7 @@ src/core/lib/promise/detail/switch.h \
src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/for_each.h \
src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/join.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \

@ -2087,6 +2087,7 @@ src/core/lib/promise/activity.cc \
src/core/lib/promise/activity.h \
src/core/lib/promise/arena_promise.h \
src/core/lib/promise/context.h \
src/core/lib/promise/detail/basic_join.h \
src/core/lib/promise/detail/basic_seq.h \
src/core/lib/promise/detail/promise_factory.h \
src/core/lib/promise/detail/promise_like.h \
@ -2095,6 +2096,7 @@ src/core/lib/promise/detail/switch.h \
src/core/lib/promise/exec_ctx_wakeup_scheduler.h \
src/core/lib/promise/for_each.h \
src/core/lib/promise/intra_activity_waiter.h \
src/core/lib/promise/join.h \
src/core/lib/promise/latch.h \
src/core/lib/promise/loop.h \
src/core/lib/promise/map.h \

Loading…
Cancel
Save