diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 65ebe234d0a..e231c01a505 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -43,14 +43,6 @@ #include "test/core/end2end/fuzzers/api_fuzzer.pb.h" #include "test/core/util/passthru_endpoint.h" -#define MAX_ADVANCE_TIME_MICROS (24 * 3600 * 365 * 1000000) // 1 year -// Applicable when simulating channel actions. Prevents overflows. -#define MAX_WAIT_MS (24 * 3600 * 365 * 1000) // 1 year -// Applicable when simulating channel actions. Prevents overflows. -#define MAX_ADD_N_READABLE_BYTES (2 * 1024 * 1024) // 2GB -// Applicable when simulating channel actions. Prevents overflows. -#define MAX_ADD_N_WRITABLE_BYTES (2 * 1024 * 1024) // 2GB - //////////////////////////////////////////////////////////////////////////////// // logging @@ -66,8 +58,6 @@ static gpr_timespec g_now; static grpc_server* g_server; static grpc_channel* g_channel; static grpc_resource_quota* g_resource_quota; -static std::vector g_channel_actions; -static std::atomic g_channel_force_delete{false}; extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); @@ -156,6 +146,7 @@ grpc_ares_request* my_dns_lookup_ares_locked( static void my_cancel_ares_request_locked(grpc_ares_request* request) { GPR_ASSERT(request == nullptr); } + //////////////////////////////////////////////////////////////////////////////// // client connection @@ -181,10 +172,9 @@ static void do_connect(void* arg, grpc_error_handle error) { grpc_slice_allocator_destroy(fc->slice_allocator); grpc_endpoint* client; grpc_endpoint* server; - grpc_passthru_endpoint_create(&client, &server, nullptr, true); + grpc_passthru_endpoint_create(&client, &server, nullptr); *fc->ep = client; - start_scheduling_grpc_passthru_endpoint_channel_effects( - client, g_channel_actions, [&]() { g_channel_force_delete = true; }); + grpc_transport* transport = grpc_create_chttp2_transport( nullptr, server, false, grpc_resource_user_create(g_resource_quota, "transport-user")); @@ -285,12 +275,7 @@ enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED }; class Call : public std::enable_shared_from_this { public: - explicit Call(CallType type) : type_(type) { - grpc_metadata_array_init(&recv_initial_metadata_); - grpc_metadata_array_init(&recv_trailing_metadata_); - grpc_call_details_init(&call_details_); - } - + explicit Call(CallType type) : type_(type) {} ~Call(); CallType type() const { return type_; } @@ -422,30 +407,18 @@ class Call : public std::enable_shared_from_this { : nullptr; } break; case api_fuzzer::BatchOp::kReceiveInitialMetadata: - if (enqueued_recv_initial_metadata_) { - *batch_is_ok = false; - } else { - enqueued_recv_initial_metadata_ = true; - op.op = GRPC_OP_RECV_INITIAL_METADATA; - *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA; - op.data.recv_initial_metadata.recv_initial_metadata = - &recv_initial_metadata_; - } + op.op = GRPC_OP_RECV_INITIAL_METADATA; + *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA; + op.data.recv_initial_metadata.recv_initial_metadata = + &recv_initial_metadata_; break; case api_fuzzer::BatchOp::kReceiveMessage: - // Allow only one active pending_recv_message_op to exist. Otherwise if - // the previous enqueued recv_message_op is not complete by the time - // we get here, then under certain conditions, enqueing this op will - // over-write the internal call->receiving_buffer maintained by grpc - // leading to a memory leak. - if (call_closed_ || pending_recv_message_op_) { + if (call_closed_) { *batch_is_ok = false; } else { op.op = GRPC_OP_RECV_MESSAGE; *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE; - pending_recv_message_op_ = true; op.data.recv_message.recv_message = &recv_message_; - unwinders->push_back([this]() { pending_recv_message_op_ = false; }); } break; case api_fuzzer::BatchOp::kReceiveStatusOnClient: @@ -454,7 +427,6 @@ class Call : public std::enable_shared_from_this { op.data.recv_status_on_client.trailing_metadata = &recv_trailing_metadata_; op.data.recv_status_on_client.status_details = &recv_status_details_; - *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT; break; case api_fuzzer::BatchOp::kReceiveCloseOnServer: op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; @@ -470,14 +442,12 @@ class Call : public std::enable_shared_from_this { Validator* FinishedBatchValidator(uint8_t has_ops) { ++pending_ops_; auto self = shared_from_this(); - return MakeValidator([self, has_ops](bool /*success*/) { + return MakeValidator([self, has_ops](bool) { --self->pending_ops_; - if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) { - self->pending_recv_message_op_ = false; - if (self->recv_message_ != nullptr) { - grpc_byte_buffer_destroy(self->recv_message_); - self->recv_message_ = nullptr; - } + if ((has_ops & (1u << GRPC_OP_RECV_MESSAGE) && + self->recv_message_ != nullptr)) { + grpc_byte_buffer_destroy(self->recv_message_); + self->recv_message_ = nullptr; } if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) { grpc_byte_buffer_destroy(self->send_message_); @@ -518,11 +488,9 @@ class Call : public std::enable_shared_from_this { int cancelled_; int pending_ops_ = 0; bool sent_initial_metadata_ = false; - bool enqueued_recv_initial_metadata_ = false; grpc_call_details call_details_{}; grpc_byte_buffer* send_message_ = nullptr; bool call_closed_ = false; - bool pending_recv_message_op_ = false; std::vector free_pointers_; std::vector unref_slices_; @@ -548,6 +516,7 @@ Call::~Call() { if (call_ != nullptr) { grpc_call_unref(call_); } + grpc_slice_unref(recv_status_details_); grpc_call_details_destroy(&call_details_); @@ -558,11 +527,6 @@ Call::~Call() { grpc_slice_unref(s); } - if (recv_message_ != nullptr) { - grpc_byte_buffer_destroy(recv_message_); - recv_message_ = nullptr; - } - grpc_metadata_array_destroy(&recv_initial_metadata_); grpc_metadata_array_destroy(&recv_trailing_metadata_); } @@ -748,6 +712,7 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { int action_index = 0; auto no_more_actions = [&]() { action_index = msg.actions_size(); }; + auto poll_cq = [&]() -> bool { grpc_event ev = grpc_completion_queue_next( cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr); @@ -790,11 +755,7 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { call->Shutdown(); } - g_now = gpr_time_add( - g_now, - gpr_time_from_seconds( - std::max(1, static_cast(MAX_WAIT_MS / 1000)), - GPR_TIMESPAN)); + g_now = gpr_time_add(g_now, gpr_time_from_seconds(1, GPR_TIMESPAN)); grpc_timer_manager_tick(); GPR_ASSERT(!poll_cq()); continue; @@ -802,12 +763,6 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_timer_manager_tick(); - if (g_channel_force_delete.exchange(false) && g_channel) { - grpc_channel_destroy(g_channel); - g_channel = nullptr; - g_channel_actions.clear(); - } - const api_fuzzer::Action& action = msg.actions(action_index); action_index++; switch (action.type_case()) { @@ -822,18 +777,12 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { // increment global time case api_fuzzer::Action::kAdvanceTime: { g_now = gpr_time_add( - g_now, gpr_time_from_micros( - std::min(static_cast(action.advance_time()), - static_cast(MAX_ADVANCE_TIME_MICROS)), - GPR_TIMESPAN)); + g_now, gpr_time_from_micros(action.advance_time(), GPR_TIMESPAN)); break; } // create an insecure channel case api_fuzzer::Action::kCreateChannel: { - if (!action.create_channel().channel_actions_size() || - g_channel != nullptr) { - no_more_actions(); - } else { + if (g_channel == nullptr) { grpc_channel_args* args = ReadArgs(action.create_channel().channel_args()); if (action.create_channel().has_channel_creds()) { @@ -846,28 +795,13 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { g_channel = grpc_insecure_channel_create( action.create_channel().target().c_str(), args, nullptr); } - g_channel_actions.clear(); - for (int i = 0; i < action.create_channel().channel_actions_size(); - i++) { - g_channel_actions.push_back({ - std::min(action.create_channel().channel_actions(i).wait_ms(), - static_cast(MAX_WAIT_MS)), - std::min(action.create_channel() - .channel_actions(i) - .add_n_bytes_writable(), - static_cast(MAX_ADD_N_WRITABLE_BYTES)), - std::min(action.create_channel() - .channel_actions(i) - .add_n_bytes_readable(), - static_cast(MAX_ADD_N_READABLE_BYTES)), - }); - } GPR_ASSERT(g_channel != nullptr); - g_channel_force_delete = false; { grpc_core::ExecCtx exec_ctx; grpc_channel_args_destroy(args); } + } else { + no_more_actions(); } break; } @@ -1026,7 +960,6 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { if (!op.has_value()) continue; ops.push_back(*op); } - if (g_channel == nullptr) ok = false; if (ok) { auto* v = active_call->FinishedBatchValidator(has_ops); @@ -1133,5 +1066,6 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) { grpc_completion_queue_destroy(cq); grpc_resource_quota_unref(g_resource_quota); + grpc_shutdown_blocking(); } diff --git a/test/core/end2end/fuzzers/api_fuzzer.proto b/test/core/end2end/fuzzers/api_fuzzer.proto index 85001b6d8bf..84968b90ed3 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.proto +++ b/test/core/end2end/fuzzers/api_fuzzer.proto @@ -88,17 +88,10 @@ message Metadatum { ByteSlice value = 2; } -message ChannelAction { - uint64 add_n_bytes_writable = 1; - uint64 add_n_bytes_readable = 2; - uint64 wait_ms = 3; -} - message CreateChannel { string target = 1; repeated ChannelArg channel_args = 2; ChannelCreds channel_creds = 3; - repeated ChannelAction channel_actions = 4; } message CreateServer { diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming deleted file mode 100644 index 4d52068086d..00000000000 --- a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming +++ /dev/null @@ -1,618 +0,0 @@ -actions { - create_server { - } -} -actions { - create_channel { - target: "dns:server" - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - } -} -actions { - create_call { - method: { value: "/foo" } - timeout: 1000000000 - } -} -actions { - queue_batch { - operations { - send_initial_metadata {} - } - operations { - receive_initial_metadata {} - } - } -} -actions { - request_call {} -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - send_initial_metadata {} - } - } -} -actions { - poll_cq: {} -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - send_message { - message { - value: "hello world" - } - } - } - } -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - receive_message {} - } - } -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - send_message { - message { - value: "hello world" - } - } - } - } -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - receive_message {} - } - } -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - receive_status_on_client {} - } - } -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - send_status_from_server { - status_code: 0 - } - } - operations { - receive_close_on_server {} - } - } -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - queue_batch { - operations { - send_close_from_client {} - } - } -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} - diff --git a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary b/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary deleted file mode 100644 index 5c9d9d6c052..00000000000 --- a/test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary +++ /dev/null @@ -1,496 +0,0 @@ -actions { - create_server { - } -} -actions { - create_channel { - target: "dns:server" - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 20 - add_n_bytes_readable: 20 - } - channel_actions { - wait_ms: 1000 - add_n_bytes_writable: 10 - add_n_bytes_readable: 10 - } - } -} -actions { - create_call { - method: { value: "/foo" } - timeout: 1000000000 - } -} -actions { - queue_batch { - operations { - send_initial_metadata {} - } - operations { - receive_initial_metadata {} - } - operations { - receive_message {} - } - operations { - send_close_from_client {} - } - operations { - receive_status_on_client {} - } - } -} -actions { - request_call {} -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - change_active_call {} -} -actions { - queue_batch { - operations { - send_initial_metadata {} - } - operations { - send_message { - message { - value: "hello world" - } - } - } - } -} -actions { - queue_batch { - operations { - send_status_from_server { - status_code: 0 - } - } - operations { - receive_close_on_server {} - } - } -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} -actions { - poll_cq: {} -} -actions { - advance_time: 1000000 -} diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index cedc4a69582..50534f7ae9d 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -29,85 +29,29 @@ #include #include "src/core/lib/iomgr/sockaddr.h" -#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/util/resource_user_util.h" typedef struct passthru_endpoint passthru_endpoint; -typedef struct { - bool is_armed; - grpc_endpoint* ep; - grpc_slice_buffer* slices; - grpc_closure* cb; -} pending_op; - -typedef struct { - grpc_timer timer; - uint64_t allowed_write_bytes; - uint64_t allowed_read_bytes; - std::vector actions; - std::function on_complete; -} grpc_passthru_endpoint_channel_effects; - typedef struct { grpc_endpoint base; passthru_endpoint* parent; grpc_slice_buffer read_buffer; - grpc_slice_buffer write_buffer; grpc_slice_buffer* on_read_out; grpc_closure* on_read; grpc_slice_allocator* slice_allocator; - pending_op pending_read_op; - pending_op pending_write_op; - uint64_t bytes_read_so_far; - uint64_t bytes_written_so_far; } half; struct passthru_endpoint { gpr_mu mu; int halves; grpc_passthru_endpoint_stats* stats; - grpc_passthru_endpoint_channel_effects* channel_effects; - bool simulate_channel_actions; bool shutdown; half client; half server; }; -static void do_pending_read_op_locked(half* m, grpc_error_handle error) { - GPR_ASSERT(m->pending_read_op.is_armed); - GPR_ASSERT(m->bytes_read_so_far <= - m->parent->channel_effects->allowed_read_bytes); - if (m->parent->shutdown) { - grpc_core::ExecCtx::Run( - DEBUG_LOCATION, m->pending_read_op.cb, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); - grpc_slice_buffer_reset_and_unref(&m->read_buffer); - m->pending_read_op.is_armed = false; - return; - } - - if (m->bytes_read_so_far == m->parent->channel_effects->allowed_read_bytes) { - // Keep it in pending state. - return; - } - // This delayed processing should only be invoked when read_buffer has - // something in it. - GPR_ASSERT(m->read_buffer.count > 0); - uint64_t readable_length = std::min( - m->read_buffer.length, - m->parent->channel_effects->allowed_read_bytes - m->bytes_read_so_far); - GPR_ASSERT(readable_length > 0); - grpc_slice_buffer_move_first_no_ref(&m->read_buffer, readable_length, - m->pending_read_op.slices); - grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_read_op.cb, error); - if (m->parent->simulate_channel_actions) { - m->bytes_read_so_far += readable_length; - } - m->pending_read_op.is_armed = false; -} - static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, bool /*urgent*/) { half* m = reinterpret_cast(ep); @@ -117,177 +61,42 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, DEBUG_LOCATION, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); } else if (m->read_buffer.count > 0) { - GPR_ASSERT(!m->pending_read_op.is_armed); - GPR_ASSERT(!m->on_read); - m->pending_read_op.is_armed = true; - m->pending_read_op.cb = cb; - m->pending_read_op.ep = ep; - m->pending_read_op.slices = slices; - do_pending_read_op_locked(m, GRPC_ERROR_NONE); + grpc_slice_buffer_swap(&m->read_buffer, slices); + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE); } else { - GPR_ASSERT(!m->pending_read_op.is_armed); m->on_read = cb; m->on_read_out = slices; } gpr_mu_unlock(&m->parent->mu); } -// Copy src slice and split the copy at n bytes into two separate slices -void grpc_slice_copy_split(grpc_slice src, size_t n, grpc_slice& split1, - grpc_slice& split2) { - GPR_ASSERT(n <= GRPC_SLICE_LENGTH(src)); - if (n == GRPC_SLICE_LENGTH(src)) { - split1 = grpc_slice_copy(src); - split2 = grpc_empty_slice(); - return; - } - split1 = GRPC_SLICE_MALLOC(n); - memcpy(GRPC_SLICE_START_PTR(split1), GRPC_SLICE_START_PTR(src), n); - split2 = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(src) - n); - memcpy(GRPC_SLICE_START_PTR(split2), GRPC_SLICE_START_PTR(src) + n, - GRPC_SLICE_LENGTH(src) - n); -} - static half* other_half(half* h) { if (h == &h->parent->client) return &h->parent->server; return &h->parent->client; } -static void do_pending_write_op_locked(half* m, grpc_error_handle error) { - GPR_ASSERT(m->pending_write_op.is_armed); - GPR_ASSERT(m->bytes_written_so_far <= - m->parent->channel_effects->allowed_write_bytes); - if (m->parent->shutdown) { - grpc_core::ExecCtx::Run( - DEBUG_LOCATION, m->pending_write_op.cb, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown")); - m->pending_write_op.is_armed = false; - grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices); - return; - } - if (m->bytes_written_so_far == - m->parent->channel_effects->allowed_write_bytes) { - // Keep it in pending state. - return; - } - - half* other = other_half(m); - uint64_t max_writable = - std::min(m->pending_write_op.slices->length, - m->parent->channel_effects->allowed_write_bytes - - m->bytes_written_so_far); - uint64_t max_readable = other->parent->channel_effects->allowed_read_bytes - - other->bytes_read_so_far; - uint64_t immediate_bytes_read = - other->on_read != nullptr ? std::min(max_readable, max_writable) - : 0; - - GPR_ASSERT(max_writable > 0); - GPR_ASSERT(max_readable >= 0); - // At the end of this process, we should have written max_writable bytes; - if (m->parent->simulate_channel_actions) { - m->bytes_written_so_far += max_writable; - } - // Estimate if the original write would still be pending at the end of this - // process - bool would_write_be_pending = - max_writable < m->pending_write_op.slices->length; - if (!m->parent->simulate_channel_actions) { - GPR_ASSERT(!would_write_be_pending); - } - grpc_slice_buffer* slices = m->pending_write_op.slices; - grpc_slice_buffer* dest = - other->on_read != nullptr ? other->on_read_out : &other->read_buffer; - while (max_writable > 0) { - grpc_slice slice = grpc_slice_buffer_take_first(slices); - uint64_t slice_length = GPR_SLICE_LENGTH(slice); - GPR_ASSERT(slice_length > 0); - grpc_slice split1, split2; - ssize_t split_length = 0; - if (slice_length <= max_readable) { - split_length = std::min(slice_length, max_writable); - } else if (max_readable > 0) { - // slice_length > max_readable - split_length = std::min(max_readable, max_writable); - } else { - // slice_length still > max_readable but max_readable is 0. - // In this case put the bytes into other->read_buffer. During a future - // read if max_readable still remains zero at the time of read, the - // pending read logic will kick in. - dest = &other->read_buffer; - split_length = std::min(slice_length, max_writable); - } - - grpc_slice_copy_split(slice, split_length, split1, split2); - grpc_slice_unref_internal(slice); - // Write a copy of the slice to the destination to be read - grpc_slice_buffer_add_indexed(dest, split1); - // Re-insert split2 into source for next iteration. - if (GPR_SLICE_LENGTH(split2) > 0) { - grpc_slice_buffer_undo_take_first(slices, split2); - } else { - grpc_slice_unref_internal(split2); - } - - if (max_readable > 0) { - GPR_ASSERT(max_readable >= static_cast(split_length)); - max_readable -= split_length; - } - - GPR_ASSERT(max_writable >= static_cast(split_length)); - max_writable -= split_length; - } - if (immediate_bytes_read > 0) { - GPR_ASSERT(!other->pending_read_op.is_armed); - if (m->parent->simulate_channel_actions) { - other->bytes_read_so_far += immediate_bytes_read; - } - grpc_core::ExecCtx::Run(DEBUG_LOCATION, other->on_read, error); - other->on_read = nullptr; - } - - if (!would_write_be_pending) { - // No slices should be left - GPR_ASSERT(m->pending_write_op.slices->count == 0); - grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices); - m->pending_write_op.is_armed = false; - grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_write_op.cb, error); - } -} - static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* /*arg*/) { - half* m = reinterpret_cast(ep); + half* m = other_half(reinterpret_cast(ep)); gpr_mu_lock(&m->parent->mu); + grpc_error_handle error = GRPC_ERROR_NONE; gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1); if (m->parent->shutdown) { - grpc_core::ExecCtx::Run( - DEBUG_LOCATION, cb, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown")); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); + } else if (m->on_read != nullptr) { + for (size_t i = 0; i < slices->count; i++) { + grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i])); + } + grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE); + m->on_read = nullptr; } else { - GPR_ASSERT(!m->pending_write_op.is_armed); - m->pending_write_op.is_armed = true; - m->pending_write_op.cb = cb; - m->pending_write_op.ep = ep; - // Copy slices into m->pending_write_op.slices - m->pending_write_op.slices = &m->write_buffer; - GPR_ASSERT(m->pending_write_op.slices->count == 0); - for (int i = 0; i < static_cast(slices->count); i++) { - grpc_slice_buffer_add_indexed(m->pending_write_op.slices, - grpc_slice_copy(slices->slices[i])); + for (size_t i = 0; i < slices->count; i++) { + grpc_slice_buffer_add(&m->read_buffer, + grpc_slice_copy(slices->slices[i])); } - do_pending_write_op_locked(m, GRPC_ERROR_NONE); } gpr_mu_unlock(&m->parent->mu); -} - -void flush_pending_ops_locked(half* m, grpc_error_handle error) { - if (m->pending_read_op.is_armed) { - do_pending_read_op_locked(m, error); - } - if (m->pending_write_op.is_armed) { - do_pending_write_op_locked(m, error); - } + grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void me_add_to_pollset(grpc_endpoint* /*ep*/, @@ -303,7 +112,6 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { half* m = reinterpret_cast(ep); gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; - flush_pending_ops_locked(m, GRPC_ERROR_NONE); if (m->on_read) { grpc_core::ExecCtx::Run( DEBUG_LOCATION, m->on_read, @@ -311,7 +119,6 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { m->on_read = nullptr; } m = other_half(m); - flush_pending_ops_locked(m, GRPC_ERROR_NONE); if (m->on_read) { grpc_core::ExecCtx::Run( DEBUG_LOCATION, m->on_read, @@ -322,30 +129,19 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) { GRPC_ERROR_UNREF(why); } -void grpc_passthru_endpoint_destroy(passthru_endpoint* p) { - gpr_mu_destroy(&p->mu); - grpc_passthru_endpoint_stats_destroy(p->stats); - delete p->channel_effects; - grpc_slice_buffer_destroy_internal(&p->client.read_buffer); - grpc_slice_buffer_destroy_internal(&p->server.read_buffer); - grpc_slice_allocator_destroy(p->client.slice_allocator); - grpc_slice_allocator_destroy(p->server.slice_allocator); - grpc_slice_buffer_destroy_internal(&p->client.write_buffer); - grpc_slice_buffer_destroy_internal(&p->server.write_buffer); - gpr_free(p); -} - static void me_destroy(grpc_endpoint* ep) { passthru_endpoint* p = (reinterpret_cast(ep))->parent; gpr_mu_lock(&p->mu); - if (0 == --p->halves && p->channel_effects->actions.empty()) { - // no pending channel actions exist + if (0 == --p->halves) { gpr_mu_unlock(&p->mu); - grpc_passthru_endpoint_destroy(p); + gpr_mu_destroy(&p->mu); + grpc_passthru_endpoint_stats_destroy(p->stats); + grpc_slice_buffer_destroy_internal(&p->client.read_buffer); + grpc_slice_buffer_destroy_internal(&p->server.read_buffer); + grpc_slice_allocator_destroy(p->client.slice_allocator); + grpc_slice_allocator_destroy(p->server.slice_allocator); + gpr_free(p); } else { - if (p->halves == 0 && p->simulate_channel_actions) { - grpc_timer_cancel(&p->channel_effects->timer); - } gpr_mu_unlock(&p->mu); } } @@ -388,13 +184,7 @@ static void half_init(half* m, passthru_endpoint* parent, m->base.vtable = &vtable; m->parent = parent; grpc_slice_buffer_init(&m->read_buffer); - grpc_slice_buffer_init(&m->write_buffer); - m->pending_write_op.slices = nullptr; m->on_read = nullptr; - m->bytes_read_so_far = 0; - m->bytes_written_so_far = 0; - m->pending_write_op.is_armed = false; - m->pending_read_op.is_armed = false; std::string name = absl::StrFormat("passthru_endpoint_%s_%p", half_name, parent); m->slice_allocator = slice_allocator; @@ -402,8 +192,7 @@ static void half_init(half* m, passthru_endpoint* parent, void grpc_passthru_endpoint_create(grpc_endpoint** client, grpc_endpoint** server, - grpc_passthru_endpoint_stats* stats, - bool simulate_channel_actions) { + grpc_passthru_endpoint_stats* stats) { passthru_endpoint* m = static_cast(gpr_malloc(sizeof(*m))); m->halves = 2; @@ -414,12 +203,6 @@ void grpc_passthru_endpoint_create(grpc_endpoint** client, gpr_ref(&stats->refs); m->stats = stats; } - m->channel_effects = new grpc_passthru_endpoint_channel_effects(); - m->simulate_channel_actions = simulate_channel_actions; - if (!simulate_channel_actions) { - m->channel_effects->allowed_read_bytes = UINT64_MAX; - m->channel_effects->allowed_write_bytes = UINT64_MAX; - } half_init(&m->client, m, grpc_slice_allocator_create_unlimited(), "client"); half_init(&m->server, m, grpc_slice_allocator_create_unlimited(), "server"); gpr_mu_init(&m->mu); @@ -441,55 +224,3 @@ void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) { gpr_free(stats); } } - -static void sched_next_channel_action_locked(half* m); - -static void do_next_sched_channel_action(void* arg, grpc_error_handle error) { - half* m = reinterpret_cast(arg); - gpr_mu_lock(&m->parent->mu); - GPR_ASSERT(!m->parent->channel_effects->actions.empty()); - if (m->parent->halves == 0) { - gpr_mu_unlock(&m->parent->mu); - grpc_passthru_endpoint_destroy(m->parent); - return; - } - auto curr_action = m->parent->channel_effects->actions[0]; - m->parent->channel_effects->actions.erase( - m->parent->channel_effects->actions.begin()); - m->parent->channel_effects->allowed_read_bytes += - curr_action.add_n_readable_bytes; - m->parent->channel_effects->allowed_write_bytes += - curr_action.add_n_writable_bytes; - flush_pending_ops_locked(m, error); - flush_pending_ops_locked(other_half(m), error); - sched_next_channel_action_locked(m); - gpr_mu_unlock(&m->parent->mu); -} - -static void sched_next_channel_action_locked(half* m) { - if (m->parent->channel_effects->actions.empty()) { - m->parent->channel_effects->on_complete(); - return; - } - grpc_timer_init(&m->parent->channel_effects->timer, - m->parent->channel_effects->actions[0].wait_ms + - grpc_core::ExecCtx::Get()->Now(), - GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m, - grpc_schedule_on_exec_ctx)); -} - -void start_scheduling_grpc_passthru_endpoint_channel_effects( - grpc_endpoint* ep, - const std::vector& actions, - std::function on_complete) { - half* m = reinterpret_cast(ep); - gpr_mu_lock(&m->parent->mu); - if (!m->parent->simulate_channel_actions || m->parent->shutdown) { - gpr_mu_unlock(&m->parent->mu); - return; - } - m->parent->channel_effects->actions = actions; - m->parent->channel_effects->on_complete = std::move(on_complete); - sched_next_channel_action_locked(m); - gpr_mu_unlock(&m->parent->mu); -} diff --git a/test/core/util/passthru_endpoint.h b/test/core/util/passthru_endpoint.h index b34cbc25f9d..bc6535596cb 100644 --- a/test/core/util/passthru_endpoint.h +++ b/test/core/util/passthru_endpoint.h @@ -31,24 +31,12 @@ typedef struct { gpr_atm num_writes; } grpc_passthru_endpoint_stats; -typedef struct { - uint64_t wait_ms; - uint64_t add_n_writable_bytes; - uint64_t add_n_readable_bytes; -} grpc_passthru_endpoint_channel_action; - void grpc_passthru_endpoint_create(grpc_endpoint** client, grpc_endpoint** server, - grpc_passthru_endpoint_stats* stats, - bool simulate_channel_actions = false); + grpc_passthru_endpoint_stats* stats); grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create(); void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats); -void start_scheduling_grpc_passthru_endpoint_channel_effects( - grpc_endpoint* ep, - const std::vector& actions, - std::function on_complete); - #endif // PASSTHRU_ENDPOINT_H