From d410f1d0aa12abc08adbc3393b2067c6c6229852 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 11 Nov 2022 14:18:29 -0800 Subject: [PATCH] [surface] Ensure SEND_STATUS & RECV_MESSAGE do not inhabit the same batch (#31554) Add a check that SEND_STATUS_FROM_SERVER and RECV_MESSAGE are not in the same batch. This is necessary pre-work for #31204 and implements part of grpc/proposal#336. Also eliminates fling instead of updating it: My expectation is nobody has looked at this corner in many years It's not a benchmark we want: concentrating on a microbenchmark that doesn't include a binding layer caused us to favor designs that emphasized a lightweight core at the expense of a expensive bindings. We should consider the whole. Co-authored-by: ctiller --- CMakeLists.txt | 98 ------ build_autogenerated.yaml | 84 ----- src/core/lib/surface/call.cc | 9 + src/php/tests/unit_tests/CallInvokerTest.php | 6 +- src/php/tests/unit_tests/EndToEndTest.php | 9 +- src/php/tests/unit_tests/InterceptorTest.php | 12 +- .../tests/unit_tests/SecureEndToEndTest.php | 8 +- .../tests/unit/_cython/cygrpc_test.py | 27 +- src/ruby/spec/client_server_spec.rb | 28 +- test/core/end2end/tests/max_message_length.cc | 19 +- test/core/end2end/tests/retry.cc | 9 +- .../tests/retry_per_attempt_recv_timeout.cc | 15 +- .../tests/retry_send_initial_metadata_refs.cc | 15 +- ...etry_transparent_max_concurrent_streams.cc | 18 +- test/core/fling/BUILD | 89 ----- test/core/fling/client.cc | 250 ------------- test/core/fling/fling_stream_test.cc | 80 ----- test/core/fling/fling_test.cc | 83 ----- test/core/fling/server.cc | 329 ------------------ test/core/memory_usage/server.cc | 6 - tools/run_tests/generated/tests.json | 44 --- 21 files changed, 136 insertions(+), 1102 deletions(-) delete mode 100644 test/core/fling/BUILD delete mode 100644 test/core/fling/client.cc delete mode 100644 test/core/fling/fling_stream_test.cc delete mode 100644 test/core/fling/fling_test.cc delete mode 100644 test/core/fling/server.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index bd1469ceb45..f93e06950f5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -771,12 +771,6 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_c fd_conservation_posix_test) endif() - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - add_dependencies(buildtests_c fling_stream_test) - endif() - if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - add_dependencies(buildtests_c fling_test) - endif() add_dependencies(buildtests_c goaway_server_test) add_dependencies(buildtests_c inproc_callback_test) add_dependencies(buildtests_c invalid_call_argument_test) @@ -4624,98 +4618,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) ) -endif() -endif() -if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - - add_executable(fling_stream_test - test/core/end2end/data/client_certs.cc - test/core/end2end/data/server1_cert.cc - test/core/end2end/data/server1_key.cc - test/core/end2end/data/test_root_cert.cc - test/core/fling/fling_stream_test.cc - test/core/util/cmdline.cc - test/core/util/fuzzer_util.cc - test/core/util/grpc_profiler.cc - test/core/util/histogram.cc - test/core/util/mock_endpoint.cc - test/core/util/parse_hexstring.cc - test/core/util/passthru_endpoint.cc - test/core/util/resolve_localhost_ip46.cc - test/core/util/slice_splitter.cc - test/core/util/subprocess_posix.cc - test/core/util/subprocess_windows.cc - test/core/util/tracer_util.cc - ) - - target_include_directories(fling_stream_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - ) - - target_link_libraries(fling_stream_test - ${_gRPC_ZLIB_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util - ) - - -endif() -endif() -if(gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) - - add_executable(fling_test - test/core/end2end/data/client_certs.cc - test/core/end2end/data/server1_cert.cc - test/core/end2end/data/server1_key.cc - test/core/end2end/data/test_root_cert.cc - test/core/fling/fling_test.cc - test/core/util/cmdline.cc - test/core/util/fuzzer_util.cc - test/core/util/grpc_profiler.cc - test/core/util/histogram.cc - test/core/util/mock_endpoint.cc - test/core/util/parse_hexstring.cc - test/core/util/passthru_endpoint.cc - test/core/util/resolve_localhost_ip46.cc - test/core/util/slice_splitter.cc - test/core/util/subprocess_posix.cc - test/core/util/subprocess_windows.cc - test/core/util/tracer_util.cc - ) - - target_include_directories(fling_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - ) - - target_link_libraries(fling_test - ${_gRPC_ZLIB_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util - ) - - endif() endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 39db4fb44db..b0f745f3fc6 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -3541,90 +3541,6 @@ targets: - linux - posix - mac -- name: fling_stream_test - build: test - language: c - headers: - - test/core/end2end/data/ssl_test_data.h - - test/core/util/cmdline.h - - test/core/util/evaluate_args_test_util.h - - test/core/util/fuzzer_util.h - - test/core/util/grpc_profiler.h - - test/core/util/histogram.h - - test/core/util/mock_authorization_endpoint.h - - test/core/util/mock_endpoint.h - - test/core/util/parse_hexstring.h - - test/core/util/passthru_endpoint.h - - test/core/util/resolve_localhost_ip46.h - - test/core/util/slice_splitter.h - - test/core/util/subprocess.h - - test/core/util/tracer_util.h - src: - - test/core/end2end/data/client_certs.cc - - test/core/end2end/data/server1_cert.cc - - test/core/end2end/data/server1_key.cc - - test/core/end2end/data/test_root_cert.cc - - test/core/fling/fling_stream_test.cc - - test/core/util/cmdline.cc - - test/core/util/fuzzer_util.cc - - test/core/util/grpc_profiler.cc - - test/core/util/histogram.cc - - test/core/util/mock_endpoint.cc - - test/core/util/parse_hexstring.cc - - test/core/util/passthru_endpoint.cc - - test/core/util/resolve_localhost_ip46.cc - - test/core/util/slice_splitter.cc - - test/core/util/subprocess_posix.cc - - test/core/util/subprocess_windows.cc - - test/core/util/tracer_util.cc - deps: - - grpc_test_util - platforms: - - linux - - posix - - mac -- name: fling_test - build: test - language: c - headers: - - test/core/end2end/data/ssl_test_data.h - - test/core/util/cmdline.h - - test/core/util/evaluate_args_test_util.h - - test/core/util/fuzzer_util.h - - test/core/util/grpc_profiler.h - - test/core/util/histogram.h - - test/core/util/mock_authorization_endpoint.h - - test/core/util/mock_endpoint.h - - test/core/util/parse_hexstring.h - - test/core/util/passthru_endpoint.h - - test/core/util/resolve_localhost_ip46.h - - test/core/util/slice_splitter.h - - test/core/util/subprocess.h - - test/core/util/tracer_util.h - src: - - test/core/end2end/data/client_certs.cc - - test/core/end2end/data/server1_cert.cc - - test/core/end2end/data/server1_key.cc - - test/core/end2end/data/test_root_cert.cc - - test/core/fling/fling_test.cc - - test/core/util/cmdline.cc - - test/core/util/fuzzer_util.cc - - test/core/util/grpc_profiler.cc - - test/core/util/histogram.cc - - test/core/util/mock_endpoint.cc - - test/core/util/parse_hexstring.cc - - test/core/util/passthru_endpoint.cc - - test/core/util/resolve_localhost_ip46.cc - - test/core/util/slice_splitter.cc - - test/core/util/subprocess_posix.cc - - test/core/util/subprocess_windows.cc - - test/core/util/tracer_util.cc - deps: - - grpc_test_util - platforms: - - linux - - posix - - mac - name: goaway_server_test build: test language: c diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index fe3ed7a3c51..ef8e2afb2fe 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1362,6 +1362,15 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops, seen_ops |= (1u << ops[i].op); } + if (!is_client() && + (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 && + (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) { + gpr_log(GPR_ERROR, + "******************* SEND_STATUS WITH RECV_MESSAGE " + "*******************"); + return GRPC_CALL_ERROR; + } + GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops); if (nops == 0) { diff --git a/src/php/tests/unit_tests/CallInvokerTest.php b/src/php/tests/unit_tests/CallInvokerTest.php index d928b97ef64..52af5b1b72f 100644 --- a/src/php/tests/unit_tests/CallInvokerTest.php +++ b/src/php/tests/unit_tests/CallInvokerTest.php @@ -214,6 +214,10 @@ class CallInvokerTest extends \PHPUnit\Framework\TestCase $event = $this->server->requestCall(); $this->assertSame('/phony_method', $event->method); $server_call = $event->call; + $event = $server_call->startBatch([ + Grpc\OP_RECV_MESSAGE => true, + ]); + $this->assertSame('intercepted_unary_request', $event->message); $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ @@ -221,10 +225,8 @@ class CallInvokerTest extends \PHPUnit\Framework\TestCase 'code' => Grpc\STATUS_OK, 'details' => '', ], - Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_CLOSE_ON_SERVER => true, ]); - $this->assertSame('intercepted_unary_request', $event->message); $call_invoker->getChannel()->close(); unset($unary_call); unset($server_call); diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 0e6084a5f2a..b2eb370a2e4 100644 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -158,14 +158,18 @@ class EndToEndTest extends \PHPUnit\Framework\TestCase $server_call = $event->call; $event = $server_call->startBatch([ - Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_RECV_MESSAGE => true, + ]); + $this->assertSame($req_text, $event->message); + + $event = $server_call->startBatch([ Grpc\OP_SEND_MESSAGE => ['message' => $reply_text], + Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ 'metadata' => [], 'code' => Grpc\STATUS_OK, 'details' => $status_text, ], - Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_CLOSE_ON_SERVER => true, ]); @@ -173,7 +177,6 @@ class EndToEndTest extends \PHPUnit\Framework\TestCase $this->assertTrue($event->send_status); $this->assertTrue($event->send_message); $this->assertFalse($event->cancelled); - $this->assertSame($req_text, $event->message); $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, diff --git a/src/php/tests/unit_tests/InterceptorTest.php b/src/php/tests/unit_tests/InterceptorTest.php index accef704ecc..f01d29eba86 100644 --- a/src/php/tests/unit_tests/InterceptorTest.php +++ b/src/php/tests/unit_tests/InterceptorTest.php @@ -339,6 +339,10 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase $event = $this->server->requestCall(); $this->assertSame('/phony_method', $event->method); $server_call = $event->call; + $event = $server_call->startBatch([ + Grpc\OP_RECV_MESSAGE => true, + ]); + $this->assertSame('intercepted_unary_request', $event->message); $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ @@ -346,16 +350,18 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase 'code' => Grpc\STATUS_OK, 'details' => '', ], - Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_CLOSE_ON_SERVER => true, ]); - $this->assertSame('intercepted_unary_request', $event->message); $stream_call = $client->StreamCall(); $stream_call->write($req); $event = $this->server->requestCall(); $this->assertSame('/phony_method', $event->method); $server_call = $event->call; + $event = $server_call->startBatch([ + Grpc\OP_RECV_MESSAGE => true, + ]); + $this->assertSame('intercepted_stream_request', $event->message); $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_STATUS_FROM_SERVER => [ @@ -363,10 +369,8 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase 'code' => Grpc\STATUS_OK, 'details' => '', ], - Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_CLOSE_ON_SERVER => true, ]); - $this->assertSame('intercepted_stream_request', $event->message); unset($unary_call); unset($stream_call); diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index ca542d143dd..6264c5dd55e 100644 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -177,6 +177,12 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase $this->assertSame('phony_method', $event->method); $server_call = $event->call; + $event = $server_call->startBatch([ + Grpc\OP_RECV_MESSAGE => true, + ]); + + $this->assertSame($req_text, $event->message); + $event = $server_call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], Grpc\OP_SEND_MESSAGE => ['message' => $reply_text], @@ -185,7 +191,6 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase 'code' => Grpc\STATUS_OK, 'details' => $status_text, ], - Grpc\OP_RECV_MESSAGE => true, Grpc\OP_RECV_CLOSE_ON_SERVER => true, ]); @@ -193,7 +198,6 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase $this->assertTrue($event->send_status); $this->assertTrue($event->send_message); $this->assertFalse($event->cancelled); - $this->assertSame($req_text, $event->message); $event = $call->startBatch([ Grpc\OP_RECV_INITIAL_METADATA => true, diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 9021dc08d1e..58bc579373e 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -227,6 +227,15 @@ class ServerClientMixin(object): server_call_tag = object() server_call = request_event.call + server_start_batch_result = server_call.start_server_batch([ + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + ], server_call_tag) + self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) + + server_message_event = self.server_completion_queue.poll( + deadline=DEADLINE) + + server_call_tag = object() server_initial_metadata = (( SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE, @@ -238,7 +247,6 @@ class ServerClientMixin(object): server_start_batch_result = server_call.start_server_batch([ cygrpc.SendInitialMetadataOperation(server_initial_metadata, _EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( @@ -282,7 +290,21 @@ class ServerClientMixin(object): cygrpc.OperationType.receive_status_on_client ]), found_client_op_types) - self.assertEqual(5, len(server_event.batch_operations)) + self.assertEqual(1, len(server_message_event.batch_operations)) + found_server_op_types = set() + for server_result in server_message_event.batch_operations: + self.assertNotIn(server_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) + self.assertEqual(set([ + cygrpc.OperationType.receive_message, + ]), found_server_op_types) + + self.assertEqual(4, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: self.assertNotIn(server_result.type(), found_server_op_types) @@ -295,7 +317,6 @@ class ServerClientMixin(object): self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, - cygrpc.OperationType.receive_message, cygrpc.OperationType.send_message, cygrpc.OperationType.receive_close_on_server, cygrpc.OperationType.send_status_from_server diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb index 78b4a7f3052..c27262ec70c 100644 --- a/src/ruby/spec/client_server_spec.rb +++ b/src/ruby/spec/client_server_spec.rb @@ -85,12 +85,15 @@ shared_examples 'basic GRPC message delivery is OK' do # confirm the server can read the inbound message server_thread.join server_ops = { - CallOps::RECV_MESSAGE => nil, + CallOps::RECV_MESSAGE => nil + } + server_batch = server_call.run_batch(server_ops) + expect(server_batch.message).to eq(sent_message) + server_ops = { CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_STATUS_FROM_SERVER => ok_status } server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(sent_message) expect(server_batch.send_close).to be true expect(server_batch.send_status).to be true @@ -123,13 +126,16 @@ shared_examples 'basic GRPC message delivery is OK' do # confirm the server can read the inbound message server_thread.join server_ops = { - CallOps::RECV_MESSAGE => nil, + CallOps::RECV_MESSAGE => nil + } + server_batch = server_call.run_batch(server_ops) + expect(server_batch.message).to eq(sent_message) + server_ops = { CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_MESSAGE => reply_text, CallOps::SEND_STATUS_FROM_SERVER => ok_status } server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(sent_message) expect(server_batch.send_close).to be true expect(server_batch.send_message).to be true expect(server_batch.send_status).to be true @@ -168,13 +174,16 @@ shared_examples 'basic GRPC message delivery is OK' do # confirm the server can read the inbound message server_thread.join server_ops = { - CallOps::RECV_MESSAGE => nil, + CallOps::RECV_MESSAGE => nil + } + server_batch = server_call.run_batch(server_ops) + expect(server_batch.message).to eq(long_request_str) + server_ops = { CallOps::RECV_CLOSE_ON_SERVER => nil, CallOps::SEND_MESSAGE => long_response_str, CallOps::SEND_STATUS_FROM_SERVER => ok_status } server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq(long_request_str) expect(server_batch.send_close).to be true expect(server_batch.send_message).to be true expect(server_batch.send_status).to be true @@ -245,12 +254,15 @@ shared_examples 'basic GRPC message delivery is OK' do the_status = Struct::Status.new(StatusCodes::OK, 'OK', {}) server_thread.join server_ops = { - CallOps::RECV_MESSAGE => nil, + CallOps::RECV_MESSAGE => nil + } + server_batch = server_call.run_batch(server_ops) + expect(server_batch.message).to eq sent_message + server_ops = { CallOps::SEND_MESSAGE => reply_text, CallOps::SEND_STATUS_FROM_SERVER => the_status } server_batch = server_call.run_batch(server_ops) - expect(server_batch.message).to eq sent_message expect(server_batch.send_status).to be true expect(server_batch.send_message).to be true diff --git a/test/core/end2end/tests/max_message_length.cc b/test/core/end2end/tests/max_message_length.cc index e476ed55414..8425f73faf5 100644 --- a/test/core/end2end/tests/max_message_length.cc +++ b/test/core/end2end/tests/max_message_length.cc @@ -576,13 +576,19 @@ static void test_max_receive_message_length_on_compressed_request( memset(ops, 0, sizeof(ops)); op = ops; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &recv_payload; op->flags = 0; op->reserved = nullptr; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &recv_payload; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; op->flags = 0; op->reserved = nullptr; op++; @@ -602,11 +608,12 @@ static void test_max_receive_message_length_on_compressed_request( op->reserved = nullptr; op++; } - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(102), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); - cqv.Expect(tag(102), true); + cqv.Expect(tag(102), minimal_stack); + cqv.Expect(tag(103), true); cqv.Expect(tag(1), true); cqv.Verify(); diff --git a/test/core/end2end/tests/retry.cc b/test/core/end2end/tests/retry.cc index e1e6188b6be..e29974c0de0 100644 --- a/test/core/end2end/tests/retry.cc +++ b/test/core/end2end/tests/retry.cc @@ -274,6 +274,12 @@ static void test_retry(grpc_end2end_test_config config) { op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = response_payload; op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; op->data.send_status_from_server.trailing_metadata_count = 0; op->data.send_status_from_server.status = GRPC_STATUS_OK; @@ -282,11 +288,12 @@ static void test_retry(grpc_end2end_test_config config) { op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(203), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); cqv.Expect(tag(202), true); + cqv.Expect(tag(203), true); cqv.Expect(tag(1), true); cqv.Verify(); diff --git a/test/core/end2end/tests/retry_per_attempt_recv_timeout.cc b/test/core/end2end/tests/retry_per_attempt_recv_timeout.cc index 83977905f36..475a94148f4 100644 --- a/test/core/end2end/tests/retry_per_attempt_recv_timeout.cc +++ b/test/core/end2end/tests/retry_per_attempt_recv_timeout.cc @@ -291,15 +291,21 @@ static void test_retry_per_attempt_recv_timeout( } GPR_ASSERT(found_retry_header); + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message.recv_message = &request_payload_recv; + op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(302), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + // Server sends OK status. memset(ops, 0, sizeof(ops)); op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &request_payload_recv; - op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = response_payload; op++; @@ -311,11 +317,12 @@ static void test_retry_per_attempt_recv_timeout( op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(302), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(303), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); cqv.Expect(tag(302), true); + cqv.Expect(tag(303), true); cqv.Expect(tag(1), true); cqv.Verify(); diff --git a/test/core/end2end/tests/retry_send_initial_metadata_refs.cc b/test/core/end2end/tests/retry_send_initial_metadata_refs.cc index 62deb130807..cb6abe7bb75 100644 --- a/test/core/end2end/tests/retry_send_initial_metadata_refs.cc +++ b/test/core/end2end/tests/retry_send_initial_metadata_refs.cc @@ -301,12 +301,18 @@ static void test_retry_send_initial_metadata_refs( memset(ops, 0, sizeof(ops)); op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op++; op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message.recv_message = &request_payload_recv; op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = response_payload; op++; @@ -318,11 +324,12 @@ static void test_retry_send_initial_metadata_refs( op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(203), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); cqv.Expect(tag(202), true); + cqv.Expect(tag(203), true); cqv.Expect(tag(2), true); cqv.Verify(); diff --git a/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc b/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc index 98650f99998..e7a65f5b81e 100644 --- a/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc +++ b/test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc @@ -227,6 +227,12 @@ static void test_retry_transparent_max_concurrent_streams( op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message.recv_message = &request_payload_recv; op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; @@ -241,12 +247,13 @@ static void test_retry_transparent_max_concurrent_streams( op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = &status_details; op++; - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(103), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(104), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); // Server completes first call and shutdown. // Client completes first call. + cqv.Expect(tag(104), true); cqv.Expect(tag(103), true); cqv.Expect(tag(102), true); cqv.Expect(tag(1), true); @@ -304,6 +311,12 @@ static void test_retry_transparent_max_concurrent_streams( op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message.recv_message = &request_payload_recv; op++; + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + nullptr); + GPR_ASSERT(GRPC_CALL_OK == error); + + memset(ops, 0, sizeof(ops)); + op = ops; op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; op->data.recv_close_on_server.cancelled = &was_cancelled; op++; @@ -318,11 +331,12 @@ static void test_retry_transparent_max_concurrent_streams( op->data.send_status_from_server.status = GRPC_STATUS_OK; op->data.send_status_from_server.status_details = &status_details; op++; - error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(202), + error = grpc_call_start_batch(s, ops, static_cast(op - ops), tag(203), nullptr); GPR_ASSERT(GRPC_CALL_OK == error); // Second call completes. + cqv.Expect(tag(203), true); cqv.Expect(tag(202), true); cqv.Expect(tag(2), true); cqv.Verify(); diff --git a/test/core/fling/BUILD b/test/core/fling/BUILD deleted file mode 100644 index e4fe3a37303..00000000000 --- a/test/core/fling/BUILD +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright 2017 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_test", "grpc_package") - -grpc_package(name = "test/core/fling") - -licenses(["notice"]) - -grpc_cc_binary( - name = "fling_client", - testonly = 1, - srcs = ["client.cc"], - language = "C++", - deps = [ - "//:gpr", - "//:grpc", - "//test/core/end2end:ssl_test_data", - "//test/core/util:grpc_test_util", - "//test/core/util:grpc_test_util_base", - ], -) - -grpc_cc_binary( - name = "fling_server", - testonly = 1, - srcs = ["server.cc"], - language = "C++", - deps = [ - "//:gpr", - "//:grpc", - "//test/core/end2end:ssl_test_data", - "//test/core/util:grpc_test_util", - "//test/core/util:grpc_test_util_base", - ], -) - -grpc_cc_test( - name = "fling_test", - srcs = ["fling_test.cc"], - data = [ - ":fling_client", - ":fling_server", - ], - tags = [ - "no_windows", - "requires-net:ipv4", - "requires-net:loopback", - ], - deps = [ - "//:gpr", - "//:grpc", - "//test/core/end2end:ssl_test_data", - "//test/core/util:grpc_test_util", - "//test/core/util:grpc_test_util_base", - ], -) - -grpc_cc_test( - name = "fling_stream_test", - srcs = ["fling_stream_test.cc"], - data = [ - ":fling_client", - ":fling_server", - ], - tags = [ - "no_windows", - "requires-net:ipv4", - "requires-net:loopback", - ], - deps = [ - "//:gpr", - "//:grpc", - "//test/core/end2end:ssl_test_data", - "//test/core/util:grpc_test_util", - "//test/core/util:grpc_test_util_base", - ], -) diff --git a/test/core/fling/client.cc b/test/core/fling/client.cc deleted file mode 100644 index 36e0e5f71c3..00000000000 --- a/test/core/fling/client.cc +++ /dev/null @@ -1,250 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "src/core/lib/gpr/useful.h" -#include "test/core/util/cmdline.h" -#include "test/core/util/grpc_profiler.h" -#include "test/core/util/histogram.h" -#include "test/core/util/test_config.h" - -static grpc_histogram* histogram; -static grpc_byte_buffer* the_buffer; -static grpc_channel* channel; -static grpc_completion_queue* cq; -static grpc_call* call; -static grpc_op ops[6]; -static grpc_op stream_init_ops[2]; -static grpc_op stream_step_ops[2]; -static grpc_metadata_array initial_metadata_recv; -static grpc_metadata_array trailing_metadata_recv; -static grpc_byte_buffer* response_payload_recv = nullptr; -static grpc_status_code status; -static grpc_slice details; -static grpc_op* op; - -static void init_ping_pong_request(void) { - grpc_metadata_array_init(&initial_metadata_recv); - grpc_metadata_array_init(&trailing_metadata_recv); - - memset(ops, 0, sizeof(ops)); - op = ops; - - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op++; - op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message.send_message = the_buffer; - op++; - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &response_payload_recv; - op++; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; - op->data.recv_status_on_client.status = &status; - op->data.recv_status_on_client.status_details = &details; - op++; -} - -static void step_ping_pong_request(void) { - grpc_slice host = grpc_slice_from_static_string("localhost"); - call = grpc_channel_create_call( - channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, - grpc_slice_from_static_string("/Reflector/reflectUnary"), &host, - gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, - (size_t)(op - ops), (void*)1, - nullptr)); - grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - grpc_call_unref(call); - grpc_byte_buffer_destroy(response_payload_recv); - call = nullptr; -} - -static void init_ping_pong_stream(void) { - grpc_metadata_array_init(&initial_metadata_recv); - - grpc_call_error error; - grpc_slice host = grpc_slice_from_static_string("localhost"); - call = grpc_channel_create_call( - channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq, - grpc_slice_from_static_string("/Reflector/reflectStream"), &host, - gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; - stream_init_ops[0].data.send_initial_metadata.count = 0; - stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; - stream_init_ops[1].data.recv_initial_metadata.recv_initial_metadata = - &initial_metadata_recv; - error = grpc_call_start_batch(call, stream_init_ops, 2, - reinterpret_cast(1), nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); - grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - - grpc_metadata_array_init(&initial_metadata_recv); - - stream_step_ops[0].op = GRPC_OP_SEND_MESSAGE; - stream_step_ops[0].data.send_message.send_message = the_buffer; - stream_step_ops[1].op = GRPC_OP_RECV_MESSAGE; - stream_step_ops[1].data.recv_message.recv_message = &response_payload_recv; -} - -static void step_ping_pong_stream(void) { - grpc_call_error error; - error = grpc_call_start_batch(call, stream_step_ops, 2, - reinterpret_cast(1), nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); - grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); - grpc_byte_buffer_destroy(response_payload_recv); -} - -static double now(void) { - gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME); - return 1e9 * static_cast(tv.tv_sec) + tv.tv_nsec; -} - -typedef struct { - const char* name; - void (*init)(); - void (*do_one_step)(); -} scenario; - -static const scenario scenarios[] = { - {"ping-pong-request", init_ping_pong_request, step_ping_pong_request}, - {"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream}, -}; - -int main(int argc, char** argv) { - grpc_slice slice = grpc_slice_from_copied_string("x"); - double start, stop; - unsigned i; - - int fake_argc = 1; - char* fake_argv[1]; - - int payload_size = 1; - int secure = 0; - const char* target = "localhost:443"; - gpr_cmdline* cl; - grpc_event event; - const char* scenario_name = "ping-pong-request"; - scenario sc = {nullptr, nullptr, nullptr}; - - GPR_ASSERT(argc >= 1); - fake_argv[0] = argv[0]; - grpc::testing::TestEnvironment env(&fake_argc, fake_argv); - - grpc_init(); - - int warmup_seconds = 1; - int benchmark_seconds = 5; - - cl = gpr_cmdline_create("fling client"); - gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send", - &payload_size); - gpr_cmdline_add_string(cl, "target", "Target host:port", &target); - gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure); - gpr_cmdline_add_string(cl, "scenario", "Scenario", &scenario_name); - gpr_cmdline_add_int(cl, "warmup", "Warmup seconds", &warmup_seconds); - gpr_cmdline_add_int(cl, "benchmark", "Benchmark seconds", &benchmark_seconds); - gpr_cmdline_parse(cl, argc, argv); - gpr_cmdline_destroy(cl); - - for (i = 0; i < GPR_ARRAY_SIZE(scenarios); i++) { - if (0 == strcmp(scenarios[i].name, scenario_name)) { - sc = scenarios[i]; - } - } - if (!sc.name) { - fprintf(stderr, "unsupported scenario '%s'. Valid are:", scenario_name); - fflush(stderr); - for (i = 0; i < GPR_ARRAY_SIZE(scenarios); i++) { - fprintf(stderr, " %s", scenarios[i].name); - fflush(stderr); - } - return 1; - } - - grpc_channel_credentials* creds = grpc_insecure_credentials_create(); - channel = grpc_channel_create(target, creds, nullptr); - grpc_channel_credentials_release(creds); - cq = grpc_completion_queue_create_for_next(nullptr); - the_buffer = - grpc_raw_byte_buffer_create(&slice, static_cast(payload_size)); - histogram = grpc_histogram_create(0.01, 60e9); - - sc.init(); - - gpr_timespec end_warmup = grpc_timeout_seconds_to_deadline(warmup_seconds); - gpr_timespec end_profiling = - grpc_timeout_seconds_to_deadline(warmup_seconds + benchmark_seconds); - - while (gpr_time_cmp(gpr_now(end_warmup.clock_type), end_warmup) < 0) { - sc.do_one_step(); - } - - gpr_log(GPR_INFO, "start profiling"); - grpc_profiler_start("client.prof"); - while (gpr_time_cmp(gpr_now(end_profiling.clock_type), end_profiling) < 0) { - start = now(); - sc.do_one_step(); - stop = now(); - grpc_histogram_add(histogram, stop - start); - } - grpc_profiler_stop(); - - if (call) { - grpc_call_unref(call); - } - - grpc_channel_destroy(channel); - grpc_completion_queue_shutdown(cq); - do { - event = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), - nullptr); - } while (event.type != GRPC_QUEUE_SHUTDOWN); - grpc_completion_queue_destroy(cq); - grpc_byte_buffer_destroy(the_buffer); - grpc_slice_unref(slice); - - gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f", - grpc_histogram_percentile(histogram, 50), - grpc_histogram_percentile(histogram, 95), - grpc_histogram_percentile(histogram, 99), - grpc_histogram_percentile(histogram, 99.9)); - grpc_histogram_destroy(histogram); - - grpc_shutdown(); - - return 0; -} diff --git a/test/core/fling/fling_stream_test.cc b/test/core/fling/fling_stream_test.cc deleted file mode 100644 index 80b1ebda284..00000000000 --- a/test/core/fling/fling_stream_test.cc +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -#include - -#include "absl/strings/str_cat.h" - -#include "src/core/lib/gprpp/host_port.h" -#include "test/core/util/port.h" -#include "test/core/util/subprocess.h" - -int main(int /*argc*/, char** argv) { - char* me = argv[0]; - char* lslash = strrchr(me, '/'); - char root[1024]; - int port = grpc_pick_unused_port_or_die(); - char* args[10]; - int status; - gpr_subprocess *svr, *cli; - /* figure out where we are */ - if (lslash) { - memcpy(root, me, static_cast(lslash - me)); - root[lslash - me] = 0; - } else { - strcpy(root, "."); - } - /* start the server */ - std::string command = - absl::StrCat(root, "/fling_server", gpr_subprocess_binary_extension()); - args[0] = const_cast(command.c_str()); - args[1] = const_cast("--bind"); - std::string joined = grpc_core::JoinHostPort("::", port); - args[2] = const_cast(joined.c_str()); - args[3] = const_cast("--no-secure"); - svr = gpr_subprocess_create(4, const_cast(args)); - - /* start the client */ - command = - absl::StrCat(root, "/fling_client", gpr_subprocess_binary_extension()); - args[0] = const_cast(command.c_str()); - args[1] = const_cast("--target"); - joined = grpc_core::JoinHostPort("127.0.0.1", port); - args[2] = const_cast(joined.c_str()); - args[3] = const_cast("--scenario=ping-pong-stream"); - args[4] = const_cast("--no-secure"); - args[5] = nullptr; - cli = gpr_subprocess_create(6, const_cast(args)); - - /* wait for completion */ - printf("waiting for client\n"); - if ((status = gpr_subprocess_join(cli))) { - gpr_subprocess_destroy(cli); - gpr_subprocess_destroy(svr); - return status; - } - gpr_subprocess_destroy(cli); - - gpr_subprocess_interrupt(svr); - status = gpr_subprocess_join(svr); - gpr_subprocess_destroy(svr); - return status; -} diff --git a/test/core/fling/fling_test.cc b/test/core/fling/fling_test.cc deleted file mode 100644 index a96757e1cdc..00000000000 --- a/test/core/fling/fling_test.cc +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -#include - -#include "absl/strings/str_cat.h" - -#include - -#include "src/core/lib/gprpp/host_port.h" -#include "test/core/util/port.h" -#include "test/core/util/subprocess.h" - -int main(int /*argc*/, const char** argv) { - gpr_log_verbosity_init(); - const char* me = argv[0]; - const char* lslash = strrchr(me, '/'); - char root[1024]; - int port = grpc_pick_unused_port_or_die(); - char* args[10]; - int status; - gpr_subprocess *svr, *cli; - /* figure out where we are */ - if (lslash) { - memcpy(root, me, static_cast(lslash - me)); - root[lslash - me] = 0; - } else { - strcpy(root, "."); - } - /* start the server */ - std::string command = - absl::StrCat(root, "/fling_server", gpr_subprocess_binary_extension()); - args[0] = const_cast(command.c_str()); - args[1] = const_cast("--bind"); - std::string joined = grpc_core::JoinHostPort("::", port); - args[2] = const_cast(joined.c_str()); - args[3] = const_cast("--no-secure"); - svr = gpr_subprocess_create(4, const_cast(args)); - - /* start the client */ - command = - absl::StrCat(root, "/fling_client", gpr_subprocess_binary_extension()); - args[0] = const_cast(command.c_str()); - args[1] = const_cast("--target"); - joined = grpc_core::JoinHostPort("127.0.0.1", port); - args[2] = const_cast(joined.c_str()); - args[3] = const_cast("--scenario=ping-pong-request"); - args[4] = const_cast("--no-secure"); - args[5] = nullptr; - cli = gpr_subprocess_create(6, const_cast(args)); - - /* wait for completion */ - printf("waiting for client\n"); - if ((status = gpr_subprocess_join(cli))) { - gpr_subprocess_destroy(cli); - gpr_subprocess_destroy(svr); - return status; - } - gpr_subprocess_destroy(cli); - - gpr_subprocess_interrupt(svr); - status = gpr_subprocess_join(svr); - gpr_subprocess_destroy(svr); - return status; -} diff --git a/test/core/fling/server.cc b/test/core/fling/server.cc deleted file mode 100644 index 794d9b2baac..00000000000 --- a/test/core/fling/server.cc +++ /dev/null @@ -1,329 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#ifndef _WIN32 -/* This is for _exit() below, which is temporary. */ -#include -#endif - -#include - -#include -#include - -#include "src/core/lib/gprpp/host_port.h" -#include "test/core/end2end/data/ssl_test_data.h" -#include "test/core/util/cmdline.h" -#include "test/core/util/grpc_profiler.h" -#include "test/core/util/port.h" -#include "test/core/util/test_config.h" - -static grpc_completion_queue* cq; -static grpc_server* server; -static grpc_call* call; -static grpc_call_details call_details; -static grpc_metadata_array request_metadata_recv; -static grpc_metadata_array initial_metadata_send; -static grpc_byte_buffer* payload_buffer = nullptr; -/* Used to drain the terminal read in unary calls. */ -static grpc_byte_buffer* terminal_buffer = nullptr; - -static grpc_op read_op; -static grpc_op metadata_send_op; -static grpc_op write_op; -static grpc_op status_op[2]; -static int was_cancelled = 2; -static grpc_op unary_ops[6]; -static int got_sigint = 0; - -static void* tag(intptr_t t) { return reinterpret_cast(t); } - -typedef enum { - FLING_SERVER_NEW_REQUEST = 1, - FLING_SERVER_READ_FOR_UNARY, - FLING_SERVER_BATCH_OPS_FOR_UNARY, - FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING, - FLING_SERVER_READ_FOR_STREAMING, - FLING_SERVER_WRITE_FOR_STREAMING, - FLING_SERVER_SEND_STATUS_FOR_STREAMING -} fling_server_tags; - -typedef struct { - gpr_refcount pending_ops; - uint32_t flags; -} call_state; - -static void request_call(void) { - grpc_metadata_array_init(&request_metadata_recv); - GPR_ASSERT(GRPC_CALL_OK == - grpc_server_request_call(server, &call, &call_details, - &request_metadata_recv, cq, cq, - tag(FLING_SERVER_NEW_REQUEST))); -} - -static void handle_unary_method(void) { - grpc_op* op; - grpc_call_error error; - - grpc_metadata_array_init(&initial_metadata_send); - - op = unary_ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &terminal_buffer; - op++; - op->op = GRPC_OP_SEND_MESSAGE; - if (payload_buffer == nullptr) { - gpr_log(GPR_INFO, "NULL payload buffer !!!"); - } - op->data.send_message.send_message = payload_buffer; - op++; - op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; - op->data.send_status_from_server.status = GRPC_STATUS_OK; - op->data.send_status_from_server.trailing_metadata_count = 0; - op->data.send_status_from_server.status_details = nullptr; - op++; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled; - op++; - - error = grpc_call_start_batch(call, unary_ops, - static_cast(op - unary_ops), - tag(FLING_SERVER_BATCH_OPS_FOR_UNARY), nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); -} - -static void send_initial_metadata(void) { - grpc_call_error error; - void* tagarg = tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING); - grpc_metadata_array_init(&initial_metadata_send); - metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; - metadata_send_op.data.send_initial_metadata.count = 0; - error = grpc_call_start_batch(call, &metadata_send_op, 1, tagarg, nullptr); - - GPR_ASSERT(GRPC_CALL_OK == error); -} - -static void start_read_op(int t) { - grpc_call_error error; - /* Starting read at server */ - read_op.op = GRPC_OP_RECV_MESSAGE; - read_op.data.recv_message.recv_message = &payload_buffer; - error = grpc_call_start_batch(call, &read_op, 1, tag(t), nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); -} - -static void start_write_op(void) { - grpc_call_error error; - void* tagarg = tag(FLING_SERVER_WRITE_FOR_STREAMING); - /* Starting write at server */ - write_op.op = GRPC_OP_SEND_MESSAGE; - if (payload_buffer == nullptr) { - gpr_log(GPR_INFO, "NULL payload buffer !!!"); - } - write_op.data.send_message.send_message = payload_buffer; - error = grpc_call_start_batch(call, &write_op, 1, tagarg, nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); -} - -static void start_send_status(void) { - grpc_call_error error; - void* tagarg = tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING); - status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; - status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK; - status_op[0].data.send_status_from_server.trailing_metadata_count = 0; - status_op[0].data.send_status_from_server.status_details = nullptr; - status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - status_op[1].data.recv_close_on_server.cancelled = &was_cancelled; - - error = grpc_call_start_batch(call, status_op, 2, tagarg, nullptr); - GPR_ASSERT(GRPC_CALL_OK == error); -} - -/* We have some sort of deadlock, so let's not exit gracefully for now. - When that is resolved, please remove the #include above. */ -static void sigint_handler(int /*x*/) { _exit(0); } - -int main(int argc, char** argv) { - grpc_event ev; - call_state* s; - std::string addr_buf; - gpr_cmdline* cl; - grpc_completion_queue* shutdown_cq; - int shutdown_started = 0; - int shutdown_finished = 0; - - int secure = 0; - const char* addr = nullptr; - - char* fake_argv[1]; - - GPR_ASSERT(argc >= 1); - argc = 1; - fake_argv[0] = argv[0]; - grpc_test_init(&argc, fake_argv); - - grpc_init(); - srand(static_cast(clock())); - - cl = gpr_cmdline_create("fling server"); - gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr); - gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure); - gpr_cmdline_parse(cl, argc, argv); - gpr_cmdline_destroy(cl); - - if (addr == nullptr) { - addr_buf = grpc_core::JoinHostPort("::", grpc_pick_unused_port_or_die()); - addr = addr_buf.c_str(); - } - gpr_log(GPR_INFO, "creating server on: %s", addr); - - cq = grpc_completion_queue_create_for_next(nullptr); - grpc_server_credentials* creds; - if (secure) { - grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key, - test_server1_cert}; - creds = grpc_ssl_server_credentials_create(nullptr, &pem_key_cert_pair, 1, - 0, nullptr); - } else { - creds = grpc_insecure_server_credentials_create(); - } - server = grpc_server_create(nullptr, nullptr); - GPR_ASSERT(grpc_server_add_http2_port(server, addr, creds)); - grpc_server_credentials_release(creds); - grpc_server_register_completion_queue(server, cq, nullptr); - grpc_server_start(server); - - addr = nullptr; - addr_buf.clear(); - - grpc_call_details_init(&call_details); - - request_call(); - - grpc_profiler_start("server.prof"); - signal(SIGINT, sigint_handler); - while (!shutdown_finished) { - if (got_sigint && !shutdown_started) { - gpr_log(GPR_INFO, "Shutting down due to SIGINT"); - - shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr); - grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000)); - - GPR_ASSERT(grpc_completion_queue_pluck( - shutdown_cq, tag(1000), - grpc_timeout_seconds_to_deadline(5), nullptr) - .type == GRPC_OP_COMPLETE); - grpc_completion_queue_destroy(shutdown_cq); - - grpc_completion_queue_shutdown(cq); - shutdown_started = 1; - } - ev = grpc_completion_queue_next( - cq, - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000000, GPR_TIMESPAN)), - nullptr); - s = static_cast(ev.tag); - switch (ev.type) { - case GRPC_OP_COMPLETE: - switch (reinterpret_cast(s)) { - case FLING_SERVER_NEW_REQUEST: - if (call != nullptr) { - if (0 == grpc_slice_str_cmp(call_details.method, - "/Reflector/reflectStream")) { - /* Received streaming call. Send metadata here. */ - start_read_op(FLING_SERVER_READ_FOR_STREAMING); - send_initial_metadata(); - } else { - /* Received unary call. Can do all ops in one batch. */ - start_read_op(FLING_SERVER_READ_FOR_UNARY); - } - } else { - GPR_ASSERT(shutdown_started); - } - /* request_call(); - */ - break; - case FLING_SERVER_READ_FOR_STREAMING: - if (payload_buffer != nullptr) { - /* Received payload from client. */ - start_write_op(); - } else { - /* Received end of stream from client. */ - start_send_status(); - } - break; - case FLING_SERVER_WRITE_FOR_STREAMING: - /* Write completed at server */ - grpc_byte_buffer_destroy(payload_buffer); - payload_buffer = nullptr; - start_read_op(FLING_SERVER_READ_FOR_STREAMING); - break; - case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING: - /* Metadata send completed at server */ - break; - case FLING_SERVER_SEND_STATUS_FOR_STREAMING: - /* Send status and close completed at server */ - grpc_call_unref(call); - if (!shutdown_started) request_call(); - break; - case FLING_SERVER_READ_FOR_UNARY: - /* Finished payload read for unary. Start all reamaining - * unary ops in a batch. - */ - handle_unary_method(); - break; - case FLING_SERVER_BATCH_OPS_FOR_UNARY: - /* Finished unary call. */ - grpc_byte_buffer_destroy(payload_buffer); - payload_buffer = nullptr; - grpc_call_unref(call); - if (!shutdown_started) request_call(); - break; - } - break; - case GRPC_QUEUE_SHUTDOWN: - GPR_ASSERT(shutdown_started); - shutdown_finished = 1; - break; - case GRPC_QUEUE_TIMEOUT: - break; - } - } - grpc_profiler_stop(); - grpc_call_details_destroy(&call_details); - - grpc_server_destroy(server); - grpc_completion_queue_destroy(cq); - grpc_shutdown(); - return 0; -} diff --git a/test/core/memory_usage/server.cc b/test/core/memory_usage/server.cc index 4b4b28309e1..92b61360bac 100644 --- a/test/core/memory_usage/server.cc +++ b/test/core/memory_usage/server.cc @@ -59,7 +59,6 @@ static grpc_op snapshot_ops[5]; static grpc_op status_op; static int got_sigint = 0; static grpc_byte_buffer* payload_buffer = nullptr; -static grpc_byte_buffer* terminal_buffer = nullptr; static int was_cancelled = 2; static void* tag(intptr_t t) { return reinterpret_cast(t); } @@ -131,9 +130,6 @@ static void send_snapshot(void* tag, MemStats* snapshot) { op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op++; - op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message.recv_message = &terminal_buffer; - op++; op->op = GRPC_OP_SEND_MESSAGE; if (payload_buffer == nullptr) { gpr_log(GPR_INFO, "NULL payload buffer !!!"); @@ -306,12 +302,10 @@ int main(int argc, char** argv) { // FLING_SERVER_SEND_STATUS_SNAPSHOT to destroy the snapshot call case FLING_SERVER_SEND_STATUS_SNAPSHOT: grpc_byte_buffer_destroy(payload_buffer); - grpc_byte_buffer_destroy(terminal_buffer); grpc_call_unref(s->call); grpc_call_details_destroy(&s->call_details); grpc_metadata_array_destroy(&s->initial_metadata_send); grpc_metadata_array_destroy(&s->request_metadata_recv); - terminal_buffer = nullptr; payload_buffer = nullptr; break; } diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 24876625b32..08d750fbc5f 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -137,50 +137,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "fling_stream_test", - "platforms": [ - "linux", - "mac", - "posix" - ], - "uses_polling": true - }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "fling_test", - "platforms": [ - "linux", - "mac", - "posix" - ], - "uses_polling": true - }, { "args": [], "benchmark": false,