Revert "Revert "Api fuzzer extensions to support simulating traffic congestion (#27820)" (#27973)" (#27974)

* Revert "Revert "Api fuzzer extensions to support simulating traffic congestion (#27820)" (#27973)"

This reverts commit 879f97ef70.

* Updating passthru_endpoint file to fix Windows breakages

* Automated change: Fix sanity tests

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
Branch: pull/28123/head
Author: Vignesh Babu, committed by GitHub
Parent: d6214cbf5a
Commit: 4dd0bba12e
Files changed:
  107  test/core/end2end/fuzzers/api_fuzzer.cc
    7  test/core/end2end/fuzzers/api_fuzzer.proto
  618  test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-streaming
  496  test/core/end2end/fuzzers/api_fuzzer_corpus/call-complete-unary
  313  test/core/util/passthru_endpoint.cc
   14  test/core/util/passthru_endpoint.h

@@ -43,6 +43,14 @@
#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
#include "test/core/util/passthru_endpoint.h"

#define MAX_ADVANCE_TIME_MICROS (24 * 3600 * 365 * 1000000)  // 1 year
// Applicable when simulating channel actions. Prevents overflows.
#define MAX_WAIT_MS (24 * 3600 * 365 * 1000)  // 1 year
// Applicable when simulating channel actions. Prevents overflows.
#define MAX_ADD_N_READABLE_BYTES (2 * 1024 * 1024)  // 2MB
// Applicable when simulating channel actions. Prevents overflows.
#define MAX_ADD_N_WRITABLE_BYTES (2 * 1024 * 1024)  // 2MB

////////////////////////////////////////////////////////////////////////////////
// logging
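These limits are applied with std::min/std::max before fuzzer-controlled values feed timers or byte budgets. A minimal standalone sketch of the clamping pattern (the constants mirror the macros above; the function name is illustrative):

#include <algorithm>
#include <cstdint>
#include <cstdio>

// Illustrative mirrors of the macros above.
constexpr uint64_t kMaxWaitMs = 24ull * 3600 * 365 * 1000;      // 1 year in ms
constexpr uint64_t kMaxAddNReadableBytes = 2ull * 1024 * 1024;  // 2MB

// Clamp a fuzzer-supplied value so later arithmetic cannot overflow.
uint64_t ClampForSimulation(uint64_t fuzzed_value, uint64_t limit) {
  return std::min(fuzzed_value, limit);
}

int main() {
  // A huge wait_ms from the corpus is reduced to at most one simulated year.
  std::printf("%llu\n",
              static_cast<unsigned long long>(
                  ClampForSimulation(0xFFFFFFFFFFFFFFFFull, kMaxWaitMs)));
}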
@@ -58,6 +66,8 @@ static gpr_timespec g_now;
static grpc_server* g_server;
static grpc_channel* g_channel;
static grpc_resource_quota* g_resource_quota;
static std::vector<grpc_passthru_endpoint_channel_action> g_channel_actions;
static std::atomic<bool> g_channel_force_delete{false};

extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
@@ -146,7 +156,6 @@ grpc_ares_request* my_dns_lookup_ares_locked(
static void my_cancel_ares_request_locked(grpc_ares_request* request) {
  GPR_ASSERT(request == nullptr);
}
////////////////////////////////////////////////////////////////////////////////
// client connection
@@ -168,8 +177,10 @@ static void do_connect(void* arg, grpc_error_handle error) {
  } else if (g_server != nullptr) {
    grpc_endpoint* client;
    grpc_endpoint* server;
-   grpc_passthru_endpoint_create(&client, &server, nullptr);
+   grpc_passthru_endpoint_create(&client, &server, nullptr, true);
    *fc->ep = client;
    start_scheduling_grpc_passthru_endpoint_channel_effects(
        client, g_channel_actions, [&]() { g_channel_force_delete = true; });

    grpc_core::Server* core_server = grpc_core::Server::FromC(g_server);
    grpc_transport* transport = grpc_create_chttp2_transport(
@@ -267,7 +278,12 @@ enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
class Call : public std::enable_shared_from_this<Call> {
 public:
-  explicit Call(CallType type) : type_(type) {}
+  explicit Call(CallType type) : type_(type) {
+    grpc_metadata_array_init(&recv_initial_metadata_);
+    grpc_metadata_array_init(&recv_trailing_metadata_);
+    grpc_call_details_init(&call_details_);
+  }

  ~Call();

  CallType type() const { return type_; }
@@ -402,18 +418,30 @@ class Call : public std::enable_shared_from_this<Call> {
                : nullptr;
      } break;
      case api_fuzzer::BatchOp::kReceiveInitialMetadata:
-       op.op = GRPC_OP_RECV_INITIAL_METADATA;
-       *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
-       op.data.recv_initial_metadata.recv_initial_metadata =
-           &recv_initial_metadata_;
+       if (enqueued_recv_initial_metadata_) {
+         *batch_is_ok = false;
+       } else {
+         enqueued_recv_initial_metadata_ = true;
+         op.op = GRPC_OP_RECV_INITIAL_METADATA;
+         *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
+         op.data.recv_initial_metadata.recv_initial_metadata =
+             &recv_initial_metadata_;
+       }
        break;
      case api_fuzzer::BatchOp::kReceiveMessage:
        // Allow only one active pending_recv_message_op to exist. Otherwise,
        // if the previous enqueued recv_message_op is not complete by the
        // time we get here, then under certain conditions enqueuing this op
        // will overwrite the internal call->receiving_buffer maintained by
        // grpc, leading to a memory leak.
-       if (call_closed_) {
+       if (call_closed_ || pending_recv_message_op_) {
          *batch_is_ok = false;
        } else {
          op.op = GRPC_OP_RECV_MESSAGE;
          *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
          pending_recv_message_op_ = true;
          op.data.recv_message.recv_message = &recv_message_;
          unwinders->push_back([this]() { pending_recv_message_op_ = false; });
        }
        break;
      case api_fuzzer::BatchOp::kReceiveStatusOnClient:
@@ -422,6 +450,7 @@ class Call : public std::enable_shared_from_this<Call> {
        op.data.recv_status_on_client.trailing_metadata =
            &recv_trailing_metadata_;
        op.data.recv_status_on_client.status_details = &recv_status_details_;
        *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
        break;
      case api_fuzzer::BatchOp::kReceiveCloseOnServer:
        op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
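The new enqueued_recv_initial_metadata_ and pending_recv_message_op_ flags implement an "at most one outstanding receive" guard, and the unwinder lambda resets the flag if the batch is abandoned before it is started. The same idea reduced to a self-contained sketch (class and method names are illustrative, not the fuzzer's API):

#include <functional>
#include <vector>

class RecvGuard {
 public:
  // Returns false if a receive is already outstanding; otherwise marks one
  // outstanding and registers an unwinder that clears the flag if the whole
  // batch is rejected before being started.
  bool TryEnqueueRecv(std::vector<std::function<void()>>* unwinders) {
    if (pending_recv_) return false;
    pending_recv_ = true;
    unwinders->push_back([this]() { pending_recv_ = false; });
    return true;
  }

  // Called from the batch-completion path (cf. FinishedBatchValidator).
  void OnRecvFinished() { pending_recv_ = false; }

 private:
  bool pending_recv_ = false;
};

int main() {
  RecvGuard guard;
  std::vector<std::function<void()>> unwinders;
  bool first = guard.TryEnqueueRecv(&unwinders);   // true
  bool second = guard.TryEnqueueRecv(&unwinders);  // false: already pending
  for (auto& undo : unwinders) undo();             // batch rejected: reset
  bool retry = guard.TryEnqueueRecv(&unwinders);   // true again
  return (first && !second && retry) ? 0 : 1;
}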
@@ -437,12 +466,14 @@ class Call : public std::enable_shared_from_this<Call> {
  Validator* FinishedBatchValidator(uint8_t has_ops) {
    ++pending_ops_;
    auto self = shared_from_this();
-   return MakeValidator([self, has_ops](bool) {
+   return MakeValidator([self, has_ops](bool /*success*/) {
      --self->pending_ops_;
-     if ((has_ops & (1u << GRPC_OP_RECV_MESSAGE) &&
-          self->recv_message_ != nullptr)) {
-       grpc_byte_buffer_destroy(self->recv_message_);
-       self->recv_message_ = nullptr;
+     if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
+       self->pending_recv_message_op_ = false;
+       if (self->recv_message_ != nullptr) {
+         grpc_byte_buffer_destroy(self->recv_message_);
+         self->recv_message_ = nullptr;
+       }
      }
      if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
        grpc_byte_buffer_destroy(self->send_message_);
@@ -483,9 +514,11 @@ class Call : public std::enable_shared_from_this<Call> {
  int cancelled_;
  int pending_ops_ = 0;
  bool sent_initial_metadata_ = false;
  bool enqueued_recv_initial_metadata_ = false;
  grpc_call_details call_details_{};
  grpc_byte_buffer* send_message_ = nullptr;
  bool call_closed_ = false;
  bool pending_recv_message_op_ = false;

  std::vector<void*> free_pointers_;
  std::vector<grpc_slice> unref_slices_;
@@ -511,7 +544,6 @@ Call::~Call() {
  if (call_ != nullptr) {
    grpc_call_unref(call_);
  }
  grpc_slice_unref(recv_status_details_);
  grpc_call_details_destroy(&call_details_);
@@ -522,6 +554,11 @@ Call::~Call() {
    grpc_slice_unref(s);
  }

  if (recv_message_ != nullptr) {
    grpc_byte_buffer_destroy(recv_message_);
    recv_message_ = nullptr;
  }
  grpc_metadata_array_destroy(&recv_initial_metadata_);
  grpc_metadata_array_destroy(&recv_trailing_metadata_);
}
@@ -707,7 +744,6 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
  int action_index = 0;
  auto no_more_actions = [&]() { action_index = msg.actions_size(); };
  auto poll_cq = [&]() -> bool {
    grpc_event ev = grpc_completion_queue_next(
        cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
@@ -750,7 +786,11 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
        call->Shutdown();
      }

-     g_now = gpr_time_add(g_now, gpr_time_from_seconds(1, GPR_TIMESPAN));
+     g_now = gpr_time_add(
+         g_now,
+         gpr_time_from_seconds(
+             std::max<int64_t>(1, static_cast<int64_t>(MAX_WAIT_MS / 1000)),
+             GPR_TIMESPAN));
      grpc_timer_manager_tick();
      GPR_ASSERT(!poll_cq());
      continue;
@@ -758,6 +798,12 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
    grpc_timer_manager_tick();

    if (g_channel_force_delete.exchange(false) && g_channel) {
      grpc_channel_destroy(g_channel);
      g_channel = nullptr;
      g_channel_actions.clear();
    }

    const api_fuzzer::Action& action = msg.actions(action_index);
    action_index++;
    switch (action.type_case()) {
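g_channel_force_delete lets the passthru endpoint's completion callback ask the main fuzzing loop (just above the action switch) to tear the channel down; exchange(false) reads and clears the flag in one step, so the teardown runs at most once per completed schedule. A reduced model of that hand-off, assuming a single consumer loop:

#include <atomic>
#include <cstdio>

std::atomic<bool> force_delete{false};

void OnScheduleComplete() {  // e.g. invoked from a timer callback
  force_delete = true;
}

void MainLoopIteration(bool* channel_alive) {
  // exchange(false) returns the previous value and resets the flag, so the
  // teardown below can only happen once per completed schedule.
  if (force_delete.exchange(false) && *channel_alive) {
    *channel_alive = false;
    std::puts("channel destroyed");
  }
}

int main() {
  bool channel_alive = true;
  OnScheduleComplete();
  MainLoopIteration(&channel_alive);  // destroys the channel
  MainLoopIteration(&channel_alive);  // no-op: flag already consumed
}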
@@ -772,12 +818,18 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
      // increment global time
      case api_fuzzer::Action::kAdvanceTime: {
        g_now = gpr_time_add(
-           g_now, gpr_time_from_micros(action.advance_time(), GPR_TIMESPAN));
+           g_now, gpr_time_from_micros(
+                      std::min(static_cast<uint64_t>(action.advance_time()),
+                               static_cast<uint64_t>(MAX_ADVANCE_TIME_MICROS)),
+                      GPR_TIMESPAN));
        break;
      }
      // create an insecure channel
      case api_fuzzer::Action::kCreateChannel: {
-       if (g_channel == nullptr) {
+       if (!action.create_channel().channel_actions_size() ||
+           g_channel != nullptr) {
+         no_more_actions();
+       } else {
          grpc_channel_args* args =
              ReadArgs(action.create_channel().channel_args());
          if (action.create_channel().has_channel_creds()) {
@@ -790,13 +842,26 @@
            g_channel = grpc_insecure_channel_create(
                action.create_channel().target().c_str(), args, nullptr);
          }
          g_channel_actions.clear();
          for (int i = 0; i < action.create_channel().channel_actions_size();
               i++) {
            const api_fuzzer::ChannelAction& channel_action =
                action.create_channel().channel_actions(i);
            g_channel_actions.push_back({
                std::min(channel_action.wait_ms(),
                         static_cast<uint64_t>(MAX_WAIT_MS)),
                std::min(channel_action.add_n_bytes_writable(),
                         static_cast<uint64_t>(MAX_ADD_N_WRITABLE_BYTES)),
                std::min(channel_action.add_n_bytes_readable(),
                         static_cast<uint64_t>(MAX_ADD_N_READABLE_BYTES)),
            });
          }
          GPR_ASSERT(g_channel != nullptr);
          g_channel_force_delete = false;
          {
            grpc_core::ExecCtx exec_ctx;
            grpc_channel_args_destroy(args);
          }
-       } else {
-         no_more_actions();
        }
        break;
      }
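The push_back above brace-initializes grpc_passthru_endpoint_channel_action, so the argument order wait_ms, add_n_bytes_writable, add_n_bytes_readable must match the member order declared in passthru_endpoint.h. A standalone sketch of the same clamping loop over an illustrative mirror of that struct:

#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative mirror of grpc_passthru_endpoint_channel_action; the member
// order matters for the brace-initializers used in the fuzzer.
struct ChannelAction {
  uint64_t wait_ms;
  uint64_t add_n_writable_bytes;
  uint64_t add_n_readable_bytes;
};

std::vector<ChannelAction> BuildClampedActions(
    const std::vector<ChannelAction>& fuzzed, uint64_t max_wait_ms,
    uint64_t max_writable, uint64_t max_readable) {
  std::vector<ChannelAction> out;
  out.reserve(fuzzed.size());
  for (const ChannelAction& a : fuzzed) {
    out.push_back({std::min(a.wait_ms, max_wait_ms),
                   std::min(a.add_n_writable_bytes, max_writable),
                   std::min(a.add_n_readable_bytes, max_readable)});
  }
  return out;
}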
@@ -955,6 +1020,7 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
          if (!op.has_value()) continue;
          ops.push_back(*op);
        }

        if (g_channel == nullptr) ok = false;
        if (ok) {
          auto* v = active_call->FinishedBatchValidator(has_ops);
@@ -1061,6 +1127,5 @@ DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
  grpc_completion_queue_destroy(cq);
  grpc_resource_quota_unref(g_resource_quota);
  grpc_shutdown_blocking();
}

@@ -88,10 +88,17 @@ message Metadatum {
  ByteSlice value = 2;
}

message ChannelAction {
  uint64 add_n_bytes_writable = 1;
  uint64 add_n_bytes_readable = 2;
  uint64 wait_ms = 3;
}

message CreateChannel {
  string target = 1;
  repeated ChannelArg channel_args = 2;
  ChannelCreds channel_creds = 3;
  repeated ChannelAction channel_actions = 4;
}

message CreateServer {
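Assuming the usual protoc-generated C++ accessors for these messages (mutable_create_channel(), add_channel_actions(), set_wait_ms(), and so on), an action equivalent to the corpus entries below could be built programmatically like this:

#include "test/core/end2end/fuzzers/api_fuzzer.pb.h"

// Builds a CreateChannel action with a short congestion schedule: every
// 1000 simulated ms, 10 more bytes become writable and readable.
api_fuzzer::Action MakeThrottledCreateChannel() {
  api_fuzzer::Action action;
  api_fuzzer::CreateChannel* create = action.mutable_create_channel();
  create->set_target("dns:server");
  for (int i = 0; i < 3; ++i) {
    api_fuzzer::ChannelAction* step = create->add_channel_actions();
    step->set_wait_ms(1000);
    step->set_add_n_bytes_writable(10);
    step->set_add_n_bytes_readable(10);
  }
  return action;
}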

@@ -0,0 +1,618 @@
actions {
create_server {
}
}
actions {
create_channel {
target: "dns:server"
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
}
}
actions {
create_call {
method: { value: "/foo" }
timeout: 1000000000
}
}
actions {
queue_batch {
operations {
send_initial_metadata {}
}
operations {
receive_initial_metadata {}
}
}
}
actions {
request_call {}
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
send_initial_metadata {}
}
}
}
actions {
poll_cq: {}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
send_message {
message {
value: "hello world"
}
}
}
}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
receive_message {}
}
}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
send_message {
message {
value: "hello world"
}
}
}
}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
receive_message {}
}
}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
receive_status_on_client {}
}
}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
send_status_from_server {
status_code: 0
}
}
operations {
receive_close_on_server {}
}
}
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
queue_batch {
operations {
send_close_from_client {}
}
}
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}

@@ -0,0 +1,496 @@
actions {
create_server {
}
}
actions {
create_channel {
target: "dns:server"
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 20
add_n_bytes_readable: 20
}
channel_actions {
wait_ms: 1000
add_n_bytes_writable: 10
add_n_bytes_readable: 10
}
}
}
actions {
create_call {
method: { value: "/foo" }
timeout: 1000000000
}
}
actions {
queue_batch {
operations {
send_initial_metadata {}
}
operations {
receive_initial_metadata {}
}
operations {
receive_message {}
}
operations {
send_close_from_client {}
}
operations {
receive_status_on_client {}
}
}
}
actions {
request_call {}
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
change_active_call {}
}
actions {
queue_batch {
operations {
send_initial_metadata {}
}
operations {
send_message {
message {
value: "hello world"
}
}
}
}
}
actions {
queue_batch {
operations {
send_status_from_server {
status_code: 0
}
}
operations {
receive_close_on_server {}
}
}
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}
actions {
poll_cq: {}
}
actions {
advance_time: 1000000
}

@@ -29,27 +29,83 @@
#include <grpc/support/string_util.h>

#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/slice/slice_internal.h"

typedef struct passthru_endpoint passthru_endpoint;
typedef struct {
  bool is_armed;
  grpc_endpoint* ep;
  grpc_slice_buffer* slices;
  grpc_closure* cb;
} pending_op;

typedef struct {
  grpc_timer timer;
  uint64_t allowed_write_bytes;
  uint64_t allowed_read_bytes;
  std::vector<grpc_passthru_endpoint_channel_action> actions;
  std::function<void()> on_complete;
} grpc_passthru_endpoint_channel_effects;

typedef struct {
  grpc_endpoint base;
  passthru_endpoint* parent;
  grpc_slice_buffer read_buffer;
  grpc_slice_buffer write_buffer;
  grpc_slice_buffer* on_read_out;
  grpc_closure* on_read;
  pending_op pending_read_op;
  pending_op pending_write_op;
  uint64_t bytes_read_so_far;
  uint64_t bytes_written_so_far;
} half;

struct passthru_endpoint {
  gpr_mu mu;
  int halves;
  grpc_passthru_endpoint_stats* stats;
  grpc_passthru_endpoint_channel_effects* channel_effects;
  bool simulate_channel_actions;
  bool shutdown;
  half client;
  half server;
};
static void do_pending_read_op_locked(half* m, grpc_error_handle error) {
GPR_ASSERT(m->pending_read_op.is_armed);
GPR_ASSERT(m->bytes_read_so_far <=
m->parent->channel_effects->allowed_read_bytes);
if (m->parent->shutdown) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, m->pending_read_op.cb,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
grpc_slice_buffer_reset_and_unref(&m->read_buffer);
m->pending_read_op.is_armed = false;
return;
}
if (m->bytes_read_so_far == m->parent->channel_effects->allowed_read_bytes) {
// Keep it in pending state.
return;
}
// This delayed processing should only be invoked when read_buffer has
// something in it.
GPR_ASSERT(m->read_buffer.count > 0);
uint64_t readable_length = std::min<uint64_t>(
m->read_buffer.length,
m->parent->channel_effects->allowed_read_bytes - m->bytes_read_so_far);
GPR_ASSERT(readable_length > 0);
grpc_slice_buffer_move_first_no_ref(&m->read_buffer, readable_length,
m->pending_read_op.slices);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_read_op.cb, error);
if (m->parent->simulate_channel_actions) {
m->bytes_read_so_far += readable_length;
}
m->pending_read_op.is_armed = false;
}
static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
                    grpc_closure* cb, bool /*urgent*/) {
  half* m = reinterpret_cast<half*>(ep);
@@ -59,42 +115,177 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
        DEBUG_LOCATION, cb,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
  } else if (m->read_buffer.count > 0) {
-   grpc_slice_buffer_swap(&m->read_buffer, slices);
-   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, GRPC_ERROR_NONE);
+   GPR_ASSERT(!m->pending_read_op.is_armed);
+   GPR_ASSERT(!m->on_read);
+   m->pending_read_op.is_armed = true;
+   m->pending_read_op.cb = cb;
+   m->pending_read_op.ep = ep;
+   m->pending_read_op.slices = slices;
+   do_pending_read_op_locked(m, GRPC_ERROR_NONE);
  } else {
    GPR_ASSERT(!m->pending_read_op.is_armed);
    m->on_read = cb;
    m->on_read_out = slices;
  }
  gpr_mu_unlock(&m->parent->mu);
}
// Copy src slice and split the copy at n bytes into two separate slices
void grpc_slice_copy_split(grpc_slice src, uint64_t n, grpc_slice& split1,
grpc_slice& split2) {
GPR_ASSERT(n <= GRPC_SLICE_LENGTH(src));
if (n == GRPC_SLICE_LENGTH(src)) {
split1 = grpc_slice_copy(src);
split2 = grpc_empty_slice();
return;
}
split1 = GRPC_SLICE_MALLOC(n);
memcpy(GRPC_SLICE_START_PTR(split1), GRPC_SLICE_START_PTR(src), n);
split2 = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(src) - n);
memcpy(GRPC_SLICE_START_PTR(split2), GRPC_SLICE_START_PTR(src) + n,
GRPC_SLICE_LENGTH(src) - n);
}
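grpc_slice_copy_split copies src and cuts the copy at n bytes, so the original slice can be unreffed while the two halves go to different destinations (one forwarded now, one pushed back for a later iteration). The same splitting rule on a plain byte string, as a quick self-contained check:

#include <cassert>
#include <string>
#include <utility>

// Model of grpc_slice_copy_split: copy `src` and split the copy at `n`.
std::pair<std::string, std::string> CopySplit(const std::string& src,
                                              size_t n) {
  assert(n <= src.size());
  return {src.substr(0, n), src.substr(n)};
}

int main() {
  auto parts = CopySplit("hello world", 5);
  assert(parts.first == "hello");    // forwarded to the reader now
  assert(parts.second == " world");  // pushed back for a later iteration
  auto all = CopySplit("hello", 5);
  assert(all.second.empty());        // n == length gives an empty second half
  (void)all;
}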
static half* other_half(half* h) {
  if (h == &h->parent->client) return &h->parent->server;
  return &h->parent->client;
}
static void do_pending_write_op_locked(half* m, grpc_error_handle error) {
GPR_ASSERT(m->pending_write_op.is_armed);
GPR_ASSERT(m->bytes_written_so_far <=
m->parent->channel_effects->allowed_write_bytes);
if (m->parent->shutdown) {
grpc_core::ExecCtx::Run(
DEBUG_LOCATION, m->pending_write_op.cb,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already shutdown"));
m->pending_write_op.is_armed = false;
grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
return;
}
if (m->bytes_written_so_far ==
m->parent->channel_effects->allowed_write_bytes) {
// Keep it in pending state.
return;
}
half* other = other_half(m);
uint64_t max_writable =
std::min<uint64_t>(m->pending_write_op.slices->length,
m->parent->channel_effects->allowed_write_bytes -
m->bytes_written_so_far);
uint64_t max_readable = other->parent->channel_effects->allowed_read_bytes -
other->bytes_read_so_far;
uint64_t immediate_bytes_read =
other->on_read != nullptr ? std::min<uint64_t>(max_readable, max_writable)
: 0;
GPR_ASSERT(max_writable > 0);
GPR_ASSERT(max_readable >= 0);
// At the end of this process, we should have written max_writable bytes;
if (m->parent->simulate_channel_actions) {
m->bytes_written_so_far += max_writable;
}
// Estimate if the original write would still be pending at the end of this
// process
bool would_write_be_pending =
max_writable < m->pending_write_op.slices->length;
if (!m->parent->simulate_channel_actions) {
GPR_ASSERT(!would_write_be_pending);
}
grpc_slice_buffer* slices = m->pending_write_op.slices;
grpc_slice_buffer* dest =
other->on_read != nullptr ? other->on_read_out : &other->read_buffer;
while (max_writable > 0) {
grpc_slice slice = grpc_slice_buffer_take_first(slices);
uint64_t slice_length = GPR_SLICE_LENGTH(slice);
GPR_ASSERT(slice_length > 0);
grpc_slice split1, split2;
uint64_t split_length = 0;
if (slice_length <= max_readable) {
split_length = std::min<uint64_t>(slice_length, max_writable);
} else if (max_readable > 0) {
// slice_length > max_readable
split_length = std::min<uint64_t>(max_readable, max_writable);
} else {
// slice_length still > max_readable but max_readable is 0.
// In this case put the bytes into other->read_buffer. During a future
// read if max_readable still remains zero at the time of read, the
// pending read logic will kick in.
dest = &other->read_buffer;
split_length = std::min<uint64_t>(slice_length, max_writable);
}
grpc_slice_copy_split(slice, split_length, split1, split2);
grpc_slice_unref_internal(slice);
// Write a copy of the slice to the destination to be read
grpc_slice_buffer_add_indexed(dest, split1);
// Re-insert split2 into source for next iteration.
if (GPR_SLICE_LENGTH(split2) > 0) {
grpc_slice_buffer_undo_take_first(slices, split2);
} else {
grpc_slice_unref_internal(split2);
}
if (max_readable > 0) {
GPR_ASSERT(max_readable >= static_cast<uint64_t>(split_length));
max_readable -= split_length;
}
GPR_ASSERT(max_writable >= static_cast<uint64_t>(split_length));
max_writable -= split_length;
}
if (immediate_bytes_read > 0) {
GPR_ASSERT(!other->pending_read_op.is_armed);
if (m->parent->simulate_channel_actions) {
other->bytes_read_so_far += immediate_bytes_read;
}
grpc_core::ExecCtx::Run(DEBUG_LOCATION, other->on_read, error);
other->on_read = nullptr;
}
if (!would_write_be_pending) {
// No slices should be left
GPR_ASSERT(m->pending_write_op.slices->count == 0);
grpc_slice_buffer_reset_and_unref(m->pending_write_op.slices);
m->pending_write_op.is_armed = false;
grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->pending_write_op.cb, error);
}
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                     grpc_closure* cb, void* /*arg*/) {
- half* m = other_half(reinterpret_cast<half*>(ep));
+ half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
- grpc_error_handle error = GRPC_ERROR_NONE;
  gpr_atm_no_barrier_fetch_add(&m->parent->stats->num_writes, (gpr_atm)1);
  if (m->parent->shutdown) {
-   error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
- } else if (m->on_read != nullptr) {
-   for (size_t i = 0; i < slices->count; i++) {
-     grpc_slice_buffer_add(m->on_read_out, grpc_slice_copy(slices->slices[i]));
-   }
-   grpc_core::ExecCtx::Run(DEBUG_LOCATION, m->on_read, GRPC_ERROR_NONE);
-   m->on_read = nullptr;
+   grpc_core::ExecCtx::Run(
+       DEBUG_LOCATION, cb,
+       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"));
  } else {
-   for (size_t i = 0; i < slices->count; i++) {
-     grpc_slice_buffer_add(&m->read_buffer,
-                           grpc_slice_copy(slices->slices[i]));
+   GPR_ASSERT(!m->pending_write_op.is_armed);
+   m->pending_write_op.is_armed = true;
+   m->pending_write_op.cb = cb;
+   m->pending_write_op.ep = ep;
+   // Copy slices into m->pending_write_op.slices
+   m->pending_write_op.slices = &m->write_buffer;
+   GPR_ASSERT(m->pending_write_op.slices->count == 0);
+   for (int i = 0; i < static_cast<int>(slices->count); i++) {
+     grpc_slice_buffer_add_indexed(m->pending_write_op.slices,
+                                   grpc_slice_copy(slices->slices[i]));
    }
+   do_pending_write_op_locked(m, GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&m->parent->mu);
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
}
void flush_pending_ops_locked(half* m, grpc_error_handle error) {
if (m->pending_read_op.is_armed) {
do_pending_read_op_locked(m, error);
}
if (m->pending_write_op.is_armed) {
do_pending_write_op_locked(m, error);
}
} }
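do_pending_write_op_locked caps each flush at allowed_write_bytes minus bytes_written_so_far and leaves the op armed when the pending buffer exceeds that budget; the next channel action raises the budget and flush_pending_ops_locked retries. A small numeric model of that bookkeeping (field and function names are illustrative):

#include <algorithm>
#include <cassert>
#include <cstdint>

struct WriteState {
  uint64_t allowed_write_bytes = 0;   // raised by each channel action
  uint64_t bytes_written_so_far = 0;
  uint64_t pending_bytes = 0;         // length of pending_write_op.slices
};

// Returns true if the write completes, false if it stays armed (pending).
bool FlushPendingWrite(WriteState* s) {
  uint64_t budget = s->allowed_write_bytes - s->bytes_written_so_far;
  if (budget == 0) return false;  // keep it in pending state
  uint64_t max_writable = std::min(s->pending_bytes, budget);
  s->bytes_written_so_far += max_writable;
  s->pending_bytes -= max_writable;
  return s->pending_bytes == 0;
}

int main() {
  WriteState s{/*allowed_write_bytes=*/10, /*bytes_written_so_far=*/0,
               /*pending_bytes=*/25};
  assert(!FlushPendingWrite(&s));  // 10 of 25 bytes written, still pending
  s.allowed_write_bytes += 20;     // next channel action adds 20 writable bytes
  assert(FlushPendingWrite(&s));   // remaining 15 bytes fit in the new budget
}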
static void me_add_to_pollset(grpc_endpoint* /*ep*/,
@@ -110,6 +301,7 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
  half* m = reinterpret_cast<half*>(ep);
  gpr_mu_lock(&m->parent->mu);
  m->parent->shutdown = true;
  flush_pending_ops_locked(m, GRPC_ERROR_NONE);
  if (m->on_read) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->on_read,
@@ -117,6 +309,7 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
    m->on_read = nullptr;
  }
  m = other_half(m);
  flush_pending_ops_locked(m, GRPC_ERROR_NONE);
  if (m->on_read) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, m->on_read,
@@ -127,17 +320,28 @@ static void me_shutdown(grpc_endpoint* ep, grpc_error_handle why) {
  GRPC_ERROR_UNREF(why);
}

void grpc_passthru_endpoint_destroy(passthru_endpoint* p) {
  gpr_mu_destroy(&p->mu);
  grpc_passthru_endpoint_stats_destroy(p->stats);
  delete p->channel_effects;
  grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
  grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
  grpc_slice_buffer_destroy_internal(&p->client.write_buffer);
  grpc_slice_buffer_destroy_internal(&p->server.write_buffer);
  gpr_free(p);
}

static void me_destroy(grpc_endpoint* ep) {
  passthru_endpoint* p = (reinterpret_cast<half*>(ep))->parent;
  gpr_mu_lock(&p->mu);
- if (0 == --p->halves) {
+ if (0 == --p->halves && p->channel_effects->actions.empty()) {
+   // no pending channel actions exist
    gpr_mu_unlock(&p->mu);
-   gpr_mu_destroy(&p->mu);
-   grpc_passthru_endpoint_stats_destroy(p->stats);
-   grpc_slice_buffer_destroy_internal(&p->client.read_buffer);
-   grpc_slice_buffer_destroy_internal(&p->server.read_buffer);
-   gpr_free(p);
+   grpc_passthru_endpoint_destroy(p);
  } else {
    if (p->halves == 0 && p->simulate_channel_actions) {
      grpc_timer_cancel(&p->channel_effects->timer);
    }
    gpr_mu_unlock(&p->mu);
  }
}
@@ -179,14 +383,21 @@ static void half_init(half* m, passthru_endpoint* parent,
  m->base.vtable = &vtable;
  m->parent = parent;
  grpc_slice_buffer_init(&m->read_buffer);
  grpc_slice_buffer_init(&m->write_buffer);
  m->pending_write_op.slices = nullptr;
  m->on_read = nullptr;
  m->bytes_read_so_far = 0;
  m->bytes_written_so_far = 0;
  m->pending_write_op.is_armed = false;
  m->pending_read_op.is_armed = false;
  std::string name =
      absl::StrFormat("passthru_endpoint_%s_%p", half_name, parent);
}

void grpc_passthru_endpoint_create(grpc_endpoint** client,
                                   grpc_endpoint** server,
-                                  grpc_passthru_endpoint_stats* stats) {
+                                  grpc_passthru_endpoint_stats* stats,
+                                  bool simulate_channel_actions) {
  passthru_endpoint* m =
      static_cast<passthru_endpoint*>(gpr_malloc(sizeof(*m)));
  m->halves = 2;
@@ -197,6 +408,12 @@ void grpc_passthru_endpoint_create(grpc_endpoint** client,
    gpr_ref(&stats->refs);
    m->stats = stats;
  }
  m->channel_effects = new grpc_passthru_endpoint_channel_effects();
  m->simulate_channel_actions = simulate_channel_actions;
  if (!simulate_channel_actions) {
    m->channel_effects->allowed_read_bytes = UINT64_MAX;
    m->channel_effects->allowed_write_bytes = UINT64_MAX;
  }
  half_init(&m->client, m, "client");
  half_init(&m->server, m, "server");
  gpr_mu_init(&m->mu);
@@ -218,3 +435,55 @@ void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats) {
    gpr_free(stats);
  }
}
static void sched_next_channel_action_locked(half* m);
static void do_next_sched_channel_action(void* arg, grpc_error_handle error) {
half* m = reinterpret_cast<half*>(arg);
gpr_mu_lock(&m->parent->mu);
GPR_ASSERT(!m->parent->channel_effects->actions.empty());
if (m->parent->halves == 0) {
gpr_mu_unlock(&m->parent->mu);
grpc_passthru_endpoint_destroy(m->parent);
return;
}
auto curr_action = m->parent->channel_effects->actions[0];
m->parent->channel_effects->actions.erase(
m->parent->channel_effects->actions.begin());
m->parent->channel_effects->allowed_read_bytes +=
curr_action.add_n_readable_bytes;
m->parent->channel_effects->allowed_write_bytes +=
curr_action.add_n_writable_bytes;
flush_pending_ops_locked(m, error);
flush_pending_ops_locked(other_half(m), error);
sched_next_channel_action_locked(m);
gpr_mu_unlock(&m->parent->mu);
}
static void sched_next_channel_action_locked(half* m) {
if (m->parent->channel_effects->actions.empty()) {
m->parent->channel_effects->on_complete();
return;
}
grpc_timer_init(&m->parent->channel_effects->timer,
m->parent->channel_effects->actions[0].wait_ms +
grpc_core::ExecCtx::Get()->Now(),
GRPC_CLOSURE_CREATE(do_next_sched_channel_action, m,
grpc_schedule_on_exec_ctx));
}
void start_scheduling_grpc_passthru_endpoint_channel_effects(
grpc_endpoint* ep,
const std::vector<grpc_passthru_endpoint_channel_action>& actions,
std::function<void()> on_complete) {
half* m = reinterpret_cast<half*>(ep);
gpr_mu_lock(&m->parent->mu);
if (!m->parent->simulate_channel_actions || m->parent->shutdown) {
gpr_mu_unlock(&m->parent->mu);
return;
}
m->parent->channel_effects->actions = actions;
m->parent->channel_effects->on_complete = std::move(on_complete);
sched_next_channel_action_locked(m);
gpr_mu_unlock(&m->parent->mu);
}
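Each scheduled step above waits wait_ms on a grpc_timer, raises both byte budgets, flushes any pending ops, and re-arms for the next action; when the list is exhausted, on_complete runs. A time-collapsed standalone model of that progression (names are illustrative):

#include <cstdint>
#include <cstdio>
#include <vector>

struct ChannelAction {
  uint64_t wait_ms;
  uint64_t add_n_writable_bytes;
  uint64_t add_n_readable_bytes;
};

// Model of sched_next_channel_action_locked / do_next_sched_channel_action:
// each step advances virtual time by wait_ms and raises both budgets.
void RunSchedule(const std::vector<ChannelAction>& actions,
                 uint64_t* allowed_write, uint64_t* allowed_read) {
  uint64_t virtual_time_ms = 0;
  for (const ChannelAction& a : actions) {
    virtual_time_ms += a.wait_ms;  // the grpc_timer fires after wait_ms
    *allowed_write += a.add_n_writable_bytes;
    *allowed_read += a.add_n_readable_bytes;
    std::printf("t=%llums: writable budget=%llu readable budget=%llu\n",
                static_cast<unsigned long long>(virtual_time_ms),
                static_cast<unsigned long long>(*allowed_write),
                static_cast<unsigned long long>(*allowed_read));
  }
  // on_complete() would run here, e.g. setting g_channel_force_delete.
}

int main() {
  uint64_t w = 0, r = 0;
  RunSchedule({{1000, 10, 10}, {1000, 20, 20}}, &w, &r);
}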

@@ -31,12 +31,24 @@ typedef struct {
  gpr_atm num_writes;
} grpc_passthru_endpoint_stats;

typedef struct {
  uint64_t wait_ms;
  uint64_t add_n_writable_bytes;
  uint64_t add_n_readable_bytes;
} grpc_passthru_endpoint_channel_action;

void grpc_passthru_endpoint_create(grpc_endpoint** client,
                                   grpc_endpoint** server,
-                                  grpc_passthru_endpoint_stats* stats);
+                                  grpc_passthru_endpoint_stats* stats,
+                                  bool simulate_channel_actions = false);

grpc_passthru_endpoint_stats* grpc_passthru_endpoint_stats_create();

void grpc_passthru_endpoint_stats_destroy(grpc_passthru_endpoint_stats* stats);

void start_scheduling_grpc_passthru_endpoint_channel_effects(
    grpc_endpoint* ep,
    const std::vector<grpc_passthru_endpoint_channel_action>& actions,
    std::function<void()> on_complete);

#endif  // PASSTHRU_ENDPOINT_H
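Taken together, the additions in this header are used roughly as in the fuzzer's do_connect path: create a throttled endpoint pair, then hand the client half the recorded schedule plus a completion callback. A condensed sketch (the wrapper function and its parameters are illustrative; error handling and transport setup are omitted):

#include <functional>
#include <utility>
#include <vector>

#include "test/core/util/passthru_endpoint.h"

// Sketch of the fuzzer-side wiring: create a throttled endpoint pair, then
// hand it the recorded schedule plus a callback that fires once the schedule
// is exhausted.
void ConnectWithSimulatedCongestion(
    const std::vector<grpc_passthru_endpoint_channel_action>& actions,
    std::function<void()> on_schedule_done, grpc_endpoint** client_out,
    grpc_endpoint** server_out) {
  grpc_passthru_endpoint_create(client_out, server_out, /*stats=*/nullptr,
                                /*simulate_channel_actions=*/true);
  start_scheduling_grpc_passthru_endpoint_channel_effects(
      *client_out, actions, std::move(on_schedule_done));
}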
