Revert "[filter-stack] Eliminate flags on initial metadata (#30444)" (#30475)

This reverts commit cd30b2dda1.
pull/30476/head
Craig Tiller 3 years ago committed by GitHub
parent cd30b2dda1
commit 4bc69cbe38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      include/grpc/impl/codegen/grpc_types.h
  2. 33
      src/core/ext/filters/client_channel/client_channel.cc
  3. 7
      src/core/ext/filters/client_channel/retry_filter.cc
  4. 2
      src/core/ext/filters/client_channel/subchannel_stream_client.cc
  5. 72
      src/core/ext/transport/inproc/inproc_transport.cc
  6. 2
      src/core/lib/channel/call_tracer.h
  7. 10
      src/core/lib/surface/call.cc
  8. 5
      src/core/lib/surface/server.cc
  9. 1
      src/core/lib/surface/server.h
  10. 5
      src/core/lib/transport/metadata_batch.cc
  11. 58
      src/core/lib/transport/metadata_batch.h
  12. 7
      src/core/lib/transport/transport.h
  13. 7
      src/cpp/common/channel_filter.h
  14. 3
      src/cpp/ext/filters/census/client_filter.cc
  15. 4
      src/cpp/ext/filters/census/open_census_call_tracer.h
  16. 2
      test/core/end2end/fixtures/proxy.cc
  17. 1
      test/core/end2end/inproc_callback_test.cc
  18. 1
      test/core/end2end/tests/channelz.cc
  19. 1
      test/core/end2end/tests/max_connection_idle.cc
  20. 1
      test/core/end2end/tests/no_error_on_hotpath.cc
  21. 1
      test/core/end2end/tests/no_logging.cc
  22. 1
      test/core/end2end/tests/proxy_auth.cc
  23. 1
      test/core/end2end/tests/retry.cc
  24. 1
      test/core/end2end/tests/retry_disabled.cc
  25. 1
      test/core/end2end/tests/retry_exceeds_buffer_size_in_delay.cc
  26. 1
      test/core/end2end/tests/retry_exceeds_buffer_size_in_initial_batch.cc
  27. 1
      test/core/end2end/tests/retry_exceeds_buffer_size_in_subsequent_batch.cc
  28. 1
      test/core/end2end/tests/retry_non_retriable_status.cc
  29. 1
      test/core/end2end/tests/retry_non_retriable_status_before_recv_trailing_metadata_started.cc
  30. 1
      test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
  31. 1
      test/core/end2end/tests/retry_per_attempt_recv_timeout_on_last_attempt.cc
  32. 1
      test/core/end2end/tests/retry_recv_initial_metadata.cc
  33. 1
      test/core/end2end/tests/retry_recv_message.cc
  34. 1
      test/core/end2end/tests/retry_recv_message_replay.cc
  35. 1
      test/core/end2end/tests/retry_recv_trailing_metadata_error.cc
  36. 1
      test/core/end2end/tests/retry_send_initial_metadata_refs.cc
  37. 1
      test/core/end2end/tests/retry_send_op_fails.cc
  38. 1
      test/core/end2end/tests/retry_send_recv_batch.cc
  39. 1
      test/core/end2end/tests/retry_server_pushback_delay.cc
  40. 1
      test/core/end2end/tests/retry_server_pushback_disabled.cc
  41. 1
      test/core/end2end/tests/retry_streaming.cc
  42. 1
      test/core/end2end/tests/retry_streaming_after_commit.cc
  43. 1
      test/core/end2end/tests/retry_streaming_succeeds_before_replay_finished.cc
  44. 1
      test/core/end2end/tests/retry_throttled.cc
  45. 1
      test/core/end2end/tests/retry_too_many_attempts.cc
  46. 1
      test/core/end2end/tests/retry_transparent_goaway.cc
  47. 2
      test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
  48. 1
      test/core/end2end/tests/retry_transparent_not_sent_on_wire.cc
  49. 1
      test/core/end2end/tests/retry_unref_before_finish.cc
  50. 1
      test/core/end2end/tests/retry_unref_before_recv.cc
  51. 1
      test/core/end2end/tests/simple_request.cc

@ -583,6 +583,8 @@ typedef struct {
grpc_slice method;
grpc_slice host;
gpr_timespec deadline;
uint32_t flags;
void* reserved;
} grpc_call_details;
typedef enum {

@ -2192,13 +2192,17 @@ grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
}
// If the service config set wait_for_ready and the application
// did not explicitly set it, use the value from the service config.
auto* wait_for_ready =
pending_batches_[0]
->payload->send_initial_metadata.send_initial_metadata
->GetOrCreatePointer(WaitForReady());
uint32_t* send_initial_metadata_flags =
&pending_batches_[0]
->payload->send_initial_metadata.send_initial_metadata_flags;
if (method_params->wait_for_ready().has_value() &&
!wait_for_ready->explicitly_set) {
wait_for_ready->value = method_params->wait_for_ready().value();
!(*send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
if (method_params->wait_for_ready().value()) {
*send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
} else {
*send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
}
}
}
// Set the dynamic filter stack.
@ -2306,14 +2310,16 @@ bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
pending_batches_[0]->payload->send_initial_metadata;
grpc_metadata_batch* initial_metadata_batch =
send_initial_metadata.send_initial_metadata;
const uint32_t send_initial_metadata_flags =
send_initial_metadata.send_initial_metadata_flags;
// If we don't yet have a resolver result, we need to queue the call
// until we get one.
if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
// If the resolver returned transient failure before returning the
// first service config, fail any non-wait_for_ready calls.
absl::Status resolver_error = chand->resolver_transient_failure_error_;
if (!resolver_error.ok() &&
!initial_metadata_batch->GetOrCreatePointer(WaitForReady())->value) {
if (!resolver_error.ok() && (send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
chand, this);
@ -2711,7 +2717,8 @@ void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
}
if (batch->send_initial_metadata) {
call_attempt_tracer_->RecordSendInitialMetadata(
batch->payload->send_initial_metadata.send_initial_metadata);
batch->payload->send_initial_metadata.send_initial_metadata,
batch->payload->send_initial_metadata.send_initial_metadata_flags);
peer_string_ = batch->payload->send_initial_metadata.peer_string;
original_send_initial_metadata_on_complete_ = batch->on_complete;
GRPC_CLOSURE_INIT(&send_initial_metadata_on_complete_,
@ -3086,6 +3093,8 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
pending_batches_[0]->payload->send_initial_metadata;
grpc_metadata_batch* initial_metadata_batch =
send_initial_metadata.send_initial_metadata;
const uint32_t send_initial_metadata_flags =
send_initial_metadata.send_initial_metadata_flags;
// Perform LB pick.
LoadBalancingPolicy::PickArgs pick_args;
pick_args.path = path_.as_string_view();
@ -3143,7 +3152,7 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
return false;
},
// FailPick
[this, initial_metadata_batch,
[this, send_initial_metadata_flags,
&error](LoadBalancingPolicy::PickResult::Fail* fail_pick)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
@ -3152,8 +3161,8 @@ bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
}
// If wait_for_ready is false, then the error indicates the RPC
// attempt's final status.
if (!initial_metadata_batch->GetOrCreatePointer(WaitForReady())
->value) {
if ((send_initial_metadata_flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
grpc_error_handle lb_error =
absl_status_to_grpc_error(fail_pick->status);
*error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(

@ -610,6 +610,7 @@ class RetryFilter::CallData {
// send_initial_metadata
bool seen_send_initial_metadata_ = false;
grpc_metadata_batch send_initial_metadata_{arena_};
uint32_t send_initial_metadata_flags_;
// TODO(roth): As part of implementing hedging, we'll probably need to
// have the LB call set a value in CallAttempt and then propagate it
// from CallAttempt to the parent call when we commit. Otherwise, we
@ -1067,6 +1068,8 @@ void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
}
// recv_initial_metadata.
if (batch->recv_initial_metadata) {
// recv_flags is only used on the server side.
GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
batch_data->AddRetriableRecvInitialMetadataOp();
}
// recv_message.
@ -2011,6 +2014,8 @@ void RetryFilter::CallData::CallAttempt::BatchData::
batch_.send_initial_metadata = true;
batch_.payload->send_initial_metadata.send_initial_metadata =
&call_attempt_->send_initial_metadata_;
batch_.payload->send_initial_metadata.send_initial_metadata_flags =
calld->send_initial_metadata_flags_;
batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}
@ -2362,6 +2367,8 @@ void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
grpc_metadata_batch* send_initial_metadata =
batch->payload->send_initial_metadata.send_initial_metadata;
send_initial_metadata_ = send_initial_metadata->Copy();
send_initial_metadata_flags_ =
batch->payload->send_initial_metadata.send_initial_metadata_flags;
peer_string_ = batch->payload->send_initial_metadata.peer_string;
}
// Set up cache for send_message ops.

@ -244,6 +244,7 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
GPR_ASSERT(GRPC_ERROR_IS_NONE(error));
payload_.send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
payload_.send_initial_metadata.send_initial_metadata_flags = 0;
payload_.send_initial_metadata.peer_string = nullptr;
batch_.send_initial_metadata = true;
// Add send_message op.
@ -258,6 +259,7 @@ void SubchannelStreamClient::CallState::StartCallLocked() {
// Add recv_initial_metadata op.
payload_.recv_initial_metadata.recv_initial_metadata =
&recv_initial_metadata_;
payload_.recv_initial_metadata.recv_flags = nullptr;
payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
payload_.recv_initial_metadata.peer_string = nullptr;
// recv_initial_metadata_ready callback takes ref, handled manually.

@ -83,7 +83,8 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
bool is_initial);
void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
grpc_metadata_batch* out_md, bool* markfilled);
uint32_t flags, grpc_metadata_batch* out_md,
uint32_t* outflags, bool* markfilled);
void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
absl::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
@ -193,15 +194,18 @@ struct inproc_stream {
// Now transfer from the other side's write_buffer if any to the to_read
// buffer
if (cs->write_buffer_initial_md_filled) {
fill_in_metadata(this, &cs->write_buffer_initial_md,
&to_read_initial_md, &to_read_initial_md_filled);
(void)fill_in_metadata(this, &cs->write_buffer_initial_md,
cs->write_buffer_initial_md_flags,
&to_read_initial_md, &to_read_initial_md_flags,
&to_read_initial_md_filled);
deadline = std::min(deadline, cs->write_buffer_deadline);
cs->write_buffer_initial_md.Clear();
cs->write_buffer_initial_md_filled = false;
}
if (cs->write_buffer_trailing_md_filled) {
fill_in_metadata(this, &cs->write_buffer_trailing_md,
&to_read_trailing_md, &to_read_trailing_md_filled);
(void)fill_in_metadata(this, &cs->write_buffer_trailing_md, 0,
&to_read_trailing_md, nullptr,
&to_read_trailing_md_filled);
cs->write_buffer_trailing_md.Clear();
cs->write_buffer_trailing_md_filled = false;
}
@ -247,6 +251,7 @@ struct inproc_stream {
grpc_core::Arena* arena;
grpc_metadata_batch to_read_initial_md{arena};
uint32_t to_read_initial_md_flags = 0;
bool to_read_initial_md_filled = false;
grpc_metadata_batch to_read_trailing_md{arena};
bool to_read_trailing_md_filled = false;
@ -255,6 +260,7 @@ struct inproc_stream {
// stream is set up but server side stream is not yet set up
grpc_metadata_batch write_buffer_initial_md{arena};
bool write_buffer_initial_md_filled = false;
uint32_t write_buffer_initial_md_flags = 0;
grpc_core::Timestamp write_buffer_deadline =
grpc_core::Timestamp::InfFuture();
grpc_metadata_batch write_buffer_trailing_md{arena};
@ -330,12 +336,15 @@ class CopySink {
} // namespace
void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
grpc_metadata_batch* out_md, bool* markfilled) {
uint32_t flags, grpc_metadata_batch* out_md,
uint32_t* outflags, bool* markfilled) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
log_metadata(metadata, s->t->is_client,
metadata->get_pointer(grpc_core::WaitForReady()) != nullptr);
log_metadata(metadata, s->t->is_client, outflags != nullptr);
}
if (outflags != nullptr) {
*outflags = flags;
}
if (markfilled != nullptr) {
*markfilled = true;
}
@ -444,7 +453,7 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
: &other->to_read_trailing_md;
bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
fill_in_metadata(s, &fake_md, dest, destfilled);
(void)fill_in_metadata(s, &fake_md, 0, dest, nullptr, destfilled);
if (other != nullptr) {
if (GRPC_ERROR_IS_NONE(other->cancel_other_error)) {
@ -466,10 +475,12 @@ void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
fake_md.Set(grpc_core::HttpAuthorityMetadata(),
grpc_core::Slice::FromStaticString("inproc-fail"));
fill_in_metadata(s, &fake_md,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
nullptr);
(void)fill_in_metadata(
s, &fake_md, 0,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
nullptr);
err = GRPC_ERROR_NONE;
} else {
err = GRPC_ERROR_REF(error);
@ -643,10 +654,11 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
goto done;
} else {
if (!other || !other->closed) {
fill_in_metadata(s,
s->send_trailing_md_op->payload->send_trailing_metadata
.send_trailing_metadata,
dest, destfilled);
(void)fill_in_metadata(
s,
s->send_trailing_md_op->payload->send_trailing_metadata
.send_trailing_metadata,
0, dest, nullptr, destfilled);
}
s->trailing_md_sent = true;
if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
@ -690,10 +702,12 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->to_read_initial_md_filled) {
s->initial_md_recvd = true;
fill_in_metadata(s, &s->to_read_initial_md,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
nullptr);
fill_in_metadata(
s, &s->to_read_initial_md, s->to_read_initial_md_flags,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
nullptr);
if (s->deadline != grpc_core::Timestamp::InfFuture()) {
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
@ -773,10 +787,10 @@ void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
if (s->recv_trailing_md_op != nullptr) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
fill_in_metadata(s, &s->to_read_trailing_md,
fill_in_metadata(s, &s->to_read_trailing_md, 0,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata,
nullptr);
nullptr, nullptr);
s->to_read_trailing_md.Clear();
s->to_read_trailing_md_filled = false;
@ -887,7 +901,7 @@ bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
: &other->to_read_trailing_md;
bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
fill_in_metadata(s, &cancel_md, dest, destfilled);
(void)fill_in_metadata(s, &cancel_md, 0, dest, nullptr, destfilled);
if (other != nullptr) {
if (GRPC_ERROR_IS_NONE(other->cancel_other_error)) {
@ -981,6 +995,9 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_metadata_batch* dest = (other == nullptr)
? &s->write_buffer_initial_md
: &other->to_read_initial_md;
uint32_t* destflags = (other == nullptr)
? &s->write_buffer_initial_md_flags
: &other->to_read_initial_md_flags;
bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
: &other->to_read_initial_md_filled;
if (*destfilled || s->initial_md_sent) {
@ -989,9 +1006,10 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
} else {
if (!s->other_side_closed) {
fill_in_metadata(
s, op->payload->send_initial_metadata.send_initial_metadata, dest,
destfilled);
(void)fill_in_metadata(
s, op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
dest, destflags, destfilled);
}
if (s->t->is_client) {
grpc_core::Timestamp* dl =

@ -50,7 +50,7 @@ class CallTracer {
// Please refer to `grpc_transport_stream_op_batch_payload` for details on
// arguments.
virtual void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) = 0;
grpc_metadata_batch* send_initial_metadata, uint32_t flags) = 0;
// TODO(yashkt): We are using gpr_atm here instead of absl::string_view
// since that's what the transport API uses, and performing an atomic load
// is unnecessary if the census tracer does not need it at present. Fix this

@ -1415,16 +1415,10 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
if (is_client() && send_deadline() != Timestamp::InfFuture()) {
send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline());
}
if (is_client()) {
send_initial_metadata_.Set(
WaitForReady(),
WaitForReady::ValueType{
(op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
(op->flags &
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
}
stream_op_payload->send_initial_metadata.send_initial_metadata =
&send_initial_metadata_;
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
if (is_client()) {
stream_op_payload->send_initial_metadata.peer_string = &peer_string_;
}

@ -80,6 +80,7 @@ struct Server::RequestedCall {
cq_bound_to_call(call_cq),
call(call_arg),
initial_metadata(initial_md) {
details->reserved = nullptr;
data.batch.details = details;
}
@ -1249,6 +1250,7 @@ void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {
grpc_slice_ref_internal(path_->c_slice());
rc->data.batch.details->deadline =
deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
rc->data.batch.details->flags = recv_initial_metadata_flags_;
break;
case RequestedCall::Type::REGISTERED_CALL:
*rc->data.registered.deadline =
@ -1347,12 +1349,15 @@ void Server::CallData::RecvInitialMetadataBatchComplete(
void Server::CallData::StartTransportStreamOpBatchImpl(
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
if (batch->recv_initial_metadata) {
GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
recv_initial_metadata_ =
batch->payload->recv_initial_metadata.recv_initial_metadata;
original_recv_initial_metadata_ready_ =
batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&recv_initial_metadata_ready_;
batch->payload->recv_initial_metadata.recv_flags =
&recv_initial_metadata_flags_;
}
if (batch->recv_trailing_metadata) {
original_recv_trailing_metadata_ready_ =

@ -341,6 +341,7 @@ class Server : public InternallyRefCounted<Server>,
grpc_closure recv_initial_metadata_batch_complete_;
grpc_metadata_batch* recv_initial_metadata_ = nullptr;
uint32_t recv_initial_metadata_flags_ = 0;
grpc_closure recv_initial_metadata_ready_;
grpc_closure* original_recv_initial_metadata_ready_;
grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;

@ -284,9 +284,4 @@ const std::string& GrpcStatusContext::DisplayValue(const std::string& x) {
return x;
}
std::string WaitForReady::DisplayValue(ValueType x) {
return absl::StrCat(x.value ? "true" : "false",
x.explicitly_set ? " (explicit)" : "");
}
} // namespace grpc_core

@ -387,17 +387,6 @@ struct GrpcStatusContext {
static const std::string& DisplayValue(const std::string& x);
};
// Annotation added by client surface code to denote wait-for-ready state
struct WaitForReady {
struct ValueType {
bool value = false;
bool explicitly_set = false;
};
static absl::string_view DebugKey() { return "WaitForReady"; }
static constexpr bool kRepeatable = false;
static std::string DisplayValue(ValueType x);
};
namespace metadata_detail {
// Build a key/value formatted debug string.
@ -688,10 +677,6 @@ struct Value<Which, absl::enable_if_t<Which::kRepeatable == false &&
void EncodeTo(Encoder* encoder) const {
encoder->Encode(Which(), value);
}
template <typename Encoder>
void VisitWith(Encoder* encoder) const {
return EncodeTo(encoder);
}
void LogTo(LogFn log_fn) const {
LogKeyValueTo(Which::key(), value, Which::Encode, log_fn);
}
@ -716,10 +701,6 @@ struct Value<Which, absl::enable_if_t<Which::kRepeatable == false &&
}
template <typename Encoder>
void EncodeTo(Encoder*) const {}
template <typename Encoder>
void VisitWith(Encoder* encoder) const {
encoder->Encode(Which(), value);
}
void LogTo(LogFn log_fn) const {
LogKeyValueTo(Which::DebugKey(), value, Which::DisplayValue, log_fn);
}
@ -751,10 +732,6 @@ struct Value<Which, absl::enable_if_t<Which::kRepeatable == true &&
encoder->Encode(Which(), v);
}
}
template <typename Encoder>
void VisitWith(Encoder* encoder) const {
return EncodeTo(encoder);
}
void LogTo(LogFn log_fn) const {
for (const auto& v : value) {
LogKeyValueTo(Which::key(), v, Which::Encode, log_fn);
@ -784,12 +761,6 @@ struct Value<Which, absl::enable_if_t<Which::kRepeatable == true &&
}
template <typename Encoder>
void EncodeTo(Encoder*) const {}
template <typename Encoder>
void VisitWith(Encoder* encoder) const {
for (const auto& v : value) {
encoder->Encode(Which(), v);
}
}
void LogTo(LogFn log_fn) const {
for (const auto& v : value) {
LogKeyValueTo(Which::DebugKey(), v, Which::DisplayValue, log_fn);
@ -834,17 +805,6 @@ struct EncodeWrapper {
}
};
// Callable for the table ForEach in ForEach() -- for each value, call the
// appropriate visitor method.
template <typename Encoder>
struct ForEachWrapper {
Encoder* encoder;
template <typename Which>
void operator()(const Value<Which>& which) {
which.VisitWith(encoder);
}
};
// Callable for the ForEach in Log()
struct LogWrapper {
LogFn log_fn;
@ -1042,7 +1002,10 @@ class MetadataMap {
// void Encode(TraitsType, typename TraitsType::ValueType value);
// For fields for which we do not have traits, this will be a method
// with the signature:
// void Encode(string_view key, Slice value);
// void Encode(grpc_mdelem md);
// TODO(ctiller): It's expected that the latter Encode method will
// become Encode(Slice, Slice) by the end of the current metadata API
// transitions.
template <typename Encoder>
void Encode(Encoder* encoder) const {
table_.ForEach(metadata_detail::EncodeWrapper<Encoder>{encoder});
@ -1051,15 +1014,6 @@ class MetadataMap {
}
}
// Like Encode, but also visit the non-encodable fields.
template <typename Encoder>
void ForEach(Encoder* encoder) const {
table_.ForEach(metadata_detail::ForEachWrapper<Encoder>{encoder});
for (const auto& unk : unknown_) {
encoder->Encode(unk.first, unk.second);
}
}
// Similar to Encode, but targeted at logging: for each metadatum,
// call f(key, value) as absl::string_views.
void Log(metadata_detail::LogFn log_fn) const {
@ -1272,7 +1226,7 @@ template <typename Derived, typename... Traits>
Derived MetadataMap<Derived, Traits...>::Copy() const {
Derived out(unknown_.arena());
metadata_detail::CopySink<Derived> sink(&out);
ForEach(&sink);
Encode(&sink);
return out;
}
@ -1299,7 +1253,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::LbCostBinMetadata, grpc_core::LbTokenMetadata,
// Non-encodable things
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::WaitForReady>;
grpc_core::GrpcStatusContext>;
struct grpc_metadata_batch : public grpc_metadata_batch_base {
using grpc_metadata_batch_base::grpc_metadata_batch_base;

@ -344,6 +344,9 @@ struct grpc_transport_stream_op_batch_payload {
: context(context) {}
struct {
grpc_metadata_batch* send_initial_metadata = nullptr;
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags = 0;
// If non-NULL, will be set by the transport to the peer string (a char*).
// The transport retains ownership of the string.
// Note: This pointer may be used by the transport after the
@ -385,6 +388,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
grpc_metadata_batch* recv_initial_metadata = nullptr;
// Flags are used only on the server side. If non-null, will be set to
// a bitfield of the GRPC_INITIAL_METADATA_xxx macros (e.g., to
// indicate if the call is idempotent).
uint32_t* recv_flags = nullptr;
/** Should be enqueued when initial metadata is ready to be processed. */
grpc_closure* recv_initial_metadata_ready = nullptr;
// If not NULL, will be set to true if trailing metadata is

@ -20,6 +20,7 @@
#define GRPCXX_CHANNEL_FILTER_H
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <new>
@ -137,6 +138,12 @@ class TransportStreamOpBatch {
return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr;
}
uint32_t* send_initial_metadata_flags() const {
return op_->send_initial_metadata ? &op_->payload->send_initial_metadata
.send_initial_metadata_flags
: nullptr;
}
grpc_closure* recv_initial_metadata_ready() const {
return op_->recv_initial_metadata
? op_->payload->recv_initial_metadata.recv_initial_metadata_ready

@ -117,7 +117,8 @@ OpenCensusCallTracer::OpenCensusCallAttemptTracer::OpenCensusCallAttemptTracer(
}
void OpenCensusCallTracer::OpenCensusCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
uint32_t /*flags*/) {
char tracing_buf[kMaxTraceContextLen];
size_t tracing_len = TraceContextSerialize(context_.Context(), tracing_buf,
kMaxTraceContextLen);

@ -52,8 +52,8 @@ class OpenCensusCallTracer : public grpc_core::CallTracer {
OpenCensusCallAttemptTracer(OpenCensusCallTracer* parent,
uint64_t attempt_num, bool is_transparent_retry,
bool arena_allocated);
void RecordSendInitialMetadata(
grpc_metadata_batch* send_initial_metadata) override;
void RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata,
uint32_t /*flags*/) override;
void RecordOnDoneSendInitialMetadata(gpr_atm* /*peer_string*/) override {}
void RecordSendTrailingMetadata(
grpc_metadata_batch* /*send_trailing_metadata*/) override {}

@ -382,7 +382,7 @@ static void on_new_call(void* arg, int success) {
GPR_ASSERT(err == GRPC_CALL_OK);
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.flags = 0;
op.flags = proxy->new_call_details.flags;
op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
refpc(pc, "on_p2s_sent_initial_metadata");

@ -415,6 +415,7 @@ static void simple_request_body(grpc_end2end_test_config /* config */,
GPR_ASSERT(nullptr != strstr(error_string, "grpc_message"));
GPR_ASSERT(nullptr != strstr(error_string, "grpc_status"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -186,6 +186,7 @@ static void run_one_request(grpc_end2end_test_config /*config*/,
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);

@ -151,6 +151,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -190,6 +190,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(GRPC_SLICE_LENGTH(details) == 0);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -221,6 +221,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -194,6 +194,7 @@ static void simple_request_body(grpc_end2end_test_config /*config*/,
GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -293,6 +293,7 @@ static void test_retry(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -226,6 +226,7 @@ static void test_retry_disabled(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -284,6 +284,7 @@ static void test_retry_exceeds_buffer_size_in_delay(
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "message2"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -229,6 +229,7 @@ static void test_retry_exceeds_buffer_size_in_initial_batch(
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -243,6 +243,7 @@ static void test_retry_exceeds_buffer_size_in_subsequent_batch(
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -222,6 +222,7 @@ static void test_retry_non_retriable_status(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -236,6 +236,7 @@ test_retry_non_retriable_status_before_recv_trailing_metadata_started(
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -322,6 +322,7 @@ static void test_retry_per_attempt_recv_timeout(
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -237,6 +237,7 @@ static void test_retry_per_attempt_recv_timeout_on_last_attempt(
GPR_ASSERT(
0 == grpc_slice_str_cmp(details, "retry perAttemptRecvTimeout exceeded"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);

@ -244,6 +244,7 @@ static void test_retry_recv_initial_metadata(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -226,6 +226,7 @@ static void test_retry_recv_message(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -247,6 +247,7 @@ static void test_retry_recv_message_replay(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -247,6 +247,7 @@ static void test_retry_recv_trailing_metadata_error(
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "injected error"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -329,6 +329,7 @@ static void test_retry_send_initial_metadata_refs(
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -248,6 +248,7 @@ static void test_retry_send_op_fails(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
// Make sure the "grpc-previous-rpc-attempts" header was sent in the retry.

@ -222,6 +222,7 @@ static void test_retry_send_recv_batch(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -293,6 +293,7 @@ static void test_retry_server_pushback_delay(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "message2"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -272,6 +272,7 @@ static void test_retry_server_pushback_disabled(
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -400,6 +400,7 @@ static void test_retry_streaming(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(channelz_channel != nullptr);

@ -316,6 +316,7 @@ static void test_retry_streaming_after_commit(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -369,6 +369,7 @@ static void test_retry_streaming_succeeds_before_replay_finished(
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -229,6 +229,7 @@ static void test_retry_throttled(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -265,6 +265,7 @@ static void test_retry_too_many_attempts(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_ABORTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

@ -235,6 +235,7 @@ static void test_retry_transparent_goaway(grpc_end2end_test_config config) {
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(

@ -167,6 +167,7 @@ static void test_retry_transparent_max_concurrent_streams(
cqv.Expect(tag(101), true);
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
grpc_metadata_array_destroy(&request_metadata_recv);
@ -284,6 +285,7 @@ static void test_retry_transparent_max_concurrent_streams(
cqv.Expect(tag(201), true);
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
grpc_call_details_destroy(&call_details);
// Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
// we don't do that for transparent retries.

@ -236,6 +236,7 @@ static void test_retry_transparent_not_sent_on_wire(
GPR_ASSERT(status == GRPC_STATUS_OK);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
GPR_ASSERT(

@ -207,6 +207,7 @@ static void test_retry_unref_before_finish(grpc_end2end_test_config config) {
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_call_unref(s);

@ -212,6 +212,7 @@ static void test_retry_unref_before_recv(grpc_end2end_test_config config) {
cqv.Verify();
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
GPR_ASSERT(0 == call_details.flags);
// Note: Not checking the value of was_cancelled here, because it will
// be flaky, depending on whether the server sent its response before
// the client sent its cancellation.

@ -226,6 +226,7 @@ static void simple_request_body(grpc_end2end_test_config config,
GPR_ASSERT(nullptr != strstr(error_string, "grpc_message"));
GPR_ASSERT(nullptr != strstr(error_string, "grpc_status"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
GPR_ASSERT(0 == call_details.flags);
GPR_ASSERT(was_cancelled == 0);
grpc_slice_unref(details);

Loading…
Cancel
Save