[promises] Run C++ end to end tests with server promises (#32537)

Expand server promises to run with C++ end2end tests.

Across connected_channel/call/batch_builder/pipe/transport:
- fix a bug where read errors weren't propagated from the transport to the
call, so that we can populate failed_before_recv_message for the C++
bindings (a simplified sketch follows this list)
- ensure those errors are not, however, used to populate the returned
call status
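
To illustrate the first bullet, here is a minimal sketch of the mapping,
with types simplified to std::string; the real code in batch_builder.h
(diff below) uses SliceBuffer/MessageHandle and a latch-driven batch:

    #include <string>

    #include "absl/status/status.h"
    #include "absl/status/statusor.h"
    #include "absl/types/optional.h"

    // Stand-in for the batch's pending-receive state.
    struct PendingReceiveMessage {
      absl::optional<std::string> payload;
      bool call_failed_before_recv_message = false;  // set by the transport
    };

    // Maps a completed recv_message batch to the three cases the call layer
    // distinguishes: a message, a clean end-of-stream, or a read failure
    // that flags failed_before_recv_message without becoming the call
    // status (trailing metadata still decides that).
    absl::StatusOr<absl::optional<std::string>> FinishRecvMessage(
        const PendingReceiveMessage& pc, const absl::Status& batch_status) {
      if (!batch_status.ok()) return batch_status;
      if (!pc.payload.has_value()) {
        if (pc.call_failed_before_recv_message) return absl::CancelledError();
        return absl::nullopt;  // end-of-stream
      }
      return pc.payload;
    }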

Add a new latch call arg to lazily propagate the bound CQ for a server
call (and for a client call, though there it's used degenerately - it's
always populated at construction). This allows server calls to be
properly bound to pollsets. (1)(2) A simplified sketch of the pattern
follows.
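
A toy single-waiter latch standing in for gRPC's promise Latch and Party
machinery; the hand-off shape is the point, not the exact API:

    #include <functional>
    #include <utility>

    #include "absl/types/optional.h"

    // Toy single-set, single-waiter latch. gRPC's Latch<T> is a promise a
    // Party member awaits, but the hand-off works the same way.
    template <typename T>
    class ToyLatch {
     public:
      void Set(T value) {
        value_ = std::move(value);
        if (waiter_) std::exchange(waiter_, nullptr)(*value_);
      }
      bool is_set() const { return value_.has_value(); }
      void OnReady(std::function<void(const T&)> cb) {
        if (value_.has_value()) {
          cb(*value_);
        } else {
          waiter_ = std::move(cb);
        }
      }

     private:
      absl::optional<T> value_;
      std::function<void(const T&)> waiter_;
    };

    struct PollingEntity {};  // stand-in for grpc_polling_entity

    // Transport side: bind pollsets whenever the entity becomes available.
    // Client calls set the latch at construction (the degenerate case);
    // server calls set it later, when matched to a completion queue.
    void BindStreamToPollset(ToyLatch<PollingEntity>* latch) {
      latch->OnReady([](const PollingEntity& pe) {
        // grpc_transport_set_pops(transport, stream, &pe) in the real code.
        (void)pe;
      });
    }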

In call.cc:
- move some profiling code from FilterStackCall to Call, and then use it
in PromiseBasedCall (this should be cleaned up as part of the tracing
work)
- implement GetServerAuthority (sketched below)
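
A hedged sketch of GetServerAuthority with a plain map standing in for
the parsed client initial metadata; the real implementation (call.cc diff
below) reads HttpAuthorityMetadata() from the stored metadata handle:

    #include <map>
    #include <string>

    #include "absl/strings/string_view.h"

    // ":authority" from the client's initial metadata, empty when absent.
    absl::string_view GetServerAuthority(
        const std::map<std::string, std::string>& client_initial_metadata) {
      auto it = client_initial_metadata.find(":authority");
      if (it == client_initial_metadata.end()) return "";
      return it->second;
    }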

In server.cc:
- use an RAII pattern on `MatchResult` to avoid a bug whereby a tag
could be dropped if we cancel a request after it's been matched but
before it's published (condensed sketch below)
- fix deadline export to ServerContext
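
The RAII shape, condensed from the server.cc diff below; FailCall stands
in for the Server method that returns the tag to its completion queue:

    #include <cstddef>
    #include <utility>

    class Server;
    struct RequestedCall;
    // Assumed helper: fails the requested call back to its CQ.
    void FailCall(Server* server, size_t cq_idx, RequestedCall* rc);

    // Move-only guard: a matched-but-unpublished call is failed back to
    // its completion queue on destruction, so the tag is never dropped.
    class MatchResult {
     public:
      MatchResult(Server* server, size_t cq_idx, RequestedCall* rc)
          : server_(server), cq_idx_(cq_idx), rc_(rc) {}
      ~MatchResult() {
        if (rc_ != nullptr) FailCall(server_, cq_idx_, rc_);
      }
      MatchResult(const MatchResult&) = delete;
      MatchResult& operator=(const MatchResult&) = delete;
      MatchResult(MatchResult&& other) noexcept
          : server_(other.server_),
            cq_idx_(other.cq_idx_),
            rc_(std::exchange(other.rc_, nullptr)) {}
      // Publishing takes ownership; afterwards the destructor is a no-op.
      RequestedCall* TakeCall() { return std::exchange(rc_, nullptr); }
      size_t cq_idx() const { return cq_idx_; }

     private:
      Server* server_;
      size_t cq_idx_;
      RequestedCall* rc_;
    };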

In resource_quota_server.cc:
- fix some long-standing flakes (which finally became obvious with the
new test code): it's legal here for client calls to never arrive at the
server due to resource starvation, so the test now works through that
case (this includes adding expectations during a `Step` call, which
required some small tweaks to cq_verifier; bookkeeping sketched below)
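
The bookkeeping the test adopts, in sketch form (the diff below wires it
up through cq_verifier's new MaybePerformAction expectation):

    // Per-call tri-state: under memory pressure the server may
    // legitimately never see a call, so server-side expectations are only
    // added once the call is known to have arrived.
    enum class SeenServerCall {
      kNotSeen = 0,
      kSeenWithSuccess,
      kSeenWithFailure
    };

    // Only calls the server actually accepted owe close/cancel checks.
    inline bool ExpectServerClose(SeenServerCall seen) {
      return seen == SeenServerCall::kSeenWithSuccess;
    }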

In the C++ end2end_test.cc:
- strengthen a flaky test so it passes consistently (we'll likely
revisit this during the fuzzing effort to turn it into a genuinely
robust test; the retry shape is sketched below)
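
The retry shape used to de-flake best-effort cancellation, as a hedged
sketch; IssueRpcAndTryCancel is a hypothetical helper wrapping the
echo-then-TryCancel dance from the end2end_test.cc diff below:

    #include <grpcpp/grpcpp.h>

    #include "gtest/gtest.h"

    // Hypothetical helper: start the echo RPC on a thread, TryCancel once
    // the server reports the RPC started, join, and return the status the
    // client observed.
    grpc::Status IssueRpcAndTryCancel();

    TEST(CancelRpcAfterStartSketch, RetriesBestEffortCancel) {
      for (int attempt = 0; attempt < 10; ++attempt) {
        grpc::Status s = IssueRpcAndTryCancel();
        // TryCancel is best effort: the server may respond before the
        // cancellation lands; if so, just try again.
        if (s.ok()) continue;
        EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
        return;
      }
      GTEST_FAIL() << "Failed to get cancellation";
    }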

(1) It's time to remove this concept
(2) Surprisingly the only test that *reliably* demonstrates this not
being done is time_change_test

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
commit 63c094cf5b (parent c8d0110a10), pull/32828/head
Craig Tiller, committed by GitHub
20 changed files:

 bazel/experiments.bzl                            |   6
 src/core/lib/channel/connected_channel.cc        | 109
 src/core/lib/channel/promise_based_filter.cc     |  10
 src/core/lib/experiments/experiments.yaml        |   2
 src/core/lib/promise/pipe.h                      |   7
 src/core/lib/surface/call.cc                     |  91
 src/core/lib/surface/call.h                      |   6
 src/core/lib/surface/completion_queue.cc         |   3
 src/core/lib/surface/server.cc                   |  66
 src/core/lib/transport/batch_builder.h           |  10
 src/core/lib/transport/transport.h               |   3
 test/core/end2end/cq_verifier.cc                 |  12
 test/core/end2end/tests/resource_quota_server.cc |  72
 test/core/filters/filter_fuzzer.cc               |   5
 test/core/filters/filter_test.cc                 |   2
 test/core/promise/pipe_test.cc                   |  30
 test/cpp/end2end/BUILD                           |  67
 test/cpp/end2end/end2end_test.cc                 |  66
 test/cpp/end2end/test_service_impl.h             |   6
 test/cpp/end2end/xds/BUILD                       |  19

@ -27,6 +27,9 @@ EXPERIMENTS = {
"promise_based_client_call",
"promise_based_server_call",
],
"cpp_end2end_test": [
"promise_based_server_call",
],
"endpoint_test": [
"tcp_frame_size_tuning",
"tcp_rcv_lowat",
@ -50,6 +53,9 @@ EXPERIMENTS = {
"memory_pressure_controller",
"unconstrained_max_quota_buffer_size",
],
"xds_end2end_test": [
"promise_based_server_call",
],
},
"on": {
"flow_control_test": [

@ -56,8 +56,8 @@
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/detail/basic_join.h"
#include "src/core/lib/promise/detail/basic_seq.h"
#include "src/core/lib/promise/detail/status.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/latch.h"
@ -69,7 +69,6 @@
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
@ -321,7 +320,8 @@ class ConnectedChannelStream : public Orphanable {
}
// Returns a promise that implements the receive message loop.
auto RecvMessages(PipeSender<MessageHandle>* incoming_messages);
auto RecvMessages(PipeSender<MessageHandle>* incoming_messages,
bool cancel_on_error);
// Returns a promise that implements the send message loop.
auto SendMessages(PipeReceiver<MessageHandle>* outgoing_messages);
@ -374,12 +374,12 @@ class ConnectedChannelStream : public Orphanable {
};
auto ConnectedChannelStream::RecvMessages(
PipeSender<MessageHandle>* incoming_messages) {
return Loop([self = InternalRef(),
PipeSender<MessageHandle>* incoming_messages, bool cancel_on_error) {
return Loop([self = InternalRef(), cancel_on_error,
incoming_messages = std::move(*incoming_messages)]() mutable {
return Seq(
GetContext<BatchBuilder>()->ReceiveMessage(self->batch_target()),
[&incoming_messages](
[cancel_on_error, &incoming_messages](
absl::StatusOr<absl::optional<MessageHandle>> status) mutable {
bool has_message = status.ok() && status->has_value();
auto publish_message = [&incoming_messages, &status]() {
@ -405,7 +405,8 @@ auto ConnectedChannelStream::RecvMessages(
return Continue{};
});
};
auto publish_close = [&status]() mutable {
auto publish_close = [cancel_on_error, &incoming_messages,
&status]() mutable {
if (grpc_call_trace.enabled()) {
gpr_log(GPR_INFO,
"%s[connected] RecvMessage: reached end of stream with "
@ -413,6 +414,9 @@ auto ConnectedChannelStream::RecvMessages(
Activity::current()->DebugTag().c_str(),
status.status().ToString().c_str());
}
if (cancel_on_error && !status.ok()) {
incoming_messages.CloseWithError();
}
return Immediate(LoopCtl<absl::Status>(status.status()));
};
return If(has_message, std::move(publish_message),
@ -442,9 +446,13 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(
grpc_transport_init_stream(transport, stream->stream(),
stream->stream_refcount(), nullptr,
GetContext<Arena>());
grpc_transport_set_pops(transport, stream->stream(),
GetContext<CallContext>()->polling_entity());
auto* party = static_cast<Party*>(Activity::current());
party->Spawn(
"set_polling_entity", call_args.polling_entity->Wait(),
[transport,
stream = stream->InternalRef()](grpc_polling_entity polling_entity) {
grpc_transport_set_pops(transport, stream->stream(), &polling_entity);
});
// Start a loop to send messages from client_to_server_messages to the
// transport. When the pipe closes and the loop completes, send a trailing
// metadata batch to close the stream.
@ -523,14 +531,44 @@ ArenaPromise<ServerMetadataHandle> MakeClientCallPromise(
// complete (or one fails).
// Next: receive trailing metadata, and return that up the stack.
auto recv_messages =
stream->RecvMessages(call_args.server_to_client_messages);
return Map(TrySeq(TryJoin(std::move(send_initial_metadata),
std::move(recv_messages)),
std::move(recv_trailing_metadata)),
[stream = std::move(stream)](ServerMetadataHandle result) {
stream->set_finished();
return result;
});
stream->RecvMessages(call_args.server_to_client_messages, false);
return Map(
[send_initial_metadata = std::move(send_initial_metadata),
recv_messages = std::move(recv_messages),
recv_trailing_metadata = std::move(recv_trailing_metadata),
done_send_initial_metadata = false, done_recv_messages = false,
done_recv_trailing_metadata =
false]() mutable -> Poll<ServerMetadataHandle> {
if (!done_send_initial_metadata) {
auto p = send_initial_metadata();
if (auto* r = p.value_if_ready()) {
done_send_initial_metadata = true;
if (!r->ok()) return StatusCast<ServerMetadataHandle>(*r);
}
}
if (!done_recv_messages) {
auto p = recv_messages();
if (auto* r = p.value_if_ready()) {
// NOTE: ignore errors here, they'll be collected in the
// recv_trailing_metadata.
done_recv_messages = true;
} else {
return Pending{};
}
}
if (!done_recv_trailing_metadata) {
auto p = recv_trailing_metadata();
if (auto* r = p.value_if_ready()) {
done_recv_trailing_metadata = true;
return std::move(*r);
}
}
return Pending{};
},
[stream = std::move(stream)](ServerMetadataHandle result) {
stream->set_finished();
return result;
});
}
#endif
@ -547,9 +585,6 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
transport, stream->stream(), stream->stream_refcount(),
GetContext<CallContext>()->server_call_context()->server_stream_data(),
GetContext<Arena>());
grpc_transport_set_pops(transport, stream->stream(),
GetContext<CallContext>()->polling_entity());
auto* party = static_cast<Party*>(Activity::current());
// Artifacts we need for the lifetime of the call.
@ -558,11 +593,19 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
Pipe<MessageHandle> client_to_server;
Pipe<ServerMetadataHandle> server_initial_metadata;
Latch<ServerMetadataHandle> failure_latch;
Latch<grpc_polling_entity> polling_entity_latch;
bool sent_initial_metadata = false;
bool sent_trailing_metadata = false;
};
auto* call_data = GetContext<Arena>()->ManagedNew<CallData>();
party->Spawn(
"set_polling_entity", call_data->polling_entity_latch.Wait(),
[transport,
stream = stream->InternalRef()](grpc_polling_entity polling_entity) {
grpc_transport_set_pops(transport, stream->stream(), &polling_entity);
});
auto server_to_client_empty =
call_data->server_to_client.receiver.AwaitEmpty();
@ -580,6 +623,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
auto call_promise = next_promise_factory(CallArgs{
std::move(client_initial_metadata),
ClientInitialMetadataOutstandingToken::Empty(),
&call_data->polling_entity_latch,
&call_data->server_initial_metadata.sender,
&call_data->client_to_server.receiver,
&call_data->server_to_client.sender,
@ -680,7 +724,7 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
"recv_messages",
Race(
Map(stream->WaitFinished(), [](Empty) { return absl::OkStatus(); }),
Map(stream->RecvMessages(&call_data->client_to_server.sender),
Map(stream->RecvMessages(&call_data->client_to_server.sender, true),
[failure_latch = &call_data->failure_latch](absl::Status status) {
if (!status.ok() && !failure_latch->is_set()) {
failure_latch->Set(ServerMetadataFromStatus(status));
@ -769,12 +813,23 @@ ArenaPromise<ServerMetadataHandle> MakeServerCallPromise(
// (allowing the call code to decide on what signalling to give the
// application).
return Map(Seq(std::move(recv_initial_metadata_then_run_promise),
std::move(send_trailing_metadata)),
[stream = std::move(stream)](ServerMetadataHandle md) {
stream->set_finished();
return md;
});
struct CleanupPollingEntityLatch {
void operator()(Latch<grpc_polling_entity>* latch) {
if (!latch->is_set()) latch->Set(grpc_polling_entity());
}
};
auto cleanup_polling_entity_latch =
std::unique_ptr<Latch<grpc_polling_entity>, CleanupPollingEntityLatch>(
&call_data->polling_entity_latch);
return Map(
Seq(std::move(recv_initial_metadata_then_run_promise),
std::move(send_trailing_metadata)),
[cleanup_polling_entity_latch = std::move(cleanup_polling_entity_latch),
stream = std::move(stream)](ServerMetadataHandle md) {
stream->set_finished();
return md;
});
}
#endif

@ -16,8 +16,6 @@
#include "src/core/lib/channel/promise_based_filter.h"
#include <inttypes.h>
#include <algorithm>
#include <initializer_list>
#include <memory>
@ -245,10 +243,6 @@ void BaseCallData::CapturedBatch::CancelWith(grpc_error_handle error,
auto* batch = std::exchange(batch_, nullptr);
GPR_ASSERT(batch != nullptr);
uintptr_t& refcnt = *RefCountField(batch);
gpr_log(GPR_DEBUG, "%sCancelWith: %p refs=%" PRIdPTR " err=%s [%s]",
releaser->call()->DebugTag().c_str(), batch, refcnt,
error.ToString().c_str(),
grpc_transport_stream_op_batch_string(batch, false).c_str());
if (refcnt == 0) {
// refcnt==0 ==> cancelled
if (grpc_trace_channel.enabled()) {
@ -1583,7 +1577,7 @@ void ClientCallData::StartPromise(Flusher* flusher) {
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(send_initial_metadata_batch_->payload
->send_initial_metadata.send_initial_metadata),
std::move(initial_metadata_outstanding_token_),
std::move(initial_metadata_outstanding_token_), nullptr,
server_initial_metadata_pipe() == nullptr
? nullptr
: &server_initial_metadata_pipe()->sender,
@ -2373,7 +2367,7 @@ void ServerCallData::RecvInitialMetadataReady(grpc_error_handle error) {
FakeActivity(this).Run([this, filter] {
promise_ = filter->MakeCallPromise(
CallArgs{WrapMetadata(recv_initial_metadata_),
ClientInitialMetadataOutstandingToken::Empty(),
ClientInitialMetadataOutstandingToken::Empty(), nullptr,
server_initial_metadata_pipe() == nullptr
? nullptr
: &server_initial_metadata_pipe()->sender,

@ -119,7 +119,7 @@
default: false
expiry: 2023/06/01
owner: ctiller@google.com
test_tags: ["core_end2end_test"]
test_tags: ["core_end2end_test", "cpp_end2end_test", "xds_end2end_test"]
- name: transport_supplies_client_latency
description:
If set, use the transport represented value for client latency in opencensus

@ -477,6 +477,13 @@ class PipeSender {
}
}
void CloseWithError() {
if (center_ != nullptr) {
center_->MarkCancelled();
center_.reset();
}
}
void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
// Send a single message along the pipe.

@ -242,6 +242,8 @@ class Call : public CppImplOf<Call, grpc_call> {
void HandleCompressionAlgorithmNotAccepted(
grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
gpr_cycle_counter start_time() const { return start_time_; }
private:
RefCountedPtr<Channel> channel_;
Arena* const arena_;
@ -263,6 +265,7 @@ class Call : public CppImplOf<Call, grpc_call> {
// of the recv_initial_metadata op. The mutex should be mostly uncontended.
mutable Mutex peer_mu_;
Slice peer_string_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
};
Call::ParentCall* Call::GetOrCreateParentCall() {
@ -706,7 +709,6 @@ class FilterStackCall final : public Call {
CallCombiner call_combiner_;
grpc_completion_queue* cq_;
grpc_polling_entity pollent_;
gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
/// has grpc_call_unref been called
bool destroy_called_ = false;
@ -868,7 +870,7 @@ grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
grpc_call_element_args call_args = {
call->call_stack(), args->server_transport_data,
call->context_, path,
call->start_time_, call->send_deadline(),
call->start_time(), call->send_deadline(),
call->arena(), &call->call_combiner_};
add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall,
call, &call_args));
@ -950,7 +952,7 @@ void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
&(c->final_info_.error_string));
c->status_error_.set(absl::OkStatus());
c->final_info_.stats.latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time_);
gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time());
grpc_call_stack_destroy(c->call_stack(), &c->final_info_,
GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c,
grpc_schedule_on_exec_ctx));
@ -1995,11 +1997,18 @@ class PromiseBasedCall : public Call,
void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
Timestamp deadline() {
MutexLock lock(&deadline_mu_);
return deadline_;
}
// Implementation of EventEngine::Closure, called when deadline expires
void Run() override;
virtual ServerCallContext* server_call_context() { return nullptr; }
bool failed_before_recv_message() const final {
return failed_before_recv_message_.load(std::memory_order_relaxed);
}
using Call::arena;
@ -2145,6 +2154,8 @@ class PromiseBasedCall : public Call,
final_info.stats = final_stats_;
final_info.final_status = status;
final_info.error_string = status_details;
final_info.stats.latency =
gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time());
finalization_.Run(&final_info);
}
@ -2320,6 +2331,7 @@ class PromiseBasedCall : public Call,
// GRPC_OP_SEND_MESSAGE (one count each), and 0 once those payloads have been
// pushed onto the outgoing pipe.
std::atomic<uint8_t> sends_queued_{0};
std::atomic<bool> failed_before_recv_message_{false};
// Waiter for when sends_queued_ becomes 0.
IntraActivityWaiter waiting_for_queued_sends_;
grpc_byte_buffer** recv_message_ = nullptr;
@ -2346,16 +2358,7 @@ PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs,
Party(arena, initial_external_refs),
cq_(args.cq) {
if (args.cq != nullptr) {
GPR_ASSERT(args.pollset_set_alternative == nullptr &&
"Only one of 'cq' and 'pollset_set_alternative' should be "
"non-nullptr.");
GRPC_CQ_INTERNAL_REF(args.cq, "bind");
call_context_.pollent_ =
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args.cq));
}
if (args.pollset_set_alternative != nullptr) {
call_context_.pollent_ = grpc_polling_entity_create_from_pollset_set(
args.pollset_set_alternative);
}
}
@ -2468,8 +2471,6 @@ void PromiseBasedCall::FinishOpOnCompletion(Completion* completion,
void PromiseBasedCall::SetCompletionQueue(grpc_completion_queue* cq) {
cq_ = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
call_context_.pollent_ =
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
}
void PromiseBasedCall::UpdateDeadline(Timestamp deadline) {
@ -2577,6 +2578,7 @@ void PromiseBasedCall::StartRecvMessage(
"finishes: received end-of-stream with error",
DebugTag().c_str());
}
failed_before_recv_message_.store(true);
FailCompletion(completion);
*recv_message_ = nullptr;
} else {
@ -2609,6 +2611,8 @@ void CallContext::UpdateDeadline(Timestamp deadline) {
call_->UpdateDeadline(deadline);
}
Timestamp CallContext::deadline() const { return call_->deadline(); }
ServerCallContext* CallContext::server_call_context() {
return call_->server_call_context();
}
@ -2639,6 +2643,17 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
ClientPromiseBasedCall(Arena* arena, grpc_call_create_args* args)
: PromiseBasedCall(arena, 1, *args) {
global_stats().IncrementClientCallsCreated();
if (args->cq != nullptr) {
GPR_ASSERT(args->pollset_set_alternative == nullptr &&
"Only one of 'cq' and 'pollset_set_alternative' should be "
"non-nullptr.");
polling_entity_.Set(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)));
}
if (args->pollset_set_alternative != nullptr) {
polling_entity_.Set(grpc_polling_entity_create_from_pollset_set(
args->pollset_set_alternative));
}
ScopedContext context(this);
send_initial_metadata_ =
GetContext<Arena>()->MakePooled<ClientMetadata>(GetContext<Arena>());
@ -2691,7 +2706,6 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
}
absl::string_view GetServerAuthority() const override { abort(); }
bool is_trailers_only() const override { return is_trailers_only_; }
bool failed_before_recv_message() const override { return false; }
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) override;
@ -2730,6 +2744,7 @@ class ClientPromiseBasedCall final : public PromiseBasedCall {
Pipe<ServerMetadataHandle> server_initial_metadata_{arena()};
Latch<ServerMetadataHandle> server_trailing_metadata_;
Latch<ServerMetadataHandle> cancel_error_;
Latch<grpc_polling_entity> polling_entity_;
Pipe<MessageHandle> client_to_server_messages_{arena()};
Pipe<MessageHandle> server_to_client_messages_{arena()};
bool is_trailers_only_;
@ -2763,11 +2778,11 @@ void ClientPromiseBasedCall::StartPromise(
token = std::move(token)]() mutable {
return Race(
cancel_error_.Wait(),
Map(channel()->channel_stack()->MakeClientCallPromise(
CallArgs{std::move(client_initial_metadata),
std::move(token), &server_initial_metadata_.sender,
&client_to_server_messages_.receiver,
&server_to_client_messages_.sender}),
Map(channel()->channel_stack()->MakeClientCallPromise(CallArgs{
std::move(client_initial_metadata), std::move(token),
&polling_entity_, &server_initial_metadata_.sender,
&client_to_server_messages_.receiver,
&server_to_client_messages_.sender}),
[this](ServerMetadataHandle trailing_metadata) {
// If we're cancelled the transport doesn't get to return
// stats.
@ -3023,9 +3038,13 @@ class ServerPromiseBasedCall final : public PromiseBasedCall {
void CancelWithError(grpc_error_handle) override;
grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
bool is_notify_tag_closure) override;
bool failed_before_recv_message() const override { return false; }
bool is_trailers_only() const override { abort(); }
absl::string_view GetServerAuthority() const override { return ""; }
absl::string_view GetServerAuthority() const override {
const Slice* authority_metadata =
client_initial_metadata_->get_pointer(HttpAuthorityMetadata());
if (authority_metadata == nullptr) return "";
return authority_metadata->as_string_view();
}
// Polling order for the server promise stack:
//
@ -3189,7 +3208,7 @@ ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena,
Spawn("server_promise",
channel()->channel_stack()->MakeServerCallPromise(
CallArgs{nullptr, ClientInitialMetadataOutstandingToken::Empty(),
nullptr, nullptr, nullptr}),
nullptr, nullptr, nullptr, nullptr}),
[this](ServerMetadataHandle result) { Finish(std::move(result)); });
}
@ -3207,15 +3226,27 @@ void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) {
if (server_initial_metadata_ != nullptr) {
server_initial_metadata_->Close();
}
const auto status =
result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
channelz::ServerNode* channelz_node = server_->channelz_node();
if (channelz_node != nullptr) {
if (result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN) ==
GRPC_STATUS_OK) {
if (status == GRPC_STATUS_OK) {
channelz_node->RecordCallSucceeded();
} else {
channelz_node->RecordCallFailed();
}
}
absl::string_view message_string;
if (Slice* message = result->get_pointer(GrpcMessageMetadata())) {
message_string = message->as_string_view();
}
AcceptTransportStatsFromContext();
if (message_string.empty()) {
RunFinalization(status, nullptr);
} else {
std::string error_string(message_string);
RunFinalization(status, error_string.c_str());
}
set_completed();
ResetDeadline();
PropagateCancellationToChildren();
@ -3307,7 +3338,12 @@ void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops,
metadata->Set(GrpcStatusMetadata(),
op.data.send_status_from_server.status);
if (auto* details = op.data.send_status_from_server.status_details) {
metadata->Set(GrpcMessageMetadata(), Slice(CSliceRef(*details)));
// TODO(ctiller): this should not be a copy, but we have callers that
// allocate and pass in a slice created with
// grpc_slice_from_static_string and then delete the string after
// passing it in, which shouldn't be a supported API.
metadata->Set(GrpcMessageMetadata(),
Slice(grpc_slice_copy(*details)));
}
spawner.Spawn(
"call_send_status_from_server",
@ -3403,11 +3439,14 @@ ServerCallContext::MakeTopOfServerCallPromise(
grpc_metadata_array* publish_initial_metadata,
absl::FunctionRef<void(grpc_call* call)> publish) {
call_->SetCompletionQueue(cq);
call_args.polling_entity->Set(
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)));
call_->server_to_client_messages_ = call_args.server_to_client_messages;
call_->client_to_server_messages_ = call_args.client_to_server_messages;
call_->server_initial_metadata_ = call_args.server_initial_metadata;
call_->client_initial_metadata_ =
std::move(call_args.client_initial_metadata);
call_->set_send_deadline(call_->deadline());
call_->ProcessIncomingInitialMetadata(*call_->client_initial_metadata_);
PublishMetadataArray(call_->client_initial_metadata_.get(),
publish_initial_metadata);

@ -43,7 +43,6 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
@ -107,6 +106,7 @@ class CallContext {
// Update the deadline (if deadline < the current deadline).
void UpdateDeadline(Timestamp deadline);
Timestamp deadline() const;
// Run some action in the call activity context. This is needed to adapt some
// legacy systems to promises, and will likely disappear once that conversion
@ -126,7 +126,6 @@ class CallContext {
grpc_call_stats* call_stats() { return &call_stats_; }
gpr_atm* peer_string_atm_ptr();
grpc_polling_entity* polling_entity() { return &pollent_; }
ServerCallContext* server_call_context();
@ -137,9 +136,6 @@ class CallContext {
friend class PromiseBasedCall;
// Call final info.
grpc_call_stats call_stats_;
// Pollset stuff, can't wait to remove.
// TODO(ctiller): bring forth EventEngine.
grpc_polling_entity pollent_;
// TODO(ctiller): remove this once transport APIs are promise based and we
// don't need refcounting here.
PromiseBasedCall* const call_;

@ -539,7 +539,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal(
cq->poller_vtable = poller_vtable;
// One for destroy(), one for pollset_shutdown
new (&cq->owning_refs) grpc_core::RefCount(2);
new (&cq->owning_refs) grpc_core::RefCount(
2, grpc_trace_cq_refcount.enabled() ? "completion_queue" : nullptr);
poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

@ -39,6 +39,7 @@
#include "absl/types/variant.h"
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
@ -195,9 +196,35 @@ class Server::RequestMatcherInterface {
virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
RequestedCall* call) = 0;
struct MatchResult {
size_t cq_idx;
RequestedCall* requested_call;
class MatchResult {
public:
MatchResult(Server* server, size_t cq_idx, RequestedCall* requested_call)
: server_(server), cq_idx_(cq_idx), requested_call_(requested_call) {}
~MatchResult() {
if (requested_call_ != nullptr) {
server_->FailCall(cq_idx_, requested_call_, absl::CancelledError());
}
}
MatchResult(const MatchResult&) = delete;
MatchResult& operator=(const MatchResult&) = delete;
MatchResult(MatchResult&& other) noexcept
: server_(other.server_),
cq_idx_(other.cq_idx_),
requested_call_(std::exchange(other.requested_call_, nullptr)) {}
RequestedCall* TakeCall() {
return std::exchange(requested_call_, nullptr);
}
grpc_completion_queue* cq() const { return server_->cqs_[cq_idx_]; }
size_t cq_idx() const { return cq_idx_; }
private:
Server* server_;
size_t cq_idx_;
RequestedCall* requested_call_;
};
// This function is invoked on an incoming promise based RPC.
@ -294,18 +321,20 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
while (true) {
NextPendingCall next_pending = pop_next_pending();
if (next_pending.rc == nullptr) break;
auto mr = MatchResult{request_queue_index, next_pending.rc};
auto mr = MatchResult(server(), request_queue_index, next_pending.rc);
Match(
next_pending.pending,
[mr](CallData* calld) {
[&mr](CallData* calld) {
if (!calld->MaybeActivate()) {
// Zombied Call
calld->KillZombie();
} else {
calld->Publish(mr.cq_idx, mr.requested_call);
calld->Publish(mr.cq_idx(), mr.TakeCall());
}
},
[mr](const std::shared_ptr<ActivityWaiter>& w) { w->Finish(mr); });
[&mr](const std::shared_ptr<ActivityWaiter>& w) {
w->Finish(std::move(mr));
});
}
}
}
@ -357,7 +386,7 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
RequestedCall* rc =
reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
if (rc != nullptr) {
return Immediate(MatchResult{cq_idx, rc});
return Immediate(MatchResult(server(), cq_idx, rc));
}
}
// No cq to take the request found; queue it on the slow list.
@ -390,10 +419,10 @@ class Server::RealRequestMatcher : public RequestMatcherInterface {
};
}
}
return Immediate(MatchResult{cq_idx, rc});
return Immediate(MatchResult(server(), cq_idx, rc));
}
Server* server() const override { return server_; }
Server* server() const final { return server_; }
private:
Server* const server_;
@ -444,7 +473,7 @@ class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {
Crash("unreachable");
}
Server* server() const override { return server_; }
Server* server() const final { return server_; }
// Supply the completion queue related to this request matcher
grpc_completion_queue* cq() const { return cq_; }
@ -501,7 +530,7 @@ class Server::AllocatingRequestMatcherBatch
RequestedCall* rc = new RequestedCall(
static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
call_info.initial_metadata, call_info.details);
return Immediate(MatchResult{cq_idx(), rc});
return Immediate(MatchResult(server(), cq_idx(), rc));
} else {
return Immediate(absl::InternalError("Server shutdown"));
}
@ -556,7 +585,7 @@ class Server::AllocatingRequestMatcherRegistered
new RequestedCall(call_info.tag, call_info.cq, call_info.call,
call_info.initial_metadata, registered_method_,
call_info.deadline, call_info.optional_payload);
return Immediate(MatchResult{cq_idx(), rc});
return Immediate(MatchResult(server(), cq_idx(), rc));
} else {
return Immediate(absl::InternalError("Server shutdown"));
}
@ -1282,8 +1311,7 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
absl::InternalError("Missing :authority header"));
};
}
// TODO(ctiller): deadline handling
Timestamp deadline = Timestamp::InfFuture();
Timestamp deadline = GetContext<CallContext>()->deadline();
// Find request matcher.
RequestMatcherInterface* matcher;
ChannelRegisteredMethod* rm =
@ -1309,19 +1337,19 @@ ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
return TrySeq(
TryJoin(matcher->MatchRequest(chand->cq_idx()),
std::move(maybe_read_first_message)),
[path = std::move(*path), host = std::move(*host_ptr), deadline, server,
[path = std::move(*path), host_ptr, deadline,
call_args = std::move(call_args)](
std::tuple<RequestMatcherInterface::MatchResult,
NextResult<MessageHandle>>
match_result_and_payload) mutable {
auto& mr = std::get<0>(match_result_and_payload);
auto& payload = std::get<1>(match_result_and_payload);
auto* rc = mr.requested_call;
auto* cq_for_new_request = server->cqs_[mr.cq_idx];
auto* rc = mr.TakeCall();
auto* cq_for_new_request = mr.cq();
switch (rc->type) {
case RequestedCall::Type::BATCH_CALL:
GPR_ASSERT(!payload.has_value());
rc->data.batch.details->host = CSliceRef(host.c_slice());
rc->data.batch.details->host = CSliceRef(host_ptr->c_slice());
rc->data.batch.details->method = CSliceRef(path.c_slice());
rc->data.batch.details->deadline =
deadline.as_timespec(GPR_CLOCK_MONOTONIC);

@ -154,6 +154,7 @@ class BatchBuilder {
absl::optional<SliceBuffer> payload;
uint32_t flags;
bool call_failed_before_recv_message = false;
};
// A pending receive metadata.
@ -392,12 +393,19 @@ inline auto BatchBuilder::ReceiveMessage(Target target) {
payload_->recv_message.recv_message_ready = &pc->on_done_closure;
payload_->recv_message.recv_message = &pc->payload;
payload_->recv_message.flags = &pc->flags;
payload_->recv_message.call_failed_before_recv_message =
&pc->call_failed_before_recv_message;
return batch->RefUntil(
Map(pc->done_latch.Wait(),
[pc](absl::Status status)
-> absl::StatusOr<absl::optional<MessageHandle>> {
if (!status.ok()) return status;
if (!pc->payload.has_value()) return absl::nullopt;
if (!pc->payload.has_value()) {
if (pc->call_failed_before_recv_message) {
return absl::CancelledError();
}
return absl::nullopt;
}
return pc->IntoMessageHandle();
}));
}

@ -211,6 +211,9 @@ struct CallArgs {
// This should be moved around and only destroyed when the transport is
// satisfied that the metadata has passed any flow control measures it has.
ClientInitialMetadataOutstandingToken client_initial_metadata_outstanding;
// Latch that will ultimately contain the polling entity for the call.
// TODO(ctiller): remove once event engine lands
Latch<grpc_polling_entity>* polling_entity;
// Initial metadata from the server to the client.
// Set once when it's available.
// During promise setup filters can substitute their own latch for this

@ -213,8 +213,10 @@ std::string CqVerifier::Expectation::ToString() const {
},
[](Maybe) { return std::string("maybe"); },
[](AnyStatus) { return std::string("any success value"); },
[](PerformAction) { return std::string("perform some action"); },
[](MaybePerformAction) {
[](const PerformAction&) {
return std::string("perform some action");
},
[](const MaybePerformAction&) {
return std::string("maybe perform action");
}));
}
@ -284,8 +286,11 @@ void CqVerifier::Verify(Duration timeout, SourceLocation location) {
bool found = false;
for (auto it = expectations_.begin(); it != expectations_.end(); ++it) {
if (it->tag != ev.tag) continue;
auto expectation = std::move(*it);
expectations_.erase(it);
const bool expected = Match(
it->result, [ev](bool success) { return ev.success == success; },
expectation.result,
[ev](bool success) { return ev.success == success; },
[ev](Maybe m) {
if (m.seen != nullptr) *m.seen = true;
return ev.success != 0;
@ -305,7 +310,6 @@ void CqVerifier::Verify(Duration timeout, SourceLocation location) {
if (!expected) {
FailUnexpectedEvent(&ev, location);
}
expectations_.erase(it);
found = true;
break;
}

@ -16,6 +16,8 @@
//
//
#include <stddef.h>
#include <algorithm>
#include <initializer_list>
#include <vector>
@ -66,13 +68,20 @@ TEST_P(ResourceQuotaTest, ResourceQuota) {
auto requests = MakeVec([](int) { return RandomSlice(128 * 1024); });
auto server_calls =
MakeVec([this](int i) { return RequestCall(kServerRecvBaseTag + i); });
std::vector<IncomingMetadata> server_metadata(kNumCalls);
std::vector<IncomingStatusOnClient> server_status(kNumCalls);
std::vector<IncomingMessage> client_message(kNumCalls);
std::vector<IncomingCloseOnServer> client_close(kNumCalls);
IncomingMetadata server_metadata[kNumCalls];
IncomingStatusOnClient server_status[kNumCalls];
IncomingMessage client_message[kNumCalls];
IncomingCloseOnServer client_close[kNumCalls];
enum class SeenServerCall {
kNotSeen = 0,
kSeenWithSuccess,
kSeenWithFailure
};
// Yep, this really initializes all the elements.
SeenServerCall seen_server_call[kNumCalls] = {SeenServerCall::kNotSeen};
auto client_calls =
MakeVec([this, &requests, &server_metadata, &server_status](int i) {
auto c = NewClientCall("/foo").Timeout(Duration::Minutes(1)).Create();
auto c = NewClientCall("/foo").Timeout(Duration::Seconds(5)).Create();
c.NewBatch(kClientBaseTag + i)
.SendInitialMetadata({}, GRPC_INITIAL_METADATA_WAIT_FOR_READY)
.SendMessage(requests[i].Ref())
@ -83,24 +92,28 @@ TEST_P(ResourceQuotaTest, ResourceQuota) {
});
for (int i = 0; i < kNumCalls; i++) {
Expect(kClientBaseTag + i, true);
Expect(kServerRecvBaseTag + i,
PerformAction{[&server_calls, &client_message, i](bool success) {
EXPECT_TRUE(success);
server_calls[i]
.NewBatch(kServerStartBaseTag + i)
.RecvMessage(client_message[i])
.SendInitialMetadata({});
}});
Expect(kServerStartBaseTag + i,
PerformAction{[&server_calls, &client_close, i](bool) {
server_calls[i]
.NewBatch(kServerEndBaseTag + i)
.RecvCloseOnServer(client_close[i])
.SendStatusFromServer(GRPC_STATUS_OK, "xyz", {});
}});
Expect(kServerEndBaseTag + i, true);
Expect(
kServerRecvBaseTag + i,
MaybePerformAction{[this, &seen_server_call, &server_calls,
&client_message, &client_close, i](bool success) {
seen_server_call[i] = success ? SeenServerCall::kSeenWithSuccess
: SeenServerCall::kSeenWithFailure;
if (!success) return;
server_calls[i]
.NewBatch(kServerStartBaseTag + i)
.RecvMessage(client_message[i])
.SendInitialMetadata({});
Expect(kServerStartBaseTag + i,
PerformAction{[&server_calls, &client_close, i](bool) {
server_calls[i]
.NewBatch(kServerEndBaseTag + i)
.RecvCloseOnServer(client_close[i])
.SendStatusFromServer(GRPC_STATUS_OK, "xyz", {});
}});
Expect(kServerEndBaseTag + i, true);
}});
}
Step(Duration::Seconds(30));
Step();
int cancelled_calls_on_client = 0;
int cancelled_calls_on_server = 0;
@ -123,7 +136,8 @@ TEST_P(ResourceQuotaTest, ResourceQuota) {
Crash(absl::StrFormat("Unexpected status code: %d",
server_status[i].status()));
}
if (client_close[i].was_cancelled()) {
if (seen_server_call[i] == SeenServerCall::kSeenWithSuccess &&
client_close[i].was_cancelled()) {
cancelled_calls_on_server++;
}
}
@ -132,6 +146,18 @@ TEST_P(ResourceQuotaTest, ResourceQuota) {
"client, %d timed out, %d unavailable.",
kNumCalls, cancelled_calls_on_server, cancelled_calls_on_client,
deadline_exceeded, unavailable);
ShutdownServerAndNotify(0);
Expect(0, PerformAction{[this](bool success) {
EXPECT_TRUE(success);
DestroyServer();
}});
for (size_t i = 0; i < kNumCalls; i++) {
if (seen_server_call[i] == SeenServerCall::kNotSeen) {
Expect(kServerRecvBaseTag + i, false);
}
}
Step();
}
} // namespace

@ -479,7 +479,10 @@ class MainLoop {
CallArgs call_args{std::move(*LoadMetadata(client_initial_metadata,
&client_initial_metadata_)),
ClientInitialMetadataOutstandingToken::Empty(),
&server_initial_metadata->sender, nullptr, nullptr};
nullptr,
&server_initial_metadata->sender,
nullptr,
nullptr};
if (is_client) {
promise_ = main_loop_->channel_stack_->MakeClientCallPromise(
std::move(call_args));

@ -116,7 +116,7 @@ void FilterTestBase::Call::Impl::Start(ClientMetadataHandle md) {
EXPECT_EQ(promise_, absl::nullopt);
promise_ = channel_->filter->MakeCallPromise(
CallArgs{std::move(md), ClientInitialMetadataOutstandingToken::Empty(),
&pipe_server_initial_metadata_.sender,
nullptr, &pipe_server_initial_metadata_.sender,
&pipe_client_to_server_messages_.receiver,
&pipe_server_to_client_messages_.sender},
[this](CallArgs args) -> ArenaPromise<ServerMetadataHandle> {

@ -275,6 +275,36 @@ TEST_F(PipeTest, CanCloseSend) {
MakeScopedArena(1024, &memory_allocator_));
}
TEST_F(PipeTest, CanCloseWithErrorSend) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
MakeActivity(
[] {
auto* pipe = GetContext<Arena>()->ManagedNew<Pipe<int>>();
return Seq(
// Concurrently:
// - wait for a received value (will stall forever since we push
// nothing into the queue)
// - close the sender, which will signal the receiver to return an
// end-of-stream.
Join(pipe->receiver.Next(),
[pipe]() mutable {
pipe->sender.CloseWithError();
return absl::OkStatus();
}),
// Verify we received end-of-stream and closed the sender.
[](std::tuple<NextResult<int>, absl::Status> result) {
EXPECT_FALSE(std::get<0>(result).has_value());
EXPECT_TRUE(std::get<0>(result).cancelled());
EXPECT_EQ(std::get<1>(result), absl::OkStatus());
return absl::OkStatus();
});
},
NoWakeupScheduler(),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); },
MakeScopedArena(1024, &memory_allocator_));
}
TEST_F(PipeTest, CanCloseSendWithInterceptor) {
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));

@ -103,7 +103,11 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["no_test_ios"],
shard_count = 10,
tags = [
"cpp_end2end_test",
"no_test_ios",
],
deps = [
"//:gpr",
"//:grpc",
@ -127,6 +131,7 @@ grpc_cc_test(
"gtest",
],
tags = [
"cpp_end2end_test",
"no_test_android", # android_cc_test doesn't work with data dependency.
"no_test_ios",
"no_windows",
@ -197,6 +202,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":interceptors_util",
":test_service_impl",
@ -217,6 +223,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -235,6 +242,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":interceptors_util",
":test_service_impl",
@ -287,7 +295,10 @@ grpc_cc_test(
],
# TODO(yulin-liang): The test is not able to load the certificate files on
# iOS. Figure out why.
tags = ["no_test_ios"],
tags = [
"cpp_end2end_test",
"no_test_ios",
],
deps = [
":test_service_impl",
"//:gpr",
@ -309,6 +320,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -324,7 +336,11 @@ grpc_cc_test(
name = "end2end_test",
size = "large",
flaky = True, # TODO(b/151704375)
tags = ["no_test_ios"],
shard_count = 10,
tags = [
"cpp_end2end_test",
"no_test_ios",
],
deps = [
":end2end_test_lib",
# DO NOT REMOVE THE grpc++ dependence below since the internal build
@ -356,6 +372,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -374,6 +391,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -392,6 +410,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_health_check_service_impl",
":test_service_impl",
@ -413,6 +432,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -432,6 +452,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -451,6 +472,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -470,6 +492,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -488,7 +511,10 @@ grpc_cc_test(
"gtest",
],
flaky = True, # TODO(b/151315347)
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
tags = [
"cpp_end2end_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
":connection_attempt_injector",
":test_service_impl",
@ -518,7 +544,10 @@ grpc_cc_test(
"absl/types:optional",
],
flaky = True,
tags = ["no_test_ios"],
tags = [
"cpp_end2end_test",
"no_test_ios",
],
deps = [
":counted_service",
":rls_server",
@ -544,6 +573,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -565,7 +595,10 @@ grpc_cc_test(
"gtest",
],
flaky = True, # TODO(b/150567713)
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
tags = [
"cpp_end2end_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
":counted_service",
":test_service_impl",
@ -590,6 +623,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -611,6 +645,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -677,6 +712,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":interceptors_util",
":test_service_impl",
@ -697,6 +733,7 @@ grpc_cc_test(
"gtest",
],
tags = [
"cpp_end2end_test",
"no_test_ios",
"no_windows",
],
@ -739,6 +776,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -757,7 +795,10 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["no_windows"],
tags = [
"cpp_end2end_test",
"no_windows",
],
deps = [
"//:gpr",
"//:grpc",
@ -778,7 +819,10 @@ grpc_cc_test(
"gtest",
],
shard_count = 5,
tags = ["no_windows"], # TODO(jtattermusch): fix test on windows
tags = [
"cpp_end2end_test",
"no_windows",
], # TODO(jtattermusch): fix test on windows
deps = [
"//:gpr",
"//:grpc",
@ -825,6 +869,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -844,6 +889,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -863,6 +909,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -881,6 +928,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:grpc++",
"//:grpc++_reflection",
@ -903,6 +951,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
":test_service_impl",
"//:gpr",
@ -929,6 +978,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:gpr",
"//:grpc",
@ -946,6 +996,7 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["cpp_end2end_test"],
deps = [
"//:grpc++",
"//:grpcpp_backend_metric_recorder",

@ -1195,38 +1195,48 @@ TEST_P(End2endTest, CancelRpcBeforeStart) {
}
TEST_P(End2endTest, CancelRpcAfterStart) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
request.set_message("hello");
request.mutable_param()->set_server_notify_client_when_started(true);
request.mutable_param()->set_skip_cancelled_check(true);
Status s;
std::thread echo_thread([this, &s, &context, &request, &response] {
s = stub_->Echo(&context, request, &response);
EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
});
if (!GetParam().callback_server()) {
service_.ClientWaitUntilRpcStarted();
} else {
callback_service_.ClientWaitUntilRpcStarted();
}
for (int i = 0; i < 10; i++) {
ResetStub();
EchoRequest request;
EchoResponse response;
ClientContext context;
request.set_message("hello");
request.mutable_param()->set_server_notify_client_when_started(true);
request.mutable_param()->set_skip_cancelled_check(true);
Status s;
std::thread echo_thread([this, &s, &context, &request, &response] {
s = stub_->Echo(&context, request, &response);
});
if (!GetParam().callback_server()) {
service_.ClientWaitUntilRpcStarted();
} else {
callback_service_.ClientWaitUntilRpcStarted();
}
context.TryCancel();
context.TryCancel();
if (!GetParam().callback_server()) {
service_.SignalServerToContinue();
} else {
callback_service_.SignalServerToContinue();
}
if (!GetParam().callback_server()) {
service_.SignalServerToContinue();
} else {
callback_service_.SignalServerToContinue();
}
echo_thread.join();
EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
echo_thread.join();
// TODO(ctiller): improve test to not be flaky
//
// TryCancel is best effort, and it can happen that the cancellation is not
// acted upon before the server wakes up, sends a response, and the client
// reads that.
// For this reason, we try a few times here to see the cancellation result.
if (s.ok()) continue;
EXPECT_EQ("", response.message());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
if (GetParam().use_interceptors()) {
EXPECT_EQ(20, PhonyInterceptor::GetNumTimesCancel());
}
return;
}
GTEST_FAIL() << "Failed to get cancellation";
}
// Client cancels request stream after sending two messages

@ -90,19 +90,25 @@ void ServerTryCancel(ServerContext* context);
class TestServiceSignaller {
public:
void ClientWaitUntilRpcStarted() {
gpr_log(GPR_DEBUG, "*** enter ClientWaitUntilRpcStarted ***");
std::unique_lock<std::mutex> lock(mu_);
cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
gpr_log(GPR_DEBUG, "*** leave ClientWaitUntilRpcStarted ***");
}
void ServerWaitToContinue() {
gpr_log(GPR_DEBUG, "*** enter ServerWaitToContinue ***");
std::unique_lock<std::mutex> lock(mu_);
cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
gpr_log(GPR_DEBUG, "*** leave ServerWaitToContinue ***");
}
void SignalClientThatRpcStarted() {
gpr_log(GPR_DEBUG, "*** SignalClientThatRpcStarted ***");
std::unique_lock<std::mutex> lock(mu_);
rpc_started_ = true;
cv_rpc_started_.notify_one();
}
void SignalServerToContinue() {
gpr_log(GPR_DEBUG, "*** SignalServerToContinue ***");
std::unique_lock<std::mutex> lock(mu_);
server_should_continue_ = true;
cv_server_continue_.notify_one();

@ -101,6 +101,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -139,10 +140,11 @@ grpc_cc_test(
],
flaky = True, # TODO(b/144705388)
linkstatic = True, # Fixes dyld error on MacOS
shard_count = 20,
shard_count = 50,
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -166,6 +168,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -192,6 +195,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -214,6 +218,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -242,6 +247,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -268,6 +274,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -291,6 +298,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -318,6 +326,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -346,6 +355,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -373,6 +383,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
@ -391,7 +402,10 @@ grpc_cc_test(
external_deps = [
"gtest",
],
tags = ["no_test_ios"],
tags = [
"no_test_ios",
"xds_end2end_test",
],
deps = [
"//:gpr",
"//:grpc",
@ -415,6 +429,7 @@ grpc_cc_test(
tags = [
"no_test_ios",
"no_windows",
"xds_end2end_test",
], # TODO(jtattermusch): fix test on windows
deps = [
":xds_end2end_test_lib",
