diff --git a/test/core/end2end/fixtures/proxy.cc b/test/core/end2end/fixtures/proxy.cc index febdb809838..fd8a0f59763 100644 --- a/test/core/end2end/fixtures/proxy.cc +++ b/test/core/end2end/fixtures/proxy.cc @@ -20,6 +20,7 @@ #include +#include #include #include @@ -82,6 +83,10 @@ typedef struct { grpc_metadata_array c2p_initial_metadata; grpc_metadata_array p2s_initial_metadata; + grpc_core::Mutex* initial_metadata_mu; + bool p2s_initial_metadata_received ABSL_GUARDED_BY(initial_metadata_mu); + grpc_op* deferred_trailing_metadata_op ABSL_GUARDED_BY(initial_metadata_mu); + grpc_byte_buffer* c2p_msg; grpc_byte_buffer* p2s_msg; @@ -166,6 +171,13 @@ static void unrefpc(proxy_call* pc, const char* /*reason*/) { grpc_metadata_array_destroy(&pc->p2s_initial_metadata); grpc_metadata_array_destroy(&pc->p2s_trailing_metadata); grpc_slice_unref(pc->p2s_status_details); + { + grpc_core::MutexLock lock(pc->initial_metadata_mu); + if (pc->deferred_trailing_metadata_op != nullptr) { + gpr_free(pc->deferred_trailing_metadata_op); + } + } + delete pc->initial_metadata_mu; gpr_free(pc); } } @@ -179,25 +191,45 @@ static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) { unrefpc(pc, "on_c2p_sent_initial_metadata"); } +static void on_c2p_sent_status(void* arg, int /*success*/) { + proxy_call* pc = static_cast(arg); + unrefpc(pc, "on_c2p_sent_status"); +} + static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) { proxy_call* pc = static_cast(arg); grpc_op op; grpc_call_error err; - memset(&op, 0, sizeof(op)); - if (!pc->proxy->shutdown && !grpc_call_is_trailers_only(pc->p2s)) { - op.op = GRPC_OP_SEND_INITIAL_METADATA; - op.flags = 0; - op.reserved = nullptr; - op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; - op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata; - refpc(pc, "on_c2p_sent_initial_metadata"); - err = grpc_call_start_batch(pc->c2p, &op, 1, - new_closure(on_c2p_sent_initial_metadata, pc), - nullptr); - CHECK_EQ(err, GRPC_CALL_OK); + if (!pc->proxy->shutdown) { + if (!grpc_call_is_trailers_only(pc->p2s)) { + op.op = GRPC_OP_SEND_INITIAL_METADATA; + op.flags = 0; + op.reserved = nullptr; + op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count; + op.data.send_initial_metadata.metadata = + pc->p2s_initial_metadata.metadata; + refpc(pc, "on_c2p_sent_initial_metadata"); + err = grpc_call_start_batch(pc->c2p, &op, 1, + new_closure(on_c2p_sent_initial_metadata, pc), + nullptr); + CHECK_EQ(err, GRPC_CALL_OK); + } + grpc_op* deferred_trailing_metadata_op = nullptr; + { + grpc_core::MutexLock lock(pc->initial_metadata_mu); + // Start the batch without the mutex held, just in case. + // This will be nullptr if the trailing metadata has not yet been seen. + deferred_trailing_metadata_op = pc->deferred_trailing_metadata_op; + pc->p2s_initial_metadata_received = true; + } + if (deferred_trailing_metadata_op != nullptr) { + refpc(pc, "on_c2p_sent_status"); + err = grpc_call_start_batch(pc->c2p, deferred_trailing_metadata_op, 1, + new_closure(on_c2p_sent_status, pc), nullptr); + CHECK_EQ(err, GRPC_CALL_OK); + } } - unrefpc(pc, "on_p2s_recv_initial_metadata"); } @@ -308,11 +340,6 @@ static void on_p2s_recv_msg(void* arg, int success) { unrefpc(pc, "on_p2s_recv_msg"); } -static void on_c2p_sent_status(void* arg, int /*success*/) { - proxy_call* pc = static_cast(arg); - unrefpc(pc, "on_c2p_sent_status"); -} - static void on_p2s_status(void* arg, int success) { proxy_call* pc = static_cast(arg); grpc_op op[2]; // Possibly send empty initial metadata also if trailers-only @@ -340,10 +367,29 @@ static void on_p2s_status(void* arg, int success) { op[op_count].data.send_status_from_server.status_details = &pc->p2s_status_details; op_count++; - refpc(pc, "on_c2p_sent_status"); - err = grpc_call_start_batch(pc->c2p, op, op_count, - new_closure(on_c2p_sent_status, pc), nullptr); - CHECK_EQ(err, GRPC_CALL_OK); + + // TODO(ctiller): The current core implementation requires initial + // metadata batches to be started *after* initial metadata batches have + // been completed. The C++ Callback API does this accounting too, for + // example. + // + // This entire fixture will need a redesign when the batch API goes away. + bool op_deferred = false; + { + grpc_core::MutexLock lock(pc->initial_metadata_mu); + if (!pc->p2s_initial_metadata_received) { + op_deferred = true; + pc->deferred_trailing_metadata_op = + static_cast(gpr_malloc(sizeof(op))); + memcpy(pc->deferred_trailing_metadata_op, &op, sizeof(op)); + } + } + if (!op_deferred) { + refpc(pc, "on_c2p_sent_status"); + err = grpc_call_start_batch(pc->c2p, op, op_count, + new_closure(on_c2p_sent_status, pc), nullptr); + CHECK_EQ(err, GRPC_CALL_OK); + } } unrefpc(pc, "on_p2s_status"); @@ -365,6 +411,12 @@ static void on_new_call(void* arg, int success) { memset(pc, 0, sizeof(*pc)); pc->proxy = proxy; std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata); + pc->initial_metadata_mu = new grpc_core::Mutex(); + { + grpc_core::MutexLock lock(pc->initial_metadata_mu); + pc->p2s_initial_metadata_received = false; + pc->deferred_trailing_metadata_op = nullptr; + } pc->c2p = proxy->new_call; pc->p2s = grpc_channel_create_call( proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq, @@ -374,6 +426,7 @@ static void on_new_call(void* arg, int success) { op.reserved = nullptr; + // Proxy: receive initial metadata from the server op.op = GRPC_OP_RECV_INITIAL_METADATA; op.flags = 0; op.data.recv_initial_metadata.recv_initial_metadata = @@ -384,6 +437,7 @@ static void on_new_call(void* arg, int success) { nullptr); CHECK_EQ(err, GRPC_CALL_OK); + // Proxy: send initial metadata to the server op.op = GRPC_OP_SEND_INITIAL_METADATA; op.flags = 0; op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count; @@ -394,6 +448,7 @@ static void on_new_call(void* arg, int success) { nullptr); CHECK_EQ(err, GRPC_CALL_OK); + // Client: receive message from the proxy op.op = GRPC_OP_RECV_MESSAGE; op.flags = 0; op.data.recv_message.recv_message = &pc->c2p_msg; @@ -402,6 +457,7 @@ static void on_new_call(void* arg, int success) { new_closure(on_c2p_recv_msg, pc), nullptr); CHECK_EQ(err, GRPC_CALL_OK); + // Proxy: receive message from the server op.op = GRPC_OP_RECV_MESSAGE; op.flags = 0; op.data.recv_message.recv_message = &pc->p2s_msg; @@ -410,6 +466,7 @@ static void on_new_call(void* arg, int success) { new_closure(on_p2s_recv_msg, pc), nullptr); CHECK_EQ(err, GRPC_CALL_OK); + // Proxy: receive status from the server op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; op.flags = 0; op.data.recv_status_on_client.trailing_metadata = @@ -421,6 +478,7 @@ static void on_new_call(void* arg, int success) { nullptr); CHECK_EQ(err, GRPC_CALL_OK); + // Client: receive close-ack from the proxy op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; op.flags = 0; op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;