From 4dd0bba12e90f490fcccef2dca1ae7e907cebbbf Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 18 Nov 2021 05:25:04 +0000 Subject: [PATCH] Revert "Revert "Api fuzzer extensions to support simulating traffic congestion (#27820)" (#27973)" (#27974) * Revert "Revert "Api fuzzer extensions to support simulating traffic congestion (#27820)" (#27973)" This reverts commit 879f97ef70b5bd09538dab5e69cf1fc3df3da8d0. * updating passthru_endpoint file to fix windows breakages * Automated change: Fix sanity tests Co-authored-by: Vignesh2208 --- test/core/end2end/fuzzers/api_fuzzer.cc | 107 ++- test/core/end2end/fuzzers/api_fuzzer.proto | 7 + .../api_fuzzer_corpus/call-complete-streaming | 618 ++++++++++++++++++ .../api_fuzzer_corpus/call-complete-unary | 496 ++++++++++++++ test/core/util/passthru_endpoint.cc | 313 ++++++++- test/core/util/passthru_endpoint.h | 14 +- 6 files changed, 1511 insertions(+), 44 deletions(-) create mode 100644 test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming create mode 100644 test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 20929a58c23..ed861a2b7b7 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -43,6 +43,14 @@ #include "test/core/end2end/fuzzers/api_fuzzer.pb.h" #include "test/core/util/passthru_endpoint.h" +#define MAX_ADVANCE_TIME_MICROS (24 * 3600 * 365 * 1000000) // 1 year +// Applicable when simulating channel actions. Prevents overflows. +#define MAX_WAIT_MS (24 * 3600 * 365 * 1000) // 1 year +// Applicable when simulating channel actions. Prevents overflows. +#define MAX_ADD_N_READABLE_BYTES (2 * 1024 * 1024) // 2GB +// Applicable when simulating channel actions. Prevents overflows. +#define MAX_ADD_N_WRITABLE_BYTES (2 * 1024 * 1024) // 2GB + //////////////////////////////////////////////////////////////////////////////// // logging @@ -58,6 +66,8 @@ static gpr_timespec g_now; static grpc_server* g_server; static grpc_channel* g_channel; static grpc_resource_quota* g_resource_quota; +static std::vector g_channel_actions; +static std::atomic g_channel_force_delete{false}; extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); @@ -146,7 +156,6 @@ grpc_ares_request* my_dns_lookup_ares_locked( static void my_cancel_ares_request_locked(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } - //////////////////////////////////////////////////////////////////////////////// // client connection @@ -168,8 +177,10 @@ static void do_connect(void* arg, grpc_error_handle error) { } else if (g_server != nullptr) { grpc_endpoint* client; grpc_endpoint* server; - grpc_passthru_endpoint_create(&client, &server, nullptr); + grpc_passthru_endpoint_create(&client, &server, nullptr, true); *fc->ep = client; + start_scheduling_grpc_passthru_endpoint_channel_effects( + client, g_channel_actions, [&]() { g_channel_force_delete = true; }); grpc_core::Server* core_server = grpc_core::Server::FromC(g_server); grpc_transport* transport = grpc_create_chttp2_transport( @@ -267,7 +278,12 @@ enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED }; class Call : public std::enable_shared_from_this { public: - explicit Call(CallType type) : type_(type) {} + explicit Call(CallType type) : type_(type) { + grpc_metadata_array_init(&recv_initial_metadata_); + grpc_metadata_array_init(&recv_trailing_metadata_); + grpc_call_details_init(&call_details_); + } + ~Call(); CallType type() const { return type_; } @@ -402,18 +418,30 @@ class Call : public std::enable_shared_from_this { : nullptr; } break; case api_fuzzer::BatchOp::kReceiveInitialMetadata: - op.op = GRPC_OP_RECV_INITIAL_METADATA; - *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA; - op.data.recv_initial_metadata.recv_initial_metadata = - &recv_initial_metadata_; + if (enqueued_recv_initial_metadata_) { + *batch_is_ok = false; + } else { + enqueued_recv_initial_metadata_ = true; + op.op = GRPC_OP_RECV_INITIAL_METADATA; + *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA; + op.data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata_; + } break; case api_fuzzer::BatchOp::kReceiveMessage: - if (call_closed_) { + // Allow only one active pending_recv_message_op to exist. Otherwise if + // the previous enqueued recv_message_op is not complete by the time + // we get here, then under certain conditions, enqueing this op will + // over-write the internal call->receiving_buffer maintained by grpc + // leading to a memory leak. + if (call_closed_ || pending_recv_message_op_) { *batch_is_ok = false; } else { op.op = GRPC_OP_RECV_MESSAGE; *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE; + pending_recv_message_op_ = true; op.data.recv_message.recv_message = &recv_message_; + unwinders->push_back([this]() { pending_recv_message_op_ = false; }); } break; case api_fuzzer::BatchOp::kReceiveStatusOnClient: @@ -422,6 +450,7 @@ class Call : public std::enable_shared_from_this { op.data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_; op.data.recv_status_on_client.status_details = &recv_status_details_; + *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT; break; case api_fuzzer::BatchOp::kReceiveCloseOnServer: op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -437,12 +466,14 @@ class Call : public std::enable_shared_from_this { Validator* FinishedBatchValidator(uint8_t has_ops) { ++pending_ops_; auto self = shared_from_this(); - return MakeValidator([self, has_ops](bool) { + return MakeValidator([self, has_ops](bool /*success*/) { --self->pending_ops_; - if ((has_ops & (1u << GRPC_OP_RECV_MESSAGE) && - self->recv_message_ != nullptr)) { - grpc_byte_buffer_destroy(self->recv_message_); - self->recv_message_ = nullptr; + if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) { + self->pending_recv_message_op_ = false; + if (self->recv_message_ != nullptr) { + grpc_byte_buffer_destroy(self->recv_message_); + self->recv_message_ = nullptr; + } } if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) { grpc_byte_buffer_destroy(self->send_message_); @@ -483,9 +514,11 @@ class Call : public std::enable_shared_from_this { int cancelled_; int pending_ops_ = 0; bool sent_initial_metadata_ = false; + bool enqueued_recv_initial_metadata_ = false; grpc_call_details call_details_{}; grpc_byte_buffer* send_message_ = nullptr; bool call_closed_ = false; + bool pending_recv_message_op_ = false; std::vector free_pointers_; std::vector unref_slices_; @@ -511,7 +544,6 @@ Call::~Call() { if (call_ != nullptr) { grpc_call_unref(call_); } - grpc_slice_unref(recv_status_details_); grpc_call_details_destroy(&call_details_); @@ -522,6 +554,11 @@ Call::~Call() { grpc_slice_unref(s); } + if (recv_message_ != nullptr) { + grpc_byte_buffer_destroy(recv_message_); + recv_message_ = nullptr; + } + grpc_metadata_array_destroy(&recv_initial_metadata_); grpc_metadata_array_destroy(&recv_trailing_metadata_); } @@ -707,7 +744,6 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { int action_index = 0; auto no_more_actions = [&]() { action_index = msg.actions_size(); }; - auto poll_cq = [&]() -> bool { grpc_event ev = grpc_completion_queue_next( cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); @@ -750,7 +786,11 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { call->Shutdown(); } - g_now = gpr_time_add(g_now, gpr_time_from_seconds(1, GPR_TIMESPAN)); + g_now = gpr_time_add( + g_now, + gpr_time_from_seconds( + std::max(1, static_cast(MAX_WAIT_MS / 1000)), + GPR_TIMESPAN)); grpc_timer_manager_tick(); GPR_ASSERT(!poll_cq()); continue; @@ -758,6 +798,12 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_timer_manager_tick(); + if (g_channel_force_delete.exchange(false) && g_channel) { + grpc_channel_destroy(g_channel); + g_channel = nullptr; + g_channel_actions.clear(); + } + const api_fuzzer::Action& action = msg.actions(action_index); action_index++; switch (action.type_case()) { @@ -772,12 +818,18 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { // increment global time case api_fuzzer::Action::kAdvanceTime: { g_now = gpr_time_add( - g_now, gpr_time_from_micros(action.advance_time(), GPR_TIMESPAN)); + g_now, gpr_time_from_micros( + std::min(static_cast(action.advance_time()), + static_cast(MAX_ADVANCE_TIME_MICROS)), + GPR_TIMESPAN)); break; } // create an insecure channel case api_fuzzer::Action::kCreateChannel: { - if (g_channel == nullptr) { + if (!action.create_channel().channel_actions_size() || + g_channel != nullptr) { + no_more_actions(); + } else { grpc_channel_args* args = ReadArgs(action.create_channel().channel_args()); if (action.create_channel().has_channel_creds()) { @@ -790,13 +842,26 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { g_channel = grpc_insecure_channel_create( action.create_channel().target().c_str(), args, nullptr); } + g_channel_actions.clear(); + for (int i = 0; i < action.create_channel().channel_actions_size(); + i++) { + const api_fuzzer::ChannelAction& channel_action = + action.create_channel().channel_actions(i); + g_channel_actions.push_back({ + std::min(channel_action.wait_ms(), + static_cast(MAX_WAIT_MS)), + std::min(channel_action.add_n_bytes_writable(), + static_cast(MAX_ADD_N_WRITABLE_BYTES)), + std::min(channel_action.add_n_bytes_readable(), + static_cast(MAX_ADD_N_READABLE_BYTES)), + }); + } GPR_ASSERT(g_channel != nullptr); + g_channel_force_delete = false; { grpc_core::ExecCtx exec_ctx; grpc_channel_args_destroy(args); } - } else { - no_more_actions(); } break; } @@ -955,6 +1020,7 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { if (!op.has_value()) continue; ops.push_back(*op); } + if (g_channel == nullptr) ok = false; if (ok) { auto* v = active_call->FinishedBatchValidator(has_ops); @@ -1061,6 +1127,5 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_completion_queue_destroy(cq); grpc_resource_quota_unref(g_resource_quota); - grpc_shutdown_blocking(); } diff --git a/test/core/end2end/fuzzers/api_fuzzer.proto b/test/core/end2end/fuzzers/api_fuzzer.proto index b546d93f96d..af867c3a931 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.proto +++ b/test/core/end2end/fuzzers/api_fuzzer.proto @@ -88,10 +88,17 @@ message Metadatum { ByteSlice value = 2; } +message ChannelAction { + uint64 add_n_bytes_writable = 1; + uint64 add_n_bytes_readable = 2; + uint64 wait_ms = 3; +} + message CreateChannel { string target = 1; repeated ChannelArg channel_args = 2; ChannelCreds channel_creds = 3; + repeated ChannelAction channel_actions = 4; } message CreateServer { diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming new file mode 100644 index 00000000000..4d52068086d --- /dev/null +++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming @@ -0,0 +1,618 @@ +actions { + create_server { + } +} +actions { + create_channel { + target: "dns:server" + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + } +} +actions { + create_call { + method: { value: "/foo" } + timeout: 1000000000 + } +} +actions { + queue_batch { + operations { + send_initial_metadata {} + } + operations { + receive_initial_metadata {} + } + } +} +actions { + request_call {} +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + send_initial_metadata {} + } + } +} +actions { + poll_cq: {} +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + send_message { + message { + value: "hello world" + } + } + } + } +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + receive_message {} + } + } +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + send_message { + message { + value: "hello world" + } + } + } + } +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + receive_message {} + } + } +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + receive_status_on_client {} + } + } +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + send_status_from_server { + status_code: 0 + } + } + operations { + receive_close_on_server {} + } + } +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + queue_batch { + operations { + send_close_from_client {} + } + } +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} + diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary new file mode 100644 index 00000000000..5c9d9d6c052 --- /dev/null +++ b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary @@ -0,0 +1,496 @@ +actions { + create_server { + } +} +actions { + create_channel { + target: "dns:server" + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 20 + add_n_bytes_readable: 20 + } + channel_actions { + wait_ms: 1000 + add_n_bytes_writable: 10 + add_n_bytes_readable: 10 + } + } +} +actions { + create_call { + method: { value: "/foo" } + timeout: 1000000000 + } +} +actions { + queue_batch { + operations { + send_initial_metadata {} + } + operations { + receive_initial_metadata {} + } + operations { + receive_message {} + } + operations { + send_close_from_client {} + } + operations { + receive_status_on_client {} + } + } +} +actions { + request_call {} +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + change_active_call {} +} +actions { + queue_batch { + operations { + send_initial_metadata {} + } + operations { + send_message { + message { + value: "hello world" + } + } + } + } +} +actions { + queue_batch { + operations { + send_status_from_server { + status_code: 0 + } + } + operations { + receive_close_on_server {} + } + } +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} +actions { + poll_cq: {} +} +actions { + advance_time: 1000000 +} diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index da509bcbd27..27689c5e588 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -29,27 +29,83 @@ #include #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" typedef struct passthru_endpoint passthru_endpoint; +typedef struct { + bool is_armed; + grpc_endpoint* ep; + grpc_slice_buffer* slices; + grpc_closure* cb; +} pending_op; + +typedef struct { + grpc_timer timer; + uint64_t allowed_write_bytes; + uint64_t allowed_read_bytes; + std::vector actions; + std::function on_complete; +} grpc_passthru_endpoint_channel_effects; + typedef struct { grpc_endpoint base; passthru_endpoint* parent; grpc_slice_buffer read_buffer; + grpc_slice_buffer write_buffer; grpc_slice_buffer* on_read_out; grpc_closure* on_read; + pending_op pending_read_op; + pending_op pending_write_op; + uint64_t bytes_read_so_far; + uint64_t bytes_written_so_far; } half; struct passthru_endpoint { gpr_mu mu; int halves; grpc_passthru_endpoint_stats* stats; + grpc_passthru_endpoint_channel_effects* channel_effects; + bool simulate_channel_actions; bool shutdown; half client; half server; }; +static void do_pending_read_op_locked(half* m, grpc_error_handle error) { + GPR_ASSERT(m->pending_read_op.is_armed); + GPR_ASSERT(m->bytes_read_so_far <= + m->parent->channel_effects->allowed_read_bytes); + if (m->parent->shutdown) { + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, m->pending_read_op.cb, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); + grpc_slice_buffer_reset_and_unref(&m->read_buffer); + m->pending_read_op.is_armed = false; + return; + } + + if (m->bytes_read_so_far == m->parent->channel_effects->allowed_read_bytes) { + // Keep it in pending state. + return; + } + // This delayed processing should only be invoked when read_buffer has + // something in it. + GPR_ASSERT(m->read_buffer.count > 0); + uint64_t readable_length = std::min( + m->read_buffer.length, + m->parent->channel_effects->allowed_read_bytes - m->bytes_read_so_far); + GPR_ASSERT(readable_length > 0); + grpc_slice_buffer_move_first_no_ref(&m->read_buffer, readable_length, + m->pending_read_op.slices); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_read_op.cb, error); + if (m->parent->simulate_channel_actions) { + m->bytes_read_so_far += readable_length; + } + m->pending_read_op.is_armed = false; +} + static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, bool /*urgent*/) { half* m = reinterpret_cast(ep); @@ -59,42 +115,177 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, DEBUG_LOCATION, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); } else if (m->read_buffer.count > 0) { - grpc_slice_buffer_swap(&m->read_buffer, slices); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE); + GPR_ASSERT(!m->pending_read_op.is_armed); + GPR_ASSERT(!m->on_read); + m->pending_read_op.is_armed = true; + m->pending_read_op.cb = cb; + m->pending_read_op.ep = ep; + m->pending_read_op.slices = slices; + do_pending_read_op_locked(m, GRPC_ERROR_NONE); } else { + GPR_ASSERT(!m->pending_read_op.is_armed); m->on_read = cb; m->on_read_out = slices; } gpr_mu_unlock(&m->parent->mu); } +// Copy src slice and split the copy at n bytes into two separate slices +void grpc_slice_copy_split(grpc_slice src, uint64_t n, grpc_slice& split1, + grpc_slice& split2) { + GPR_ASSERT(n <= GRPC_SLICE_LENGTH(src)); + if (n == GRPC_SLICE_LENGTH(src)) { + split1 = grpc_slice_copy(src); + split2 = grpc_empty_slice(); + return; + } + split1 = GRPC_SLICE_MALLOC(n); + memcpy(GRPC_SLICE_START_PTR(split1), GRPC_SLICE_START_PTR(src), n); + split2 = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(src) - n); + memcpy(GRPC_SLICE_START_PTR(split2), GRPC_SLICE_START_PTR(src) + n, + GRPC_SLICE_LENGTH(src) - n); +} + static half* other_half(half* h) { if (h == &h->parent->client) return &h->parent->server; return &h->parent->client; } +static void do_pending_write_op_locked(half* m, grpc_error_handle error) { + GPR_ASSERT(m->pending_write_op.is_armed); + GPR_ASSERT(m->bytes_written_so_far <= + m->parent->channel_effects->allowed_write_bytes); + if (m->parent->shutdown) { + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, m->pending_write_op.cb, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); + m->pending_write_op.is_armed = false; + grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices); + return; + } + if (m->bytes_written_so_far == + m->parent->channel_effects->allowed_write_bytes) { + // Keep it in pending state. + return; + } + + half* other = other_half(m); + uint64_t max_writable = + std::min(m->pending_write_op.slices->length, + m->parent->channel_effects->allowed_write_bytes - + m->bytes_written_so_far); + uint64_t max_readable = other->parent->channel_effects->allowed_read_bytes - + other->bytes_read_so_far; + uint64_t immediate_bytes_read = + other->on_read != nullptr ? std::min(max_readable, max_writable) + : 0; + + GPR_ASSERT(max_writable > 0); + GPR_ASSERT(max_readable >= 0); + // At the end of this process, we should have written max_writable bytes; + if (m->parent->simulate_channel_actions) { + m->bytes_written_so_far += max_writable; + } + // Estimate if the original write would still be pending at the end of this + // process + bool would_write_be_pending = + max_writable < m->pending_write_op.slices->length; + if (!m->parent->simulate_channel_actions) { + GPR_ASSERT(!would_write_be_pending); + } + grpc_slice_buffer* slices = m->pending_write_op.slices; + grpc_slice_buffer* dest = + other->on_read != nullptr ? other->on_read_out : &other->read_buffer; + while (max_writable > 0) { + grpc_slice slice = grpc_slice_buffer_take_first(slices); + uint64_t slice_length = GPR_SLICE_LENGTH(slice); + GPR_ASSERT(slice_length > 0); + grpc_slice split1, split2; + uint64_t split_length = 0; + if (slice_length <= max_readable) { + split_length = std::min(slice_length, max_writable); + } else if (max_readable > 0) { + // slice_length > max_readable + split_length = std::min(max_readable, max_writable); + } else { + // slice_length still > max_readable but max_readable is 0. + // In this case put the bytes into other->read_buffer. During a future + // read if max_readable still remains zero at the time of read, the + // pending read logic will kick in. + dest = &other->read_buffer; + split_length = std::min(slice_length, max_writable); + } + + grpc_slice_copy_split(slice, split_length, split1, split2); + grpc_slice_unref_internal(slice); + // Write a copy of the slice to the destination to be read + grpc_slice_buffer_add_indexed(dest, split1); + // Re-insert split2 into source for next iteration. + if (GPR_SLICE_LENGTH(split2) > 0) { + grpc_slice_buffer_undo_take_first(slices, split2); + } else { + grpc_slice_unref_internal(split2); + } + + if (max_readable > 0) { + GPR_ASSERT(max_readable >= static_cast(split_length)); + max_readable -= split_length; + } + + GPR_ASSERT(max_writable >= static_cast(split_length)); + max_writable -= split_length; + } + if (immediate_bytes_read > 0) { + GPR_ASSERT(!other->pending_read_op.is_armed); + if (m->parent->simulate_channel_actions) { + other->bytes_read_so_far += immediate_bytes_read; + } + grpc_core::ExecCtx::Run(DEBUG_LOCATION, other->on_read, error); + other->on_read = nullptr; + } + + if (!would_write_be_pending) { + // No slices should be left + GPR_ASSERT(m->pending_write_op.slices->count == 0); + grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices); + m->pending_write_op.is_armed = false; + grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_write_op.cb, error); + } +} + static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* /*arg*/) { - half* m = other_half(reinterpret_cast(ep)); + half* m = reinterpret_cast(ep); gpr_mu_lock(&m->parent->mu); - grpc_error_handle error = GRPC_ERROR_NONE; gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1); if (m->parent->shutdown) { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); - } else if (m->on_read != nullptr) { - for (size_t i = 0; i < slices->count; i++) { - grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i])); - } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE); - m->on_read = nullptr; + grpc_core::ExecCtx::Run( + DEBUG_LOCATION, cb, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown")); } else { - for (size_t i = 0; i < slices->count; i++) { - grpc_slice_buffer_add(&m->read_buffer, - grpc_slice_copy(slices->slices[i])); + GPR_ASSERT(!m->pending_write_op.is_armed); + m->pending_write_op.is_armed = true; + m->pending_write_op.cb = cb; + m->pending_write_op.ep = ep; + // Copy slices into m->pending_write_op.slices + m->pending_write_op.slices = &m->write_buffer; + GPR_ASSERT(m->pending_write_op.slices->count == 0); + for (int i = 0; i < static_cast(slices->count); i++) { + grpc_slice_buffer_add_indexed(m->pending_write_op.slices, + grpc_slice_copy(slices->slices[i])); } + do_pending_write_op_locked(m, GRPC_ERROR_NONE); } gpr_mu_unlock(&m->parent->mu); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); +} + +void flush_pending_ops_locked(half* m, grpc_error_handle error) { + if (m->pending_read_op.is_armed) { + do_pending_read_op_locked(m, error); + } + if (m->pending_write_op.is_armed) { + do_pending_write_op_locked(m, error); + } } static void me_add_to_pollset(grpc_endpoint* /*ep*/, @@ -110,6 +301,7 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { half* m = reinterpret_cast(ep); gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; + flush_pending_ops_locked(m, GRPC_ERROR_NONE); if (m->on_read) { grpc_core::ExecCtx::Run( DEBUG_LOCATION, m->on_read, @@ -117,6 +309,7 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { m->on_read = nullptr; } m = other_half(m); + flush_pending_ops_locked(m, GRPC_ERROR_NONE); if (m->on_read) { grpc_core::ExecCtx::Run( DEBUG_LOCATION, m->on_read, @@ -127,17 +320,28 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { GRPC_ERROR_UNREF(why); } +void grpc_passthru_endpoint_destroy(passthru_endpoint* p) { + gpr_mu_destroy(&p->mu); + grpc_passthru_endpoint_stats_destroy(p->stats); + delete p->channel_effects; + grpc_slice_buffer_destroy_internal(&p->client.read_buffer); + grpc_slice_buffer_destroy_internal(&p->server.read_buffer); + grpc_slice_buffer_destroy_internal(&p->client.write_buffer); + grpc_slice_buffer_destroy_internal(&p->server.write_buffer); + gpr_free(p); +} + static void me_destroy(grpc_endpoint* ep) { passthru_endpoint* p = (reinterpret_cast(ep))->parent; gpr_mu_lock(&p->mu); - if (0 == --p->halves) { + if (0 == --p->halves && p->channel_effects->actions.empty()) { + // no pending channel actions exist gpr_mu_unlock(&p->mu); - gpr_mu_destroy(&p->mu); - grpc_passthru_endpoint_stats_destroy(p->stats); - grpc_slice_buffer_destroy_internal(&p->client.read_buffer); - grpc_slice_buffer_destroy_internal(&p->server.read_buffer); - gpr_free(p); + grpc_passthru_endpoint_destroy(p); } else { + if (p->halves == 0 && p->simulate_channel_actions) { + grpc_timer_cancel(&p->channel_effects->timer); + } gpr_mu_unlock(&p->mu); } } @@ -179,14 +383,21 @@ static void half_init(half* m, passthru_endpoint* parent, m->base.vtable = &vtable; m->parent = parent; grpc_slice_buffer_init(&m->read_buffer); + grpc_slice_buffer_init(&m->write_buffer); + m->pending_write_op.slices = nullptr; m->on_read = nullptr; + m->bytes_read_so_far = 0; + m->bytes_written_so_far = 0; + m->pending_write_op.is_armed = false; + m->pending_read_op.is_armed = false; std::string name = absl::StrFormat("passthru_endpoint_%s_%p", half_name, parent); } void grpc_passthru_endpoint_create(grpc_endpoint** client, grpc_endpoint** server, - grpc_passthru_endpoint_stats* stats) { + grpc_passthru_endpoint_stats* stats, + bool simulate_channel_actions) { passthru_endpoint* m = static_cast(gpr_malloc(sizeof(*m))); m->halves = 2; @@ -197,6 +408,12 @@ void grpc_passthru_endpoint_create(grpc_endpoint** client, gpr_ref(&stats->refs); m->stats = stats; } + m->channel_effects = new grpc_passthru_endpoint_channel_effects(); + m->simulate_channel_actions = simulate_channel_actions; + if (!simulate_channel_actions) { + m->channel_effects->allowed_read_bytes = UINT64_MAX; + m->channel_effects->allowed_write_bytes = UINT64_MAX; + } half_init(&m->client, m, "client"); half_init(&m->server, m, "server"); gpr_mu_init(&m->mu); @@ -218,3 +435,55 @@ void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) { gpr_free(stats); } } + +static void sched_next_channel_action_locked(half* m); + +static void do_next_sched_channel_action(void* arg, grpc_error_handle error) { + half* m = reinterpret_cast(arg); + gpr_mu_lock(&m->parent->mu); + GPR_ASSERT(!m->parent->channel_effects->actions.empty()); + if (m->parent->halves == 0) { + gpr_mu_unlock(&m->parent->mu); + grpc_passthru_endpoint_destroy(m->parent); + return; + } + auto curr_action = m->parent->channel_effects->actions[0]; + m->parent->channel_effects->actions.erase( + m->parent->channel_effects->actions.begin()); + m->parent->channel_effects->allowed_read_bytes += + curr_action.add_n_readable_bytes; + m->parent->channel_effects->allowed_write_bytes += + curr_action.add_n_writable_bytes; + flush_pending_ops_locked(m, error); + flush_pending_ops_locked(other_half(m), error); + sched_next_channel_action_locked(m); + gpr_mu_unlock(&m->parent->mu); +} + +static void sched_next_channel_action_locked(half* m) { + if (m->parent->channel_effects->actions.empty()) { + m->parent->channel_effects->on_complete(); + return; + } + grpc_timer_init(&m->parent->channel_effects->timer, + m->parent->channel_effects->actions[0].wait_ms + + grpc_core::ExecCtx::Get()->Now(), + GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m, + grpc_schedule_on_exec_ctx)); +} + +void start_scheduling_grpc_passthru_endpoint_channel_effects( + grpc_endpoint* ep, + const std::vector& actions, + std::function on_complete) { + half* m = reinterpret_cast(ep); + gpr_mu_lock(&m->parent->mu); + if (!m->parent->simulate_channel_actions || m->parent->shutdown) { + gpr_mu_unlock(&m->parent->mu); + return; + } + m->parent->channel_effects->actions = actions; + m->parent->channel_effects->on_complete = std::move(on_complete); + sched_next_channel_action_locked(m); + gpr_mu_unlock(&m->parent->mu); +} diff --git a/test/core/util/passthru_endpoint.h b/test/core/util/passthru_endpoint.h index bc6535596cb..b34cbc25f9d 100644 --- a/test/core/util/passthru_endpoint.h +++ b/test/core/util/passthru_endpoint.h @@ -31,12 +31,24 @@ typedef struct { gpr_atm num_writes; } grpc_passthru_endpoint_stats; +typedef struct { + uint64_t wait_ms; + uint64_t add_n_writable_bytes; + uint64_t add_n_readable_bytes; +} grpc_passthru_endpoint_channel_action; + void grpc_passthru_endpoint_create(grpc_endpoint** client, grpc_endpoint** server, - grpc_passthru_endpoint_stats* stats); + grpc_passthru_endpoint_stats* stats, + bool simulate_channel_actions = false); grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create(); void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats); +void start_scheduling_grpc_passthru_endpoint_channel_effects( + grpc_endpoint* ep, + const std::vector& actions, + std::function on_complete); + #endif // PASSTHRU_ENDPOINT_H