[surface] Ensure SEND_STATUS & RECV_MESSAGE do not inhabit the same batch (#31554)

Add a check that SEND_STATUS_FROM_SERVER and RECV_MESSAGE are not in the same batch.

This is necessary pre-work for #31204 and implements part of grpc/proposal#336.

Also eliminates fling instead of updating it:

My expectation is nobody has looked at this corner in many years
It's not a benchmark we want: concentrating on a microbenchmark that doesn't include a binding layer caused us to favor designs that emphasized a lightweight core at the expense of a expensive bindings. We should consider the whole.

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/31630/head
Craig Tiller 2 years ago committed by GitHub
parent 88a6b304fb
commit d410f1d0aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 98
      CMakeLists.txt
  2. 84
      build_autogenerated.yaml
  3. 9
      src/core/lib/surface/call.cc
  4. 6
      src/php/tests/unit_tests/CallInvokerTest.php
  5. 9
      src/php/tests/unit_tests/EndToEndTest.php
  6. 12
      src/php/tests/unit_tests/InterceptorTest.php
  7. 8
      src/php/tests/unit_tests/SecureEndToEndTest.php
  8. 27
      src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
  9. 28
      src/ruby/spec/client_server_spec.rb
  10. 19
      test/core/end2end/tests/max_message_length.cc
  11. 9
      test/core/end2end/tests/retry.cc
  12. 15
      test/core/end2end/tests/retry_per_attempt_recv_timeout.cc
  13. 15
      test/core/end2end/tests/retry_send_initial_metadata_refs.cc
  14. 18
      test/core/end2end/tests/retry_transparent_max_concurrent_streams.cc
  15. 89
      test/core/fling/BUILD
  16. 250
      test/core/fling/client.cc
  17. 80
      test/core/fling/fling_stream_test.cc
  18. 83
      test/core/fling/fling_test.cc
  19. 329
      test/core/fling/server.cc
  20. 6
      test/core/memory_usage/server.cc
  21. 44
      tools/run_tests/generated/tests.json

98
CMakeLists.txt generated

@ -771,12 +771,6 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c fd_conservation_posix_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c fling_stream_test)
endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_c fling_test)
endif()
add_dependencies(buildtests_c goaway_server_test)
add_dependencies(buildtests_c inproc_callback_test)
add_dependencies(buildtests_c invalid_call_argument_test)
@ -4624,98 +4618,6 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(fling_stream_test
test/core/end2end/data/client_certs.cc
test/core/end2end/data/server1_cert.cc
test/core/end2end/data/server1_key.cc
test/core/end2end/data/test_root_cert.cc
test/core/fling/fling_stream_test.cc
test/core/util/cmdline.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
test/core/util/histogram.cc
test/core/util/mock_endpoint.cc
test/core/util/parse_hexstring.cc
test/core/util/passthru_endpoint.cc
test/core/util/resolve_localhost_ip46.cc
test/core/util/slice_splitter.cc
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/tracer_util.cc
)
target_include_directories(fling_stream_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(fling_stream_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_executable(fling_test
test/core/end2end/data/client_certs.cc
test/core/end2end/data/server1_cert.cc
test/core/end2end/data/server1_key.cc
test/core/end2end/data/test_root_cert.cc
test/core/fling/fling_test.cc
test/core/util/cmdline.cc
test/core/util/fuzzer_util.cc
test/core/util/grpc_profiler.cc
test/core/util/histogram.cc
test/core/util/mock_endpoint.cc
test/core/util/parse_hexstring.cc
test/core/util/passthru_endpoint.cc
test/core/util/resolve_localhost_ip46.cc
test/core/util/slice_splitter.cc
test/core/util/subprocess_posix.cc
test/core/util/subprocess_windows.cc
test/core/util/tracer_util.cc
)
target_include_directories(fling_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
)
target_link_libraries(fling_test
${_gRPC_ZLIB_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
)
endif()
endif()
if(gRPC_BUILD_TESTS)

@ -3541,90 +3541,6 @@ targets:
- linux
- posix
- mac
- name: fling_stream_test
build: test
language: c
headers:
- test/core/end2end/data/ssl_test_data.h
- test/core/util/cmdline.h
- test/core/util/evaluate_args_test_util.h
- test/core/util/fuzzer_util.h
- test/core/util/grpc_profiler.h
- test/core/util/histogram.h
- test/core/util/mock_authorization_endpoint.h
- test/core/util/mock_endpoint.h
- test/core/util/parse_hexstring.h
- test/core/util/passthru_endpoint.h
- test/core/util/resolve_localhost_ip46.h
- test/core/util/slice_splitter.h
- test/core/util/subprocess.h
- test/core/util/tracer_util.h
src:
- test/core/end2end/data/client_certs.cc
- test/core/end2end/data/server1_cert.cc
- test/core/end2end/data/server1_key.cc
- test/core/end2end/data/test_root_cert.cc
- test/core/fling/fling_stream_test.cc
- test/core/util/cmdline.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc
- test/core/util/histogram.cc
- test/core/util/mock_endpoint.cc
- test/core/util/parse_hexstring.cc
- test/core/util/passthru_endpoint.cc
- test/core/util/resolve_localhost_ip46.cc
- test/core/util/slice_splitter.cc
- test/core/util/subprocess_posix.cc
- test/core/util/subprocess_windows.cc
- test/core/util/tracer_util.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: fling_test
build: test
language: c
headers:
- test/core/end2end/data/ssl_test_data.h
- test/core/util/cmdline.h
- test/core/util/evaluate_args_test_util.h
- test/core/util/fuzzer_util.h
- test/core/util/grpc_profiler.h
- test/core/util/histogram.h
- test/core/util/mock_authorization_endpoint.h
- test/core/util/mock_endpoint.h
- test/core/util/parse_hexstring.h
- test/core/util/passthru_endpoint.h
- test/core/util/resolve_localhost_ip46.h
- test/core/util/slice_splitter.h
- test/core/util/subprocess.h
- test/core/util/tracer_util.h
src:
- test/core/end2end/data/client_certs.cc
- test/core/end2end/data/server1_cert.cc
- test/core/end2end/data/server1_key.cc
- test/core/end2end/data/test_root_cert.cc
- test/core/fling/fling_test.cc
- test/core/util/cmdline.cc
- test/core/util/fuzzer_util.cc
- test/core/util/grpc_profiler.cc
- test/core/util/histogram.cc
- test/core/util/mock_endpoint.cc
- test/core/util/parse_hexstring.cc
- test/core/util/passthru_endpoint.cc
- test/core/util/resolve_localhost_ip46.cc
- test/core/util/slice_splitter.cc
- test/core/util/subprocess_posix.cc
- test/core/util/subprocess_windows.cc
- test/core/util/tracer_util.cc
deps:
- grpc_test_util
platforms:
- linux
- posix
- mac
- name: goaway_server_test
build: test
language: c

@ -1362,6 +1362,15 @@ grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
seen_ops |= (1u << ops[i].op);
}
if (!is_client() &&
(seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 &&
(seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) {
gpr_log(GPR_ERROR,
"******************* SEND_STATUS WITH RECV_MESSAGE "
"*******************");
return GRPC_CALL_ERROR;
}
GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
if (nops == 0) {

@ -214,6 +214,10 @@ class CallInvokerTest extends \PHPUnit\Framework\TestCase
$event = $this->server->requestCall();
$this->assertSame('/phony_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_RECV_MESSAGE => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
@ -221,10 +225,8 @@ class CallInvokerTest extends \PHPUnit\Framework\TestCase
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$call_invoker->getChannel()->close();
unset($unary_call);
unset($server_call);

@ -158,14 +158,18 @@ class EndToEndTest extends \PHPUnit\Framework\TestCase
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_RECV_MESSAGE => true,
]);
$this->assertSame($req_text, $event->message);
$event = $server_call->startBatch([
Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
'metadata' => [],
'code' => Grpc\STATUS_OK,
'details' => $status_text,
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
@ -173,7 +177,6 @@ class EndToEndTest extends \PHPUnit\Framework\TestCase
$this->assertTrue($event->send_status);
$this->assertTrue($event->send_message);
$this->assertFalse($event->cancelled);
$this->assertSame($req_text, $event->message);
$event = $call->startBatch([
Grpc\OP_RECV_INITIAL_METADATA => true,

@ -339,6 +339,10 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase
$event = $this->server->requestCall();
$this->assertSame('/phony_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_RECV_MESSAGE => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
@ -346,16 +350,18 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_unary_request', $event->message);
$stream_call = $client->StreamCall();
$stream_call->write($req);
$event = $this->server->requestCall();
$this->assertSame('/phony_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_RECV_MESSAGE => true,
]);
$this->assertSame('intercepted_stream_request', $event->message);
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_STATUS_FROM_SERVER => [
@ -363,10 +369,8 @@ class InterceptorTest extends \PHPUnit\Framework\TestCase
'code' => Grpc\STATUS_OK,
'details' => '',
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
$this->assertSame('intercepted_stream_request', $event->message);
unset($unary_call);
unset($stream_call);

@ -177,6 +177,12 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase
$this->assertSame('phony_method', $event->method);
$server_call = $event->call;
$event = $server_call->startBatch([
Grpc\OP_RECV_MESSAGE => true,
]);
$this->assertSame($req_text, $event->message);
$event = $server_call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
Grpc\OP_SEND_MESSAGE => ['message' => $reply_text],
@ -185,7 +191,6 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase
'code' => Grpc\STATUS_OK,
'details' => $status_text,
],
Grpc\OP_RECV_MESSAGE => true,
Grpc\OP_RECV_CLOSE_ON_SERVER => true,
]);
@ -193,7 +198,6 @@ class SecureEndToEndTest extends \PHPUnit\Framework\TestCase
$this->assertTrue($event->send_status);
$this->assertTrue($event->send_message);
$this->assertFalse($event->cancelled);
$this->assertSame($req_text, $event->message);
$event = $call->startBatch([
Grpc\OP_RECV_INITIAL_METADATA => true,

@ -227,6 +227,15 @@ class ServerClientMixin(object):
server_call_tag = object()
server_call = request_event.call
server_start_batch_result = server_call.start_server_batch([
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
server_message_event = self.server_completion_queue.poll(
deadline=DEADLINE)
server_call_tag = object()
server_initial_metadata = ((
SERVER_INITIAL_METADATA_KEY,
SERVER_INITIAL_METADATA_VALUE,
@ -238,7 +247,6 @@ class ServerClientMixin(object):
server_start_batch_result = server_call.start_server_batch([
cygrpc.SendInitialMetadataOperation(server_initial_metadata,
_EMPTY_FLAGS),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
cygrpc.SendStatusFromServerOperation(
@ -282,7 +290,21 @@ class ServerClientMixin(object):
cygrpc.OperationType.receive_status_on_client
]), found_client_op_types)
self.assertEqual(5, len(server_event.batch_operations))
self.assertEqual(1, len(server_message_event.batch_operations))
found_server_op_types = set()
for server_result in server_message_event.batch_operations:
self.assertNotIn(server_result.type(), found_server_op_types)
found_server_op_types.add(server_result.type())
if server_result.type() == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.message())
elif server_result.type(
) == cygrpc.OperationType.receive_close_on_server:
self.assertFalse(server_result.cancelled())
self.assertEqual(set([
cygrpc.OperationType.receive_message,
]), found_server_op_types)
self.assertEqual(4, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
self.assertNotIn(server_result.type(), found_server_op_types)
@ -295,7 +317,6 @@ class ServerClientMixin(object):
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.receive_message,
cygrpc.OperationType.send_message,
cygrpc.OperationType.receive_close_on_server,
cygrpc.OperationType.send_status_from_server

@ -85,12 +85,15 @@ shared_examples 'basic GRPC message delivery is OK' do
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
expect(server_batch.send_close).to be true
expect(server_batch.send_status).to be true
@ -123,13 +126,16 @@ shared_examples 'basic GRPC message delivery is OK' do
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_MESSAGE => reply_text,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(sent_message)
expect(server_batch.send_close).to be true
expect(server_batch.send_message).to be true
expect(server_batch.send_status).to be true
@ -168,13 +174,16 @@ shared_examples 'basic GRPC message delivery is OK' do
# confirm the server can read the inbound message
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(long_request_str)
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil,
CallOps::SEND_MESSAGE => long_response_str,
CallOps::SEND_STATUS_FROM_SERVER => ok_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq(long_request_str)
expect(server_batch.send_close).to be true
expect(server_batch.send_message).to be true
expect(server_batch.send_status).to be true
@ -245,12 +254,15 @@ shared_examples 'basic GRPC message delivery is OK' do
the_status = Struct::Status.new(StatusCodes::OK, 'OK', {})
server_thread.join
server_ops = {
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_MESSAGE => nil
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq sent_message
server_ops = {
CallOps::SEND_MESSAGE => reply_text,
CallOps::SEND_STATUS_FROM_SERVER => the_status
}
server_batch = server_call.run_batch(server_ops)
expect(server_batch.message).to eq sent_message
expect(server_batch.send_status).to be true
expect(server_batch.send_message).to be true

@ -576,13 +576,19 @@ static void test_max_receive_message_length_on_compressed_request(
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_payload;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &recv_payload;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
@ -602,11 +608,12 @@ static void test_max_receive_message_length_on_compressed_request(
op->reserved = nullptr;
op++;
}
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(102), true);
cqv.Expect(tag(102), minimal_stack);
cqv.Expect(tag(103), true);
cqv.Expect(tag(1), true);
cqv.Verify();

@ -274,6 +274,12 @@ static void test_retry(grpc_end2end_test_config config) {
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
@ -282,11 +288,12 @@ static void test_retry(grpc_end2end_test_config config) {
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(203),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(202), true);
cqv.Expect(tag(203), true);
cqv.Expect(tag(1), true);
cqv.Verify();

@ -291,15 +291,21 @@ static void test_retry_per_attempt_recv_timeout(
}
GPR_ASSERT(found_retry_header);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(302),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server sends OK status.
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
@ -311,11 +317,12 @@ static void test_retry_per_attempt_recv_timeout(
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(302),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(303),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(302), true);
cqv.Expect(tag(303), true);
cqv.Expect(tag(1), true);
cqv.Verify();

@ -301,12 +301,18 @@ static void test_retry_send_initial_metadata_refs(
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = response_payload;
op++;
@ -318,11 +324,12 @@ static void test_retry_send_initial_metadata_refs(
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(203),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
cqv.Expect(tag(202), true);
cqv.Expect(tag(203), true);
cqv.Expect(tag(2), true);
cqv.Verify();

@ -227,6 +227,12 @@ static void test_retry_transparent_max_concurrent_streams(
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
@ -241,12 +247,13 @@ static void test_retry_transparent_max_concurrent_streams(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(104),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Server completes first call and shutdown.
// Client completes first call.
cqv.Expect(tag(104), true);
cqv.Expect(tag(103), true);
cqv.Expect(tag(102), true);
cqv.Expect(tag(1), true);
@ -304,6 +311,12 @@ static void test_retry_transparent_max_concurrent_streams(
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &request_payload_recv;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
@ -318,11 +331,12 @@ static void test_retry_transparent_max_concurrent_streams(
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.status_details = &status_details;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(202),
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(203),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
// Second call completes.
cqv.Expect(tag(203), true);
cqv.Expect(tag(202), true);
cqv.Expect(tag(2), true);
cqv.Verify();

@ -1,89 +0,0 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_test", "grpc_package")
grpc_package(name = "test/core/fling")
licenses(["notice"])
grpc_cc_binary(
name = "fling_client",
testonly = 1,
srcs = ["client.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
)
grpc_cc_binary(
name = "fling_server",
testonly = 1,
srcs = ["server.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
)
grpc_cc_test(
name = "fling_test",
srcs = ["fling_test.cc"],
data = [
":fling_client",
":fling_server",
],
tags = [
"no_windows",
"requires-net:ipv4",
"requires-net:loopback",
],
deps = [
"//:gpr",
"//:grpc",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
)
grpc_cc_test(
name = "fling_stream_test",
srcs = ["fling_stream_test.cc"],
data = [
":fling_client",
":fling_server",
],
tags = [
"no_windows",
"requires-net:ipv4",
"requires-net:loopback",
],
deps = [
"//:gpr",
"//:grpc",
"//test/core/end2end:ssl_test_data",
"//test/core/util:grpc_test_util",
"//test/core/util:grpc_test_util_base",
],
)

@ -1,250 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
#include "test/core/util/cmdline.h"
#include "test/core/util/grpc_profiler.h"
#include "test/core/util/histogram.h"
#include "test/core/util/test_config.h"
static grpc_histogram* histogram;
static grpc_byte_buffer* the_buffer;
static grpc_channel* channel;
static grpc_completion_queue* cq;
static grpc_call* call;
static grpc_op ops[6];
static grpc_op stream_init_ops[2];
static grpc_op stream_step_ops[2];
static grpc_metadata_array initial_metadata_recv;
static grpc_metadata_array trailing_metadata_recv;
static grpc_byte_buffer* response_payload_recv = nullptr;
static grpc_status_code status;
static grpc_slice details;
static grpc_op* op;
static void init_ping_pong_request(void) {
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = the_buffer;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &response_payload_recv;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op++;
}
static void step_ping_pong_request(void) {
grpc_slice host = grpc_slice_from_static_string("localhost");
call = grpc_channel_create_call(
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
grpc_slice_from_static_string("/Reflector/reflectUnary"), &host,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops,
(size_t)(op - ops), (void*)1,
nullptr));
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_call_unref(call);
grpc_byte_buffer_destroy(response_payload_recv);
call = nullptr;
}
static void init_ping_pong_stream(void) {
grpc_metadata_array_init(&initial_metadata_recv);
grpc_call_error error;
grpc_slice host = grpc_slice_from_static_string("localhost");
call = grpc_channel_create_call(
channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
grpc_slice_from_static_string("/Reflector/reflectStream"), &host,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
stream_init_ops[0].data.send_initial_metadata.count = 0;
stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
stream_init_ops[1].data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv;
error = grpc_call_start_batch(call, stream_init_ops, 2,
reinterpret_cast<void*>(1), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_metadata_array_init(&initial_metadata_recv);
stream_step_ops[0].op = GRPC_OP_SEND_MESSAGE;
stream_step_ops[0].data.send_message.send_message = the_buffer;
stream_step_ops[1].op = GRPC_OP_RECV_MESSAGE;
stream_step_ops[1].data.recv_message.recv_message = &response_payload_recv;
}
static void step_ping_pong_stream(void) {
grpc_call_error error;
error = grpc_call_start_batch(call, stream_step_ops, 2,
reinterpret_cast<void*>(1), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
grpc_byte_buffer_destroy(response_payload_recv);
}
static double now(void) {
gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
return 1e9 * static_cast<double>(tv.tv_sec) + tv.tv_nsec;
}
typedef struct {
const char* name;
void (*init)();
void (*do_one_step)();
} scenario;
static const scenario scenarios[] = {
{"ping-pong-request", init_ping_pong_request, step_ping_pong_request},
{"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream},
};
int main(int argc, char** argv) {
grpc_slice slice = grpc_slice_from_copied_string("x");
double start, stop;
unsigned i;
int fake_argc = 1;
char* fake_argv[1];
int payload_size = 1;
int secure = 0;
const char* target = "localhost:443";
gpr_cmdline* cl;
grpc_event event;
const char* scenario_name = "ping-pong-request";
scenario sc = {nullptr, nullptr, nullptr};
GPR_ASSERT(argc >= 1);
fake_argv[0] = argv[0];
grpc::testing::TestEnvironment env(&fake_argc, fake_argv);
grpc_init();
int warmup_seconds = 1;
int benchmark_seconds = 5;
cl = gpr_cmdline_create("fling client");
gpr_cmdline_add_int(cl, "payload_size", "Size of the payload to send",
&payload_size);
gpr_cmdline_add_string(cl, "target", "Target host:port", &target);
gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure);
gpr_cmdline_add_string(cl, "scenario", "Scenario", &scenario_name);
gpr_cmdline_add_int(cl, "warmup", "Warmup seconds", &warmup_seconds);
gpr_cmdline_add_int(cl, "benchmark", "Benchmark seconds", &benchmark_seconds);
gpr_cmdline_parse(cl, argc, argv);
gpr_cmdline_destroy(cl);
for (i = 0; i < GPR_ARRAY_SIZE(scenarios); i++) {
if (0 == strcmp(scenarios[i].name, scenario_name)) {
sc = scenarios[i];
}
}
if (!sc.name) {
fprintf(stderr, "unsupported scenario '%s'. Valid are:", scenario_name);
fflush(stderr);
for (i = 0; i < GPR_ARRAY_SIZE(scenarios); i++) {
fprintf(stderr, " %s", scenarios[i].name);
fflush(stderr);
}
return 1;
}
grpc_channel_credentials* creds = grpc_insecure_credentials_create();
channel = grpc_channel_create(target, creds, nullptr);
grpc_channel_credentials_release(creds);
cq = grpc_completion_queue_create_for_next(nullptr);
the_buffer =
grpc_raw_byte_buffer_create(&slice, static_cast<size_t>(payload_size));
histogram = grpc_histogram_create(0.01, 60e9);
sc.init();
gpr_timespec end_warmup = grpc_timeout_seconds_to_deadline(warmup_seconds);
gpr_timespec end_profiling =
grpc_timeout_seconds_to_deadline(warmup_seconds + benchmark_seconds);
while (gpr_time_cmp(gpr_now(end_warmup.clock_type), end_warmup) < 0) {
sc.do_one_step();
}
gpr_log(GPR_INFO, "start profiling");
grpc_profiler_start("client.prof");
while (gpr_time_cmp(gpr_now(end_profiling.clock_type), end_profiling) < 0) {
start = now();
sc.do_one_step();
stop = now();
grpc_histogram_add(histogram, stop - start);
}
grpc_profiler_stop();
if (call) {
grpc_call_unref(call);
}
grpc_channel_destroy(channel);
grpc_completion_queue_shutdown(cq);
do {
event = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
nullptr);
} while (event.type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cq);
grpc_byte_buffer_destroy(the_buffer);
grpc_slice_unref(slice);
gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
grpc_histogram_percentile(histogram, 50),
grpc_histogram_percentile(histogram, 95),
grpc_histogram_percentile(histogram, 99),
grpc_histogram_percentile(histogram, 99.9));
grpc_histogram_destroy(histogram);
grpc_shutdown();
return 0;
}

@ -1,80 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <string>
#include "absl/strings/str_cat.h"
#include "src/core/lib/gprpp/host_port.h"
#include "test/core/util/port.h"
#include "test/core/util/subprocess.h"
int main(int /*argc*/, char** argv) {
char* me = argv[0];
char* lslash = strrchr(me, '/');
char root[1024];
int port = grpc_pick_unused_port_or_die();
char* args[10];
int status;
gpr_subprocess *svr, *cli;
/* figure out where we are */
if (lslash) {
memcpy(root, me, static_cast<size_t>(lslash - me));
root[lslash - me] = 0;
} else {
strcpy(root, ".");
}
/* start the server */
std::string command =
absl::StrCat(root, "/fling_server", gpr_subprocess_binary_extension());
args[0] = const_cast<char*>(command.c_str());
args[1] = const_cast<char*>("--bind");
std::string joined = grpc_core::JoinHostPort("::", port);
args[2] = const_cast<char*>(joined.c_str());
args[3] = const_cast<char*>("--no-secure");
svr = gpr_subprocess_create(4, const_cast<const char**>(args));
/* start the client */
command =
absl::StrCat(root, "/fling_client", gpr_subprocess_binary_extension());
args[0] = const_cast<char*>(command.c_str());
args[1] = const_cast<char*>("--target");
joined = grpc_core::JoinHostPort("127.0.0.1", port);
args[2] = const_cast<char*>(joined.c_str());
args[3] = const_cast<char*>("--scenario=ping-pong-stream");
args[4] = const_cast<char*>("--no-secure");
args[5] = nullptr;
cli = gpr_subprocess_create(6, const_cast<const char**>(args));
/* wait for completion */
printf("waiting for client\n");
if ((status = gpr_subprocess_join(cli))) {
gpr_subprocess_destroy(cli);
gpr_subprocess_destroy(svr);
return status;
}
gpr_subprocess_destroy(cli);
gpr_subprocess_interrupt(svr);
status = gpr_subprocess_join(svr);
gpr_subprocess_destroy(svr);
return status;
}

@ -1,83 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <string.h>
#include <string>
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/host_port.h"
#include "test/core/util/port.h"
#include "test/core/util/subprocess.h"
int main(int /*argc*/, const char** argv) {
gpr_log_verbosity_init();
const char* me = argv[0];
const char* lslash = strrchr(me, '/');
char root[1024];
int port = grpc_pick_unused_port_or_die();
char* args[10];
int status;
gpr_subprocess *svr, *cli;
/* figure out where we are */
if (lslash) {
memcpy(root, me, static_cast<size_t>(lslash - me));
root[lslash - me] = 0;
} else {
strcpy(root, ".");
}
/* start the server */
std::string command =
absl::StrCat(root, "/fling_server", gpr_subprocess_binary_extension());
args[0] = const_cast<char*>(command.c_str());
args[1] = const_cast<char*>("--bind");
std::string joined = grpc_core::JoinHostPort("::", port);
args[2] = const_cast<char*>(joined.c_str());
args[3] = const_cast<char*>("--no-secure");
svr = gpr_subprocess_create(4, const_cast<const char**>(args));
/* start the client */
command =
absl::StrCat(root, "/fling_client", gpr_subprocess_binary_extension());
args[0] = const_cast<char*>(command.c_str());
args[1] = const_cast<char*>("--target");
joined = grpc_core::JoinHostPort("127.0.0.1", port);
args[2] = const_cast<char*>(joined.c_str());
args[3] = const_cast<char*>("--scenario=ping-pong-request");
args[4] = const_cast<char*>("--no-secure");
args[5] = nullptr;
cli = gpr_subprocess_create(6, const_cast<const char**>(args));
/* wait for completion */
printf("waiting for client\n");
if ((status = gpr_subprocess_join(cli))) {
gpr_subprocess_destroy(cli);
gpr_subprocess_destroy(svr);
return status;
}
gpr_subprocess_destroy(cli);
gpr_subprocess_interrupt(svr);
status = gpr_subprocess_join(svr);
gpr_subprocess_destroy(svr);
return status;
}

@ -1,329 +0,0 @@
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <signal.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/sync.h>
#ifndef _WIN32
/* This is for _exit() below, which is temporary. */
#include <unistd.h>
#endif
#include <string>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gprpp/host_port.h"
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/cmdline.h"
#include "test/core/util/grpc_profiler.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
static grpc_completion_queue* cq;
static grpc_server* server;
static grpc_call* call;
static grpc_call_details call_details;
static grpc_metadata_array request_metadata_recv;
static grpc_metadata_array initial_metadata_send;
static grpc_byte_buffer* payload_buffer = nullptr;
/* Used to drain the terminal read in unary calls. */
static grpc_byte_buffer* terminal_buffer = nullptr;
static grpc_op read_op;
static grpc_op metadata_send_op;
static grpc_op write_op;
static grpc_op status_op[2];
static int was_cancelled = 2;
static grpc_op unary_ops[6];
static int got_sigint = 0;
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
typedef enum {
FLING_SERVER_NEW_REQUEST = 1,
FLING_SERVER_READ_FOR_UNARY,
FLING_SERVER_BATCH_OPS_FOR_UNARY,
FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING,
FLING_SERVER_READ_FOR_STREAMING,
FLING_SERVER_WRITE_FOR_STREAMING,
FLING_SERVER_SEND_STATUS_FOR_STREAMING
} fling_server_tags;
typedef struct {
gpr_refcount pending_ops;
uint32_t flags;
} call_state;
static void request_call(void) {
grpc_metadata_array_init(&request_metadata_recv);
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_call(server, &call, &call_details,
&request_metadata_recv, cq, cq,
tag(FLING_SERVER_NEW_REQUEST)));
}
static void handle_unary_method(void) {
grpc_op* op;
grpc_call_error error;
grpc_metadata_array_init(&initial_metadata_send);
op = unary_ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &terminal_buffer;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
if (payload_buffer == nullptr) {
gpr_log(GPR_INFO, "NULL payload buffer !!!");
}
op->data.send_message.send_message = payload_buffer;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status_details = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op++;
error = grpc_call_start_batch(call, unary_ops,
static_cast<size_t>(op - unary_ops),
tag(FLING_SERVER_BATCH_OPS_FOR_UNARY), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
}
static void send_initial_metadata(void) {
grpc_call_error error;
void* tagarg = tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING);
grpc_metadata_array_init(&initial_metadata_send);
metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA;
metadata_send_op.data.send_initial_metadata.count = 0;
error = grpc_call_start_batch(call, &metadata_send_op, 1, tagarg, nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
}
static void start_read_op(int t) {
grpc_call_error error;
/* Starting read at server */
read_op.op = GRPC_OP_RECV_MESSAGE;
read_op.data.recv_message.recv_message = &payload_buffer;
error = grpc_call_start_batch(call, &read_op, 1, tag(t), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
}
static void start_write_op(void) {
grpc_call_error error;
void* tagarg = tag(FLING_SERVER_WRITE_FOR_STREAMING);
/* Starting write at server */
write_op.op = GRPC_OP_SEND_MESSAGE;
if (payload_buffer == nullptr) {
gpr_log(GPR_INFO, "NULL payload buffer !!!");
}
write_op.data.send_message.send_message = payload_buffer;
error = grpc_call_start_batch(call, &write_op, 1, tagarg, nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
}
static void start_send_status(void) {
grpc_call_error error;
void* tagarg = tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING);
status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK;
status_op[0].data.send_status_from_server.trailing_metadata_count = 0;
status_op[0].data.send_status_from_server.status_details = nullptr;
status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
status_op[1].data.recv_close_on_server.cancelled = &was_cancelled;
error = grpc_call_start_batch(call, status_op, 2, tagarg, nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
}
/* We have some sort of deadlock, so let's not exit gracefully for now.
When that is resolved, please remove the #include <unistd.h> above. */
static void sigint_handler(int /*x*/) { _exit(0); }
int main(int argc, char** argv) {
grpc_event ev;
call_state* s;
std::string addr_buf;
gpr_cmdline* cl;
grpc_completion_queue* shutdown_cq;
int shutdown_started = 0;
int shutdown_finished = 0;
int secure = 0;
const char* addr = nullptr;
char* fake_argv[1];
GPR_ASSERT(argc >= 1);
argc = 1;
fake_argv[0] = argv[0];
grpc_test_init(&argc, fake_argv);
grpc_init();
srand(static_cast<unsigned>(clock()));
cl = gpr_cmdline_create("fling server");
gpr_cmdline_add_string(cl, "bind", "Bind host:port", &addr);
gpr_cmdline_add_flag(cl, "secure", "Run with security?", &secure);
gpr_cmdline_parse(cl, argc, argv);
gpr_cmdline_destroy(cl);
if (addr == nullptr) {
addr_buf = grpc_core::JoinHostPort("::", grpc_pick_unused_port_or_die());
addr = addr_buf.c_str();
}
gpr_log(GPR_INFO, "creating server on: %s", addr);
cq = grpc_completion_queue_create_for_next(nullptr);
grpc_server_credentials* creds;
if (secure) {
grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key,
test_server1_cert};
creds = grpc_ssl_server_credentials_create(nullptr, &pem_key_cert_pair, 1,
0, nullptr);
} else {
creds = grpc_insecure_server_credentials_create();
}
server = grpc_server_create(nullptr, nullptr);
GPR_ASSERT(grpc_server_add_http2_port(server, addr, creds));
grpc_server_credentials_release(creds);
grpc_server_register_completion_queue(server, cq, nullptr);
grpc_server_start(server);
addr = nullptr;
addr_buf.clear();
grpc_call_details_init(&call_details);
request_call();
grpc_profiler_start("server.prof");
signal(SIGINT, sigint_handler);
while (!shutdown_finished) {
if (got_sigint && !shutdown_started) {
gpr_log(GPR_INFO, "Shutting down due to SIGINT");
shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(
shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5), nullptr)
.type == GRPC_OP_COMPLETE);
grpc_completion_queue_destroy(shutdown_cq);
grpc_completion_queue_shutdown(cq);
shutdown_started = 1;
}
ev = grpc_completion_queue_next(
cq,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_micros(1000000, GPR_TIMESPAN)),
nullptr);
s = static_cast<call_state*>(ev.tag);
switch (ev.type) {
case GRPC_OP_COMPLETE:
switch (reinterpret_cast<intptr_t>(s)) {
case FLING_SERVER_NEW_REQUEST:
if (call != nullptr) {
if (0 == grpc_slice_str_cmp(call_details.method,
"/Reflector/reflectStream")) {
/* Received streaming call. Send metadata here. */
start_read_op(FLING_SERVER_READ_FOR_STREAMING);
send_initial_metadata();
} else {
/* Received unary call. Can do all ops in one batch. */
start_read_op(FLING_SERVER_READ_FOR_UNARY);
}
} else {
GPR_ASSERT(shutdown_started);
}
/* request_call();
*/
break;
case FLING_SERVER_READ_FOR_STREAMING:
if (payload_buffer != nullptr) {
/* Received payload from client. */
start_write_op();
} else {
/* Received end of stream from client. */
start_send_status();
}
break;
case FLING_SERVER_WRITE_FOR_STREAMING:
/* Write completed at server */
grpc_byte_buffer_destroy(payload_buffer);
payload_buffer = nullptr;
start_read_op(FLING_SERVER_READ_FOR_STREAMING);
break;
case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING:
/* Metadata send completed at server */
break;
case FLING_SERVER_SEND_STATUS_FOR_STREAMING:
/* Send status and close completed at server */
grpc_call_unref(call);
if (!shutdown_started) request_call();
break;
case FLING_SERVER_READ_FOR_UNARY:
/* Finished payload read for unary. Start all reamaining
* unary ops in a batch.
*/
handle_unary_method();
break;
case FLING_SERVER_BATCH_OPS_FOR_UNARY:
/* Finished unary call. */
grpc_byte_buffer_destroy(payload_buffer);
payload_buffer = nullptr;
grpc_call_unref(call);
if (!shutdown_started) request_call();
break;
}
break;
case GRPC_QUEUE_SHUTDOWN:
GPR_ASSERT(shutdown_started);
shutdown_finished = 1;
break;
case GRPC_QUEUE_TIMEOUT:
break;
}
}
grpc_profiler_stop();
grpc_call_details_destroy(&call_details);
grpc_server_destroy(server);
grpc_completion_queue_destroy(cq);
grpc_shutdown();
return 0;
}

@ -59,7 +59,6 @@ static grpc_op snapshot_ops[5];
static grpc_op status_op;
static int got_sigint = 0;
static grpc_byte_buffer* payload_buffer = nullptr;
static grpc_byte_buffer* terminal_buffer = nullptr;
static int was_cancelled = 2;
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
@ -131,9 +130,6 @@ static void send_snapshot(void* tag, MemStats* snapshot) {
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op++;
op->op = GRPC_OP_RECV_MESSAGE;
op->data.recv_message.recv_message = &terminal_buffer;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
if (payload_buffer == nullptr) {
gpr_log(GPR_INFO, "NULL payload buffer !!!");
@ -306,12 +302,10 @@ int main(int argc, char** argv) {
// FLING_SERVER_SEND_STATUS_SNAPSHOT to destroy the snapshot call
case FLING_SERVER_SEND_STATUS_SNAPSHOT:
grpc_byte_buffer_destroy(payload_buffer);
grpc_byte_buffer_destroy(terminal_buffer);
grpc_call_unref(s->call);
grpc_call_details_destroy(&s->call_details);
grpc_metadata_array_destroy(&s->initial_metadata_send);
grpc_metadata_array_destroy(&s->request_metadata_recv);
terminal_buffer = nullptr;
payload_buffer = nullptr;
break;
}

@ -137,50 +137,6 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "fling_stream_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "fling_test",
"platforms": [
"linux",
"mac",
"posix"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save