[promises] Change physical layout of retry filter (#33479)

In preparation for implementing the promise based version, separate out
the legacy call data from the filter.

There are two commits here, each representing one phase of this code
movement:
66676d398c moves `class RetryFilter` into
the header and the vtable name into that class, as this will be shared
code between the implementations
4c84f115ad then moves `class
RetryFilter::CallData` into `class RetryFilterLegacyCallData`, and moves
*that* into its own file

Doing so makes me less confused as to what I'm editing going forward.

No functionality should be affected.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/33653/head
Craig Tiller 2 years ago committed by GitHub
parent 4ce51fe45d
commit 20dfe78d95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      BUILD
  2. 2
      CMakeLists.txt
  3. 2
      Makefile
  4. 2
      Package.swift
  5. 4
      build_autogenerated.yaml
  6. 1
      config.m4
  7. 1
      config.w32
  8. 2
      gRPC-C++.podspec
  9. 3
      gRPC-Core.podspec
  10. 2
      grpc.gemspec
  11. 2
      grpc.gyp
  12. 2
      package.xml
  13. 2
      src/core/ext/filters/client_channel/client_channel.cc
  14. 2537
      src/core/ext/filters/client_channel/retry_filter.cc
  15. 92
      src/core/ext/filters/client_channel/retry_filter.h
  16. 2052
      src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
  17. 442
      src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h
  18. 1
      src/python/grpcio/grpc_core_dependencies.py
  19. 2
      tools/doxygen/Doxyfile.c++.internal
  20. 2
      tools/doxygen/Doxyfile.core.internal

@ -2955,6 +2955,7 @@ grpc_cc_library(
"//src/core:ext/filters/client_channel/lb_policy/oob_backend_metric.cc",
"//src/core:ext/filters/client_channel/local_subchannel_pool.cc",
"//src/core:ext/filters/client_channel/retry_filter.cc",
"//src/core:ext/filters/client_channel/retry_filter_legacy_call_data.cc",
"//src/core:ext/filters/client_channel/retry_service_config.cc",
"//src/core:ext/filters/client_channel/retry_throttle.cc",
"//src/core:ext/filters/client_channel/service_config_channel_arg_filter.cc",
@ -2980,6 +2981,7 @@ grpc_cc_library(
"//src/core:ext/filters/client_channel/lb_policy/oob_backend_metric_internal.h",
"//src/core:ext/filters/client_channel/local_subchannel_pool.h",
"//src/core:ext/filters/client_channel/retry_filter.h",
"//src/core:ext/filters/client_channel/retry_filter_legacy_call_data.h",
"//src/core:ext/filters/client_channel/retry_service_config.h",
"//src/core:ext/filters/client_channel/retry_throttle.h",
"//src/core:ext/filters/client_channel/subchannel.h",

2
CMakeLists.txt generated

@ -1739,6 +1739,7 @@ add_library(grpc
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
src/core/ext/filters/client_channel/retry_filter.cc
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
src/core/ext/filters/client_channel/retry_service_config.cc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
@ -2771,6 +2772,7 @@ add_library(grpc_unsecure
src/core/ext/filters/client_channel/resolver/polling_resolver.cc
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
src/core/ext/filters/client_channel/retry_filter.cc
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
src/core/ext/filters/client_channel/retry_service_config.cc
src/core/ext/filters/client_channel/retry_throttle.cc
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc

2
Makefile generated

@ -1021,6 +1021,7 @@ LIBGRPC_SRC = \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/retry_filter.cc \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc \
src/core/ext/filters/client_channel/retry_service_config.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \
@ -1906,6 +1907,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/ext/filters/client_channel/resolver/polling_resolver.cc \
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/retry_filter.cc \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc \
src/core/ext/filters/client_channel/retry_service_config.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \

2
Package.swift generated

@ -217,6 +217,8 @@ let package = Package(
"src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h",
"src/core/ext/filters/client_channel/retry_filter.cc",
"src/core/ext/filters/client_channel/retry_filter.h",
"src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc",
"src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h",
"src/core/ext/filters/client_channel/retry_service_config.cc",
"src/core/ext/filters/client_channel/retry_service_config.h",
"src/core/ext/filters/client_channel/retry_throttle.cc",

@ -259,6 +259,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/polling_resolver.h
- src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h
- src/core/ext/filters/client_channel/retry_filter.h
- src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h
- src/core/ext/filters/client_channel/retry_service_config.h
- src/core/ext/filters/client_channel/retry_throttle.h
- src/core/ext/filters/client_channel/subchannel.h
@ -1077,6 +1078,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
- src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc
- src/core/ext/filters/client_channel/retry_filter.cc
- src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
- src/core/ext/filters/client_channel/retry_service_config.cc
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc
@ -1980,6 +1982,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
- src/core/ext/filters/client_channel/resolver/polling_resolver.h
- src/core/ext/filters/client_channel/retry_filter.h
- src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h
- src/core/ext/filters/client_channel/retry_service_config.h
- src/core/ext/filters/client_channel/retry_throttle.h
- src/core/ext/filters/client_channel/subchannel.h
@ -2400,6 +2403,7 @@ libs:
- src/core/ext/filters/client_channel/resolver/polling_resolver.cc
- src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
- src/core/ext/filters/client_channel/retry_filter.cc
- src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc
- src/core/ext/filters/client_channel/retry_service_config.cc
- src/core/ext/filters/client_channel/retry_throttle.cc
- src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc

1
config.m4 generated

@ -100,6 +100,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/retry_filter.cc \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc \
src/core/ext/filters/client_channel/retry_service_config.cc \
src/core/ext/filters/client_channel/retry_throttle.cc \
src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc \

1
config.w32 generated

@ -65,6 +65,7 @@ if (PHP_GRPC != "no") {
"src\\core\\ext\\filters\\client_channel\\resolver\\sockaddr\\sockaddr_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\resolver\\xds\\xds_resolver.cc " +
"src\\core\\ext\\filters\\client_channel\\retry_filter.cc " +
"src\\core\\ext\\filters\\client_channel\\retry_filter_legacy_call_data.cc " +
"src\\core\\ext\\filters\\client_channel\\retry_service_config.cc " +
"src\\core\\ext\\filters\\client_channel\\retry_throttle.cc " +
"src\\core\\ext\\filters\\client_channel\\service_config_channel_arg_filter.cc " +

2
gRPC-C++.podspec generated

@ -291,6 +291,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/polling_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/retry_filter.h',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h',
'src/core/ext/filters/client_channel/retry_service_config.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/subchannel.h',
@ -1354,6 +1355,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/polling_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/retry_filter.h',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h',
'src/core/ext/filters/client_channel/retry_service_config.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/subchannel.h',

3
gRPC-Core.podspec generated

@ -318,6 +318,8 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/retry_filter.cc',
'src/core/ext/filters/client_channel/retry_filter.h',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h',
'src/core/ext/filters/client_channel/retry_service_config.cc',
'src/core/ext/filters/client_channel/retry_service_config.h',
'src/core/ext/filters/client_channel/retry_throttle.cc',
@ -2105,6 +2107,7 @@ Pod::Spec.new do |s|
'src/core/ext/filters/client_channel/resolver/polling_resolver.h',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h',
'src/core/ext/filters/client_channel/retry_filter.h',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h',
'src/core/ext/filters/client_channel/retry_service_config.h',
'src/core/ext/filters/client_channel/retry_throttle.h',
'src/core/ext/filters/client_channel/subchannel.h',

2
grpc.gemspec generated

@ -223,6 +223,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h )
s.files += %w( src/core/ext/filters/client_channel/retry_filter.cc )
s.files += %w( src/core/ext/filters/client_channel/retry_filter.h )
s.files += %w( src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc )
s.files += %w( src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h )
s.files += %w( src/core/ext/filters/client_channel/retry_service_config.cc )
s.files += %w( src/core/ext/filters/client_channel/retry_service_config.h )
s.files += %w( src/core/ext/filters/client_channel/retry_throttle.cc )

2
grpc.gyp generated

@ -325,6 +325,7 @@
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/retry_filter.cc',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc',
'src/core/ext/filters/client_channel/retry_service_config.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',
@ -1150,6 +1151,7 @@
'src/core/ext/filters/client_channel/resolver/polling_resolver.cc',
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/retry_filter.cc',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc',
'src/core/ext/filters/client_channel/retry_service_config.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',

2
package.xml generated

@ -205,6 +205,8 @@
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_filter.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_service_config.cc" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_service_config.h" role="src" />
<file baseinstalldir="/" name="src/core/ext/filters/client_channel/retry_throttle.cc" role="src" />

@ -1499,7 +1499,7 @@ void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
std::vector<const grpc_channel_filter*> filters =
config_selector->GetFilters();
if (enable_retries) {
filters.push_back(&kRetryFilterVtable);
filters.push_back(&RetryFilter::kVtable);
} else {
filters.push_back(&DynamicTerminationFilter::kFilterVtable);
}

File diff suppressed because it is too large Load Diff

@ -19,12 +19,102 @@
#include <grpc/support/port_platform.h>
#include <limits.h>
#include <stddef.h>
#include <new>
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/transport/transport.h"
extern grpc_core::TraceFlag grpc_retry_trace;
namespace grpc_core {
extern const grpc_channel_filter kRetryFilterVtable;
class RetryFilter {
public:
static const grpc_channel_filter kVtable;
private:
// Old filter-stack style call implementation, in
// retry_filter_legacy_call_data.{h,cc}
class LegacyCallData;
grpc_event_engine::experimental::EventEngine* event_engine() const {
return event_engine_;
}
// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
static double BackoffJitter() { return 0.2; }
const internal::RetryMethodConfig* GetRetryPolicy(
const grpc_call_context_element* context);
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data() const {
return retry_throttle_data_;
}
ClientChannel* client_channel() const { return client_channel_; }
size_t per_rpc_retry_buffer_size() const {
return per_rpc_retry_buffer_size_;
}
static size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
static constexpr int kDefaultPerRpcRetryBufferSize = (256 << 10);
return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
.value_or(kDefaultPerRpcRetryBufferSize),
0, INT_MAX);
}
RetryFilter(const ChannelArgs& args, grpc_error_handle* error);
static grpc_error_handle Init(grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &kVtable);
grpc_error_handle error;
new (elem->channel_data) RetryFilter(args->channel_args, &error);
return error;
}
static void Destroy(grpc_channel_element* elem) {
auto* chand = static_cast<RetryFilter*>(elem->channel_data);
chand->~RetryFilter();
}
// Will never be called.
static void StartTransportOp(grpc_channel_element* /*elem*/,
grpc_transport_op* /*op*/) {}
static void GetChannelInfo(grpc_channel_element* /*elem*/,
const grpc_channel_info* /*info*/) {}
ClientChannel* client_channel_;
grpc_event_engine::experimental::EventEngine* const event_engine_;
size_t per_rpc_retry_buffer_size_;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
const size_t service_config_parser_index_;
};
} // namespace grpc_core

@ -0,0 +1,442 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H
#define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>
#include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/retry_filter.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
namespace grpc_core {
class RetryFilter::LegacyCallData {
public:
static grpc_error_handle Init(grpc_call_element* elem,
const grpc_call_element_args* args);
static void Destroy(grpc_call_element* elem,
const grpc_call_final_info* /*final_info*/,
grpc_closure* then_schedule_closure);
static void StartTransportStreamOpBatch(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
private:
class CallStackDestructionBarrier;
// Pending batches stored in call data.
struct PendingBatch {
// The pending batch. If nullptr, this slot is empty.
grpc_transport_stream_op_batch* batch = nullptr;
// Indicates whether payload for send ops has been cached in
// LegacyCallData.
bool send_ops_cached = false;
};
// State associated with each call attempt.
class CallAttempt : public RefCounted<CallAttempt> {
public:
CallAttempt(LegacyCallData* calld, bool is_transparent_retry);
~CallAttempt() override;
bool lb_call_committed() const { return lb_call_committed_; }
// Constructs and starts whatever batches are needed on this call
// attempt.
void StartRetriableBatches();
// Frees cached send ops that have already been completed after
// committing the call.
void FreeCachedSendOpDataAfterCommit();
// Cancels the call attempt.
void CancelFromSurface(grpc_transport_stream_op_batch* cancel_batch);
private:
// State used for starting a retryable batch on the call attempt's LB call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given LB call.
class BatchData
: public RefCounted<BatchData, PolymorphicRefCount, UnrefCallDtor> {
public:
BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
bool set_on_complete);
~BatchData() override;
grpc_transport_stream_op_batch* batch() { return &batch_; }
// Adds retriable send_initial_metadata op.
void AddRetriableSendInitialMetadataOp();
// Adds retriable send_message op.
void AddRetriableSendMessageOp();
// Adds retriable send_trailing_metadata op.
void AddRetriableSendTrailingMetadataOp();
// Adds retriable recv_initial_metadata op.
void AddRetriableRecvInitialMetadataOp();
// Adds retriable recv_message op.
void AddRetriableRecvMessageOp();
// Adds retriable recv_trailing_metadata op.
void AddRetriableRecvTrailingMetadataOp();
// Adds cancel_stream op.
void AddCancelStreamOp(grpc_error_handle error);
private:
// Frees cached send ops that were completed by the completed batch in
// batch_data. Used when batches are completed after the call is
// committed.
void FreeCachedSendOpDataForCompletedBatch();
// If there is a pending recv_initial_metadata op, adds a closure
// to closures for recv_initial_metadata_ready.
void MaybeAddClosureForRecvInitialMetadataCallback(
grpc_error_handle error, CallCombinerClosureList* closures);
// Intercepts recv_initial_metadata_ready callback for retries.
// Commits the call and returns the initial metadata up the stack.
static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
// If there is a pending recv_message op, adds a closure to closures
// for recv_message_ready.
void MaybeAddClosureForRecvMessageCallback(
grpc_error_handle error, CallCombinerClosureList* closures);
// Intercepts recv_message_ready callback for retries.
// Commits the call and returns the message up the stack.
static void RecvMessageReady(void* arg, grpc_error_handle error);
// If there is a pending recv_trailing_metadata op, adds a closure to
// closures for recv_trailing_metadata_ready.
void MaybeAddClosureForRecvTrailingMetadataReady(
grpc_error_handle error, CallCombinerClosureList* closures);
// Adds any necessary closures for deferred batch completion
// callbacks to closures.
void AddClosuresForDeferredCompletionCallbacks(
CallCombinerClosureList* closures);
// For any pending batch containing an op that has not yet been started,
// adds the pending batch's completion closures to closures.
void AddClosuresToFailUnstartedPendingBatches(
grpc_error_handle error, CallCombinerClosureList* closures);
// Runs necessary closures upon completion of a call attempt.
void RunClosuresForCompletedCall(grpc_error_handle error);
// Intercepts recv_trailing_metadata_ready callback for retries.
// Commits the call and returns the trailing metadata up the stack.
static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
// Adds the on_complete closure for the pending batch completed in
// batch_data to closures.
void AddClosuresForCompletedPendingBatch(
grpc_error_handle error, CallCombinerClosureList* closures);
// If there are any cached ops to replay or pending ops to start on the
// LB call, adds them to closures.
void AddClosuresForReplayOrPendingSendOps(
CallCombinerClosureList* closures);
// Callback used to intercept on_complete from LB calls.
static void OnComplete(void* arg, grpc_error_handle error);
// Callback used to handle on_complete for internally generated
// cancel_stream op.
static void OnCompleteForCancelOp(void* arg, grpc_error_handle error);
// This DOES hold a ref, but it cannot be a RefCountedPtr<>, because
// our dtor unrefs the owning call, which may delete the arena in
// which we are allocated, which means that running the dtor of any
// data members after that would cause a crash.
CallAttempt* call_attempt_;
// The batch to use in the LB call.
// Its payload field points to CallAttempt::batch_payload_.
grpc_transport_stream_op_batch batch_;
// For intercepting on_complete.
grpc_closure on_complete_;
};
// Creates a BatchData object on the call's arena with the
// specified refcount. If set_on_complete is true, the batch's
// on_complete callback will be set to point to on_complete();
// otherwise, the batch's on_complete callback will be null.
BatchData* CreateBatch(int refcount, bool set_on_complete) {
return calld_->arena_->New<BatchData>(Ref(DEBUG_LOCATION, "CreateBatch"),
refcount, set_on_complete);
}
// If there are any cached send ops that need to be replayed on this
// call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
BatchData* MaybeCreateBatchForReplay();
// Adds a closure to closures that will execute batch in the call combiner.
void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
const char* reason,
CallCombinerClosureList* closures);
// Helper function used to start a recv_trailing_metadata batch. This
// is used in the case where a recv_initial_metadata or recv_message
// op fails in a way that we know the call is over but when the application
// has not yet started its own recv_trailing_metadata op.
void AddBatchForInternalRecvTrailingMetadata(
CallCombinerClosureList* closures);
// Adds a batch to closures to cancel this call attempt, if
// cancellation has not already been sent on the LB call.
void MaybeAddBatchForCancelOp(grpc_error_handle error,
CallCombinerClosureList* closures);
// Adds batches for pending batches to closures.
void AddBatchesForPendingBatches(CallCombinerClosureList* closures);
// Adds whatever batches are needed on this attempt to closures.
void AddRetriableBatches(CallCombinerClosureList* closures);
// Returns true if any send op in the batch was not yet started on this
// attempt.
bool PendingBatchContainsUnstartedSendOps(PendingBatch* pending);
// Returns true if there are cached send ops to replay.
bool HaveSendOpsToReplay();
// If our retry state is no longer needed, switch to fast path by moving
// our LB call into calld_->committed_call_ and having calld_ drop
// its ref to us.
void MaybeSwitchToFastPath();
// Returns true if the call should be retried.
bool ShouldRetry(absl::optional<grpc_status_code> status,
absl::optional<Duration> server_pushback_ms);
// Abandons the call attempt. Unrefs any deferred batches.
void Abandon();
void OnPerAttemptRecvTimer();
static void OnPerAttemptRecvTimerLocked(void* arg, grpc_error_handle error);
void MaybeCancelPerAttemptRecvTimer();
LegacyCallData* calld_;
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> lb_call_;
bool lb_call_committed_ = false;
grpc_closure on_per_attempt_recv_timer_;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
per_attempt_recv_timer_handle_;
// BatchData.batch.payload points to this.
grpc_transport_stream_op_batch_payload batch_payload_;
// For send_initial_metadata.
grpc_metadata_batch send_initial_metadata_{calld_->arena_};
// For send_trailing_metadata.
grpc_metadata_batch send_trailing_metadata_{calld_->arena_};
// For intercepting recv_initial_metadata.
grpc_metadata_batch recv_initial_metadata_{calld_->arena_};
grpc_closure recv_initial_metadata_ready_;
bool trailing_metadata_available_ = false;
// For intercepting recv_message.
grpc_closure recv_message_ready_;
absl::optional<SliceBuffer> recv_message_;
uint32_t recv_message_flags_;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata_{calld_->arena_};
grpc_transport_stream_stats collect_stats_;
grpc_closure recv_trailing_metadata_ready_;
// These fields indicate which ops have been started and completed on
// this call attempt.
size_t started_send_message_count_ = 0;
size_t completed_send_message_count_ = 0;
size_t started_recv_message_count_ = 0;
size_t completed_recv_message_count_ = 0;
bool started_send_initial_metadata_ : 1;
bool completed_send_initial_metadata_ : 1;
bool started_send_trailing_metadata_ : 1;
bool completed_send_trailing_metadata_ : 1;
bool started_recv_initial_metadata_ : 1;
bool completed_recv_initial_metadata_ : 1;
bool started_recv_trailing_metadata_ : 1;
bool completed_recv_trailing_metadata_ : 1;
bool sent_cancel_stream_ : 1;
// State for callback processing.
RefCountedPtr<BatchData> recv_initial_metadata_ready_deferred_batch_;
grpc_error_handle recv_initial_metadata_error_;
RefCountedPtr<BatchData> recv_message_ready_deferred_batch_;
grpc_error_handle recv_message_error_;
struct OnCompleteDeferredBatch {
OnCompleteDeferredBatch(RefCountedPtr<BatchData> batch,
grpc_error_handle error)
: batch(std::move(batch)), error(error) {}
RefCountedPtr<BatchData> batch;
grpc_error_handle error;
};
// There cannot be more than 3 pending send op batches at a time.
absl::InlinedVector<OnCompleteDeferredBatch, 3>
on_complete_deferred_batches_;
RefCountedPtr<BatchData> recv_trailing_metadata_internal_batch_;
grpc_error_handle recv_trailing_metadata_error_;
bool seen_recv_trailing_metadata_from_surface_ : 1;
// NOTE: Do not move this next to the metadata bitfields above. That would
// save space but will also result in a data race because compiler
// will generate a 2 byte store which overwrites the meta-data
// fields upon setting this field.
bool abandoned_ : 1;
};
LegacyCallData(RetryFilter* chand, const grpc_call_element_args& args);
~LegacyCallData();
void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
// Returns the index into pending_batches_ to be used for batch.
static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
void PendingBatchClear(PendingBatch* pending);
void MaybeClearPendingBatch(PendingBatch* pending);
static void FailPendingBatchInCallCombiner(void* arg,
grpc_error_handle error);
// Fails all pending batches. Does NOT yield call combiner.
void PendingBatchesFail(grpc_error_handle error);
// Returns a pointer to the first pending batch for which predicate(batch)
// returns true, or null if not found.
template <typename Predicate>
PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);
// Caches data for send ops so that it can be retried later, if not
// already cached.
void MaybeCacheSendOpsForBatch(PendingBatch* pending);
void FreeCachedSendInitialMetadata();
// Frees cached send_message at index idx.
void FreeCachedSendMessage(size_t idx);
void FreeCachedSendTrailingMetadata();
void FreeAllCachedSendOpData();
// Commits the call so that no further retry attempts will be performed.
void RetryCommit(CallAttempt* call_attempt);
// Starts a timer to retry after appropriate back-off.
// If server_pushback is nullopt, retry_backoff_ is used.
void StartRetryTimer(absl::optional<Duration> server_pushback);
void OnRetryTimer();
static void OnRetryTimerLocked(void* arg, grpc_error_handle /*error*/);
// Adds a closure to closures to start a transparent retry.
void AddClosureToStartTransparentRetry(CallCombinerClosureList* closures);
static void StartTransparentRetry(void* arg, grpc_error_handle error);
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall>
CreateLoadBalancedCall(absl::AnyInvocable<void()> on_commit,
bool is_transparent_retry);
void CreateCallAttempt(bool is_transparent_retry);
RetryFilter* chand_;
grpc_polling_entity* pollent_;
RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
const internal::RetryMethodConfig* retry_policy_ = nullptr;
BackOff retry_backoff_;
grpc_slice path_; // Request path.
Timestamp deadline_;
Arena* arena_;
grpc_call_stack* owning_call_;
CallCombiner* call_combiner_;
grpc_call_context_element* call_context_;
grpc_error_handle cancelled_from_surface_;
RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;
// TODO(roth): As part of implementing hedging, we will need to maintain a
// list of all pending attempts, so that we can cancel them all if the call
// gets cancelled.
RefCountedPtr<CallAttempt> call_attempt_;
// LB call used when we've committed to a call attempt and the retry
// state for that attempt is no longer needed. This provides a fast
// path for long-running streaming calls that minimizes overhead.
OrphanablePtr<ClientChannel::FilterBasedLoadBalancedCall> committed_call_;
// When are are not yet fully committed to a particular call (i.e.,
// either we might still retry or we have committed to the call but
// there are still some cached ops to be replayed on the call),
// batches received from above will be added to this list, and they
// will not be removed until we have invoked their completion callbacks.
size_t bytes_buffered_for_retry_ = 0;
PendingBatch pending_batches_[MAX_PENDING_BATCHES];
bool pending_send_initial_metadata_ : 1;
bool pending_send_message_ : 1;
bool pending_send_trailing_metadata_ : 1;
// Retry state.
bool retry_committed_ : 1;
bool retry_codepath_started_ : 1;
bool sent_transparent_retry_not_seen_by_server_ : 1;
int num_attempts_completed_ = 0;
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
retry_timer_handle_;
grpc_closure retry_closure_;
// Cached data for retrying send ops.
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
// send_message
// When we get a send_message op, we replace the original byte stream
// with a CachingByteStream that caches the slices to a local buffer for
// use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
struct CachedSendMessage {
SliceBuffer* slices;
uint32_t flags;
};
absl::InlinedVector<CachedSendMessage, 3> send_messages_;
// send_trailing_metadata
bool seen_send_trailing_metadata_ = false;
grpc_metadata_batch send_trailing_metadata_{arena_};
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RETRY_FILTER_LEGACY_CALL_DATA_H

@ -74,6 +74,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc',
'src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc',
'src/core/ext/filters/client_channel/retry_filter.cc',
'src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc',
'src/core/ext/filters/client_channel/retry_service_config.cc',
'src/core/ext/filters/client_channel/retry_throttle.cc',
'src/core/ext/filters/client_channel/service_config_channel_arg_filter.cc',

@ -1182,6 +1182,8 @@ src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/retry_filter.cc \
src/core/ext/filters/client_channel/retry_filter.h \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h \
src/core/ext/filters/client_channel/retry_service_config.cc \
src/core/ext/filters/client_channel/retry_service_config.h \
src/core/ext/filters/client_channel/retry_throttle.cc \

@ -992,6 +992,8 @@ src/core/ext/filters/client_channel/resolver/xds/xds_resolver.cc \
src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h \
src/core/ext/filters/client_channel/retry_filter.cc \
src/core/ext/filters/client_channel/retry_filter.h \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.cc \
src/core/ext/filters/client_channel/retry_filter_legacy_call_data.h \
src/core/ext/filters/client_channel/retry_service_config.cc \
src/core/ext/filters/client_channel/retry_service_config.h \
src/core/ext/filters/client_channel/retry_throttle.cc \

Loading…
Cancel
Save