diff --git a/BUILD b/BUILD index 9c99f95fcdf..bfda84e0f40 100644 --- a/BUILD +++ b/BUILD @@ -939,6 +939,7 @@ grpc_cc_library( "grpc_codegen", "grpc_trace", "inlined_vector", + "orphanable", "ref_counted", "ref_counted_ptr", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index 5dfbdcb85a0..e0724d10aad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -224,7 +224,6 @@ add_dependencies(buildtests_c avl_test) add_dependencies(buildtests_c bad_server_response_test) add_dependencies(buildtests_c bin_decoder_test) add_dependencies(buildtests_c bin_encoder_test) -add_dependencies(buildtests_c byte_stream_test) add_dependencies(buildtests_c channel_create_test) add_dependencies(buildtests_c chttp2_hpack_encoder_test) add_dependencies(buildtests_c chttp2_stream_map_test) @@ -530,6 +529,7 @@ endif() if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx bm_pollset) endif() +add_dependencies(buildtests_cxx byte_stream_test) add_dependencies(buildtests_cxx channel_arguments_test) add_dependencies(buildtests_cxx channel_filter_test) add_dependencies(buildtests_cxx check_gcp_environment_linux_test) @@ -5436,33 +5436,6 @@ target_link_libraries(bin_encoder_test endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) -add_executable(byte_stream_test - test/core/transport/byte_stream_test.cc -) - - -target_include_directories(byte_stream_test - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include - PRIVATE ${_gRPC_SSL_INCLUDE_DIR} - PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} - PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} - PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} - PRIVATE ${_gRPC_CARES_INCLUDE_DIR} - PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} -) - -target_link_libraries(byte_stream_test - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util - grpc - gpr_test_util - gpr -) - -endif (gRPC_BUILD_TESTS) -if (gRPC_BUILD_TESTS) - add_executable(channel_create_test test/core/surface/channel_create_test.cc ) @@ -9984,6 +9957,42 @@ endif() endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) +add_executable(byte_stream_test + test/core/transport/byte_stream_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + + +target_include_directories(byte_stream_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE third_party/googletest/googletest/include + PRIVATE third_party/googletest/googletest + PRIVATE third_party/googletest/googlemock/include + PRIVATE third_party/googletest/googlemock + PRIVATE ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(byte_stream_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr_test_util + gpr + ${_gRPC_GFLAGS_LIBRARIES} +) + +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + add_executable(channel_arguments_test test/cpp/common/channel_arguments_test.cc third_party/googletest/googletest/src/gtest-all.cc diff --git a/Makefile b/Makefile index a298fafd6bd..160762b62fe 100644 --- a/Makefile +++ b/Makefile @@ -961,7 +961,6 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test -byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test @@ -1127,6 +1126,7 @@ bm_fullstack_trickle: $(BINDIR)/$(CONFIG)/bm_fullstack_trickle bm_fullstack_unary_ping_pong: $(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong bm_metadata: $(BINDIR)/$(CONFIG)/bm_metadata bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset +byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test @@ -1399,7 +1399,6 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/bad_server_response_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \ $(BINDIR)/$(CONFIG)/bin_encoder_test \ - $(BINDIR)/$(CONFIG)/byte_stream_test \ $(BINDIR)/$(CONFIG)/channel_create_test \ $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \ $(BINDIR)/$(CONFIG)/chttp2_stream_map_test \ @@ -1613,6 +1612,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong \ $(BINDIR)/$(CONFIG)/bm_metadata \ $(BINDIR)/$(CONFIG)/bm_pollset \ + $(BINDIR)/$(CONFIG)/byte_stream_test \ $(BINDIR)/$(CONFIG)/channel_arguments_test \ $(BINDIR)/$(CONFIG)/channel_filter_test \ $(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \ @@ -1779,6 +1779,7 @@ buildtests_cxx: privatelibs_cxx \ $(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong \ $(BINDIR)/$(CONFIG)/bm_metadata \ $(BINDIR)/$(CONFIG)/bm_pollset \ + $(BINDIR)/$(CONFIG)/byte_stream_test \ $(BINDIR)/$(CONFIG)/channel_arguments_test \ $(BINDIR)/$(CONFIG)/channel_filter_test \ $(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \ @@ -1885,8 +1886,6 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 ) $(E) "[RUN] Testing bin_encoder_test" $(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 ) - $(E) "[RUN] Testing byte_stream_test" - $(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 ) $(E) "[RUN] Testing channel_create_test" $(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 ) $(E) "[RUN] Testing chttp2_hpack_encoder_test" @@ -2203,6 +2202,8 @@ test_cxx: buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/bm_metadata || ( echo test bm_metadata failed ; exit 1 ) $(E) "[RUN] Testing bm_pollset" $(Q) $(BINDIR)/$(CONFIG)/bm_pollset || ( echo test bm_pollset failed ; exit 1 ) + $(E) "[RUN] Testing byte_stream_test" + $(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 ) $(E) "[RUN] Testing channel_arguments_test" $(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 ) $(E) "[RUN] Testing channel_filter_test" @@ -10128,38 +10129,6 @@ endif endif -BYTE_STREAM_TEST_SRC = \ - test/core/transport/byte_stream_test.cc \ - -BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error - -else - - - -$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test - -endif - -$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(BYTE_STREAM_TEST_OBJS:.o=.dep) -endif -endif - - CHANNEL_CREATE_TEST_SRC = \ test/core/surface/channel_create_test.cc \ @@ -15830,6 +15799,49 @@ endif endif +BYTE_STREAM_TEST_SRC = \ + test/core/transport/byte_stream_test.cc \ + +BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/byte_stream_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/byte_stream_test: $(PROTOBUF_DEP) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/byte_stream_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BYTE_STREAM_TEST_OBJS:.o=.dep) +endif +endif + + CHANNEL_ARGUMENTS_TEST_SRC = \ test/cpp/common/channel_arguments_test.cc \ diff --git a/build.yaml b/build.yaml index e2bb8bfa9ff..6afd48ca72c 100644 --- a/build.yaml +++ b/build.yaml @@ -1986,17 +1986,6 @@ targets: - grpc_test_util - grpc uses_polling: false -- name: byte_stream_test - build: test - language: c - src: - - test/core/transport/byte_stream_test.cc - deps: - - grpc_test_util - - grpc - - gpr_test_util - - gpr - uses_polling: false - name: channel_create_test build: test language: c @@ -4071,6 +4060,18 @@ targets: - mac - linux - posix +- name: byte_stream_test + gtest: true + build: test + language: c++ + src: + - test/core/transport/byte_stream_test.cc + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + uses_polling: false - name: channel_arguments_test gtest: true build: test diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 90b93fbe236..bbc5160bec0 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -798,7 +798,8 @@ typedef struct { grpc_linked_mdelem* send_initial_metadata_storage; grpc_metadata_batch send_initial_metadata; // For send_message. - grpc_caching_byte_stream send_message; + grpc_core::ManualConstructor + send_message; // For send_trailing_metadata. grpc_linked_mdelem* send_trailing_metadata_storage; grpc_metadata_batch send_trailing_metadata; @@ -808,7 +809,7 @@ typedef struct { bool trailing_metadata_available; // For intercepting recv_message. grpc_closure recv_message_ready; - grpc_byte_stream* recv_message; + grpc_core::OrphanablePtr recv_message; // For intercepting recv_trailing_metadata. grpc_metadata_batch recv_trailing_metadata; grpc_transport_stream_stats collect_stats; @@ -914,12 +915,12 @@ typedef struct client_channel_call_data { gpr_atm* peer_string; // send_message // When we get a send_message op, we replace the original byte stream - // with a grpc_caching_byte_stream that caches the slices to a - // local buffer for use in retries. + // with a CachingByteStream that caches the slices to a local buffer for + // use in retries. // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector send_messages; + grpc_core::InlinedVector send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, } // Set up cache for send_message ops. if (batch->send_message) { - grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc( - calld->arena, sizeof(grpc_byte_stream_cache)); - grpc_byte_stream_cache_init(cache, - batch->payload->send_message.send_message); + grpc_core::ByteStreamCache* cache = + static_cast( + gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); + new (cache) grpc_core::ByteStreamCache( + std::move(batch->payload->send_message.send_message)); calld->send_messages.push_back(cache); } // Save metadata batch for send_trailing_metadata ops. @@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit( "]", chand, calld, i); } - grpc_byte_stream_cache_destroy(calld->send_messages[i]); + calld->send_messages[i]->Destroy(); } if (retry_state->completed_send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch( "]", chand, calld, retry_state->completed_send_message_count - 1); } - grpc_byte_stream_cache_destroy( - calld->send_messages[retry_state->completed_send_message_count - 1]); + calld->send_messages[retry_state->completed_send_message_count - 1] + ->Destroy(); } if (batch_data->batch.send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem, if (batch->send_message) { calld->pending_send_message = true; calld->bytes_buffered_for_retry += - batch->payload->send_message.send_message->length; + batch->payload->send_message.send_message->length(); } if (batch->send_trailing_metadata) { calld->pending_send_trailing_metadata = true; @@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) { GPR_ASSERT(pending != nullptr); // Return payload. *pending->batch->payload->recv_message.recv_message = - batch_data->recv_message; + std::move(batch_data->recv_message); // Update bookkeeping. // Note: Need to do this before invoking the callback, since invoking // the callback will result in yielding the call combiner. @@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op( "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]", chand, calld, retry_state->started_send_message_count); } - grpc_byte_stream_cache* cache = + grpc_core::ByteStreamCache* cache = calld->send_messages[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; - grpc_caching_byte_stream_init(&batch_data->send_message, cache); + batch_data->send_message.Init(cache); batch_data->batch.send_message = true; - batch_data->batch.payload->send_message.send_message = - &batch_data->send_message.base; + batch_data->batch.payload->send_message.send_message.reset( + batch_data->send_message.get()); } // Adds retriable send_trailing_metadata op to batch_data. diff --git a/src/core/ext/filters/http/client/http_client_filter.cc b/src/core/ext/filters/http/client/http_client_filter.cc index 58aefd17c75..ae94ce47b9e 100644 --- a/src/core/ext/filters/http/client/http_client_filter.cc +++ b/src/core/ext/filters/http/client/http_client_filter.cc @@ -20,9 +20,11 @@ #include #include #include +#include #include #include "src/core/ext/filters/http/client/http_client_filter.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" @@ -58,8 +60,9 @@ struct call_data { // State for handling send_message ops. grpc_transport_stream_op_batch* send_message_batch; size_t send_message_bytes_read; - grpc_byte_stream_cache send_message_cache; - grpc_caching_byte_stream send_message_caching_stream; + grpc_core::ManualConstructor send_message_cache; + grpc_core::ManualConstructor + send_message_caching_stream; grpc_closure on_send_message_next_done; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; @@ -166,7 +169,7 @@ static void recv_trailing_metadata_on_complete(void* user_data, static void send_message_on_complete(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); - grpc_byte_stream_cache_destroy(&calld->send_message_cache); + calld->send_message_cache.Destroy(); GRPC_CLOSURE_RUN(calld->original_send_message_on_complete, GRPC_ERROR_REF(error)); } @@ -175,8 +178,7 @@ static void send_message_on_complete(void* arg, grpc_error* error) { // calld->send_message_bytes_read. static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error* error = grpc_byte_stream_pull( - &calld->send_message_caching_stream.base, &incoming_slice); + grpc_error* error = calld->send_message_caching_stream->Pull(&incoming_slice); if (error == GRPC_ERROR_NONE) { calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice); grpc_slice_unref_internal(incoming_slice); @@ -186,24 +188,23 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // Reads as many slices as possible from the send_message byte stream. // Upon successful return, if calld->send_message_bytes_read == -// calld->send_message_caching_stream.base.length, then we have completed +// calld->send_message_caching_stream->length(), then we have completed // reading from the byte stream; otherwise, an async read has been dispatched // and on_send_message_next_done() will be invoked when it is complete. static grpc_error* read_all_available_send_message_data(call_data* calld) { - while (grpc_byte_stream_next(&calld->send_message_caching_stream.base, - ~static_cast(0), - &calld->on_send_message_next_done)) { + while (calld->send_message_caching_stream->Next( + SIZE_MAX, &calld->on_send_message_next_done)) { grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) return error; if (calld->send_message_bytes_read == - calld->send_message_caching_stream.base.length) { + calld->send_message_caching_stream->length()) { break; } } return GRPC_ERROR_NONE; } -// Async callback for grpc_byte_stream_next(). +// Async callback for ByteStream::Next(). static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); @@ -222,7 +223,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) { // here, then we know that all of the data was not available // synchronously, so we were not able to do a cached call. Instead, // we just reset the byte stream and then send down the batch as-is. - grpc_caching_byte_stream_reset(&calld->send_message_caching_stream); + calld->send_message_caching_stream->Reset(); grpc_call_next_op(elem, calld->send_message_batch); } @@ -253,7 +254,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem, size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); estimated_len++; /* for the '?' */ estimated_len += grpc_base64_estimate_encoded_size( - batch->payload->send_message.send_message->length, true /* url_safe */, + batch->payload->send_message.send_message->length(), true /* url_safe */, false /* multi_line */); grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); /* memcopy individual pieces into this slice */ @@ -265,9 +266,9 @@ static grpc_error* update_path_for_get(grpc_call_element* elem, write_ptr += GRPC_SLICE_LENGTH(path_slice); *write_ptr++ = '?'; char* payload_bytes = - slice_buffer_to_string(&calld->send_message_cache.cache_buffer); + slice_buffer_to_string(calld->send_message_cache->cache_buffer()); grpc_base64_encode_core(write_ptr, payload_bytes, - batch->payload->send_message.send_message->length, + batch->payload->send_message.send_message->length(), true /* url_safe */, false /* multi_line */); gpr_free(payload_bytes); /* remove trailing unused memory and add trailing 0 to terminate string */ @@ -326,15 +327,14 @@ static void hc_start_transport_stream_op_batch( if (batch->send_message && (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - batch->payload->send_message.send_message->length < + batch->payload->send_message.send_message->length() < channeld->max_payload_size_for_get) { calld->send_message_bytes_read = 0; - grpc_byte_stream_cache_init(&calld->send_message_cache, - batch->payload->send_message.send_message); - grpc_caching_byte_stream_init(&calld->send_message_caching_stream, - &calld->send_message_cache); - batch->payload->send_message.send_message = - &calld->send_message_caching_stream.base; + calld->send_message_cache.Init( + std::move(batch->payload->send_message.send_message)); + calld->send_message_caching_stream.Init(calld->send_message_cache.get()); + batch->payload->send_message.send_message.reset( + calld->send_message_caching_stream.get()); calld->original_send_message_on_complete = batch->on_complete; batch->on_complete = &calld->send_message_on_complete; calld->send_message_batch = batch; @@ -342,12 +342,12 @@ static void hc_start_transport_stream_op_batch( if (error != GRPC_ERROR_NONE) goto done; // If all the data has been read, then we can use GET. if (calld->send_message_bytes_read == - calld->send_message_caching_stream.base.length) { + calld->send_message_caching_stream->length()) { method = GRPC_MDELEM_METHOD_GET; error = update_path_for_get(elem, batch); if (error != GRPC_ERROR_NONE) goto done; batch->send_message = false; - grpc_byte_stream_destroy(&calld->send_message_caching_stream.base); + calld->send_message_caching_stream->Orphan(); } else { // Not all data is available. The batch will be sent down // asynchronously in on_send_message_next_done(). diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.cc b/src/core/ext/filters/http/message_compress/message_compress_filter.cc index efe0085c5b0..e7d9949386f 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.cc +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.cc @@ -32,6 +32,7 @@ #include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/compression/message_compress.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -62,7 +63,8 @@ struct call_data { grpc_closure start_send_message_batch_in_call_combiner; grpc_transport_stream_op_batch* send_message_batch; grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ - grpc_slice_buffer_stream replacement_stream; + grpc_core::ManualConstructor + replacement_stream; grpc_closure* original_send_message_on_complete; grpc_closure send_message_on_complete; grpc_closure on_send_message_next_done; @@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); uint32_t send_flags = - calld->send_message_batch->payload->send_message.send_message->flags; + calld->send_message_batch->payload->send_message.send_message->flags(); bool did_compress = grpc_msg_compress(calld->message_compression_algorithm, &calld->slices, &tmp); if (did_compress) { @@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) { grpc_slice_buffer_destroy_internal(&tmp); // Swap out the original byte stream with our new one and send the // batch down. - grpc_byte_stream_destroy( - calld->send_message_batch->payload->send_message.send_message); - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - send_flags); - calld->send_message_batch->payload->send_message.send_message = - &calld->replacement_stream.base; + calld->replacement_stream.Init(&calld->slices, send_flags); + calld->send_message_batch->payload->send_message.send_message.reset( + calld->replacement_stream.get()); calld->original_send_message_on_complete = calld->send_message_batch->on_complete; calld->send_message_batch->on_complete = &calld->send_message_on_complete; @@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg, // Pulls a slice from the send_message byte stream and adds it to calld->slices. static grpc_error* pull_slice_from_send_message(call_data* calld) { grpc_slice incoming_slice; - grpc_error* error = grpc_byte_stream_pull( - calld->send_message_batch->payload->send_message.send_message, - &incoming_slice); + grpc_error* error = + calld->send_message_batch->payload->send_message.send_message->Pull( + &incoming_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&calld->slices, incoming_slice); } @@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) { // Reads as many slices as possible from the send_message byte stream. // If all data has been read, invokes finish_send_message(). Otherwise, -// an async call to grpc_byte_stream_next() has been started, which will +// an async call to ByteStream::Next() has been started, which will // eventually result in calling on_send_message_next_done(). static void continue_reading_send_message(grpc_call_element* elem) { call_data* calld = static_cast(elem->call_data); - while (grpc_byte_stream_next( - calld->send_message_batch->payload->send_message.send_message, + while (calld->send_message_batch->payload->send_message.send_message->Next( ~static_cast(0), &calld->on_send_message_next_done)) { grpc_error* error = pull_slice_from_send_message(calld); if (error != GRPC_ERROR_NONE) { @@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) { GRPC_ERROR_UNREF(error); return; } - if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + if (calld->slices.length == calld->send_message_batch->payload->send_message + .send_message->length()) { finish_send_message(elem); break; } } } -// Async callback for grpc_byte_stream_next(). +// Async callback for ByteStream::Next(). static void on_send_message_next_done(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast(arg); call_data* calld = static_cast(elem->call_data); @@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) { return; } if (calld->slices.length == - calld->send_message_batch->payload->send_message.send_message->length) { + calld->send_message_batch->payload->send_message.send_message->length()) { finish_send_message(elem); } else { continue_reading_send_message(elem); @@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) { call_data* calld = static_cast(elem->call_data); if (skip_compression( elem, - calld->send_message_batch->payload->send_message.send_message->flags, + calld->send_message_batch->payload->send_message.send_message + ->flags(), calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) { send_message_batch_continue(elem); } else { @@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch( grpc_schedule_on_exec_ctx), GRPC_ERROR_REF(calld->cancel_error), "failing send_message op"); } else { - grpc_byte_stream_shutdown( - - calld->send_message_batch->payload->send_message.send_message, + calld->send_message_batch->payload->send_message.send_message->Shutdown( GRPC_ERROR_REF(calld->cancel_error)); } } diff --git a/src/core/ext/filters/http/server/http_server_filter.cc b/src/core/ext/filters/http/server/http_server_filter.cc index 57ec8dce34d..c2020158756 100644 --- a/src/core/ext/filters/http/server/http_server_filter.cc +++ b/src/core/ext/filters/http/server/http_server_filter.cc @@ -23,6 +23,7 @@ #include #include #include +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/b64.h" #include "src/core/lib/slice/percent_encoding.h" @@ -53,8 +54,8 @@ struct call_data { */ grpc_closure* recv_message_ready; grpc_closure* on_complete; - grpc_byte_stream** pp_recv_message; - grpc_slice_buffer_stream read_stream; + grpc_core::OrphanablePtr* pp_recv_message; + grpc_core::ManualConstructor read_stream; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member @@ -232,7 +233,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem, grpc_base64_decode_with_len( reinterpret_cast GRPC_SLICE_START_PTR(query_slice), GRPC_SLICE_LENGTH(query_slice), k_url_safe)); - grpc_slice_buffer_stream_init(&calld->read_stream, &read_slice_buffer, 0); + calld->read_stream.Init(&read_slice_buffer, 0); grpc_slice_buffer_destroy_internal(&read_slice_buffer); calld->seen_path_with_query = true; grpc_slice_unref_internal(query_slice); @@ -281,10 +282,10 @@ static void hs_on_complete(void* user_data, grpc_error* err) { call_data* calld = static_cast(elem->call_data); /* Call recv_message_ready if we got the payload via the path field */ if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) { - *calld->pp_recv_message = - calld->payload_bin_delivered - ? nullptr - : reinterpret_cast(&calld->read_stream); + calld->pp_recv_message->reset( + calld->payload_bin_delivered ? nullptr + : reinterpret_cast( + calld->read_stream.get())); // Re-enter call combiner for recv_message_ready, since the surface // code will release the call combiner for each callback it receives. GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready, @@ -405,7 +406,7 @@ static void destroy_call_elem(grpc_call_element* elem, grpc_closure* ignored) { call_data* calld = static_cast(elem->call_data); if (calld->seen_path_with_query && !calld->payload_bin_delivered) { - grpc_byte_stream_destroy(&calld->read_stream.base); + calld->read_stream->Orphan(); } } diff --git a/src/core/ext/filters/message_size/message_size_filter.cc b/src/core/ext/filters/message_size/message_size_filter.cc index b1b14dde024..c7fc3f2e627 100644 --- a/src/core/ext/filters/message_size/message_size_filter.cc +++ b/src/core/ext/filters/message_size/message_size_filter.cc @@ -100,7 +100,7 @@ struct call_data { // call our next_recv_message_ready member after handling it. grpc_closure recv_message_ready; // Used by recv_message_ready. - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr* recv_message; // Original recv_message_ready callback, invoked after our own. grpc_closure* next_recv_message_ready; }; @@ -121,12 +121,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) { grpc_call_element* elem = static_cast(user_data); call_data* calld = static_cast(elem->call_data); if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 && - (*calld->recv_message)->length > + (*calld->recv_message)->length() > static_cast(calld->limits.max_recv_size)) { char* message_string; gpr_asprintf(&message_string, "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, calld->limits.max_recv_size); + (*calld->recv_message)->length(), calld->limits.max_recv_size); grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED); @@ -150,11 +150,11 @@ static void start_transport_stream_op_batch( call_data* calld = static_cast(elem->call_data); // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && - op->payload->send_message.send_message->length > + op->payload->send_message.send_message->length() > static_cast(calld->limits.max_send_size)) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->payload->send_message.send_message->length, + op->payload->send_message.send_message->length(), calld->limits.max_send_size); grpc_transport_stream_op_batch_finish_with_failure( op, diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc index bed1004c576..c7070d4d9ba 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc @@ -93,7 +93,9 @@ static void start_transport_stream_op_batch( /* Send message happens after client's user-agent (initial metadata) is * received, so workaround_active must be set already */ if (calld->workaround_active) { - op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS; + op->payload->send_message.send_message->set_flags( + op->payload->send_message.send_message->flags() | + GRPC_WRITE_NO_COMPRESS); } } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 56aaada9127..a4d616d7784 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -39,6 +39,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/gpr/env.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" @@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t, grpc_connectivity_state state, grpc_error* error, const char* reason); -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs); - static void benign_reclaimer_locked(void* t, grpc_error* error); static void destructive_reclaimer_locked(void* t, grpc_error* error); @@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs, s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is - read-closed. The others are for incoming_byte_streams that are actively - reading */ + read-closed. The others are for Chttp2IncomingByteStreams that are + actively reading */ GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -1256,8 +1251,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, abort(); /* TODO(ctiller): what cleanup here? */ return; /* early out */ } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - grpc_byte_stream_destroy(s->fetching_send_message); + if (s->fetched_send_message_length == s->fetching_send_message->length()) { int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1274,20 +1268,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t, cb->closure = s->fetching_send_message_finished; s->fetching_send_message_finished = nullptr; grpc_chttp2_write_cb** list = - s->fetching_send_message->flags & GRPC_WRITE_THROUGH + s->fetching_send_message->flags() & GRPC_WRITE_THROUGH ? &s->on_write_finished_cbs : &s->on_flow_controlled_cbs; cb->next = *list; *list = cb; } - s->fetching_send_message = nullptr; + s->fetching_send_message.reset(); return; /* early out */ - } else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX, - &s->complete_fetch_locked)) { - grpc_error* error = - grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + } else if (s->fetching_send_message->Next(UINT32_MAX, + &s->complete_fetch_locked)) { + grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice); if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } else { add_fetched_slice_locked(t, s); @@ -1300,14 +1293,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) { grpc_chttp2_stream* s = static_cast(gs); grpc_chttp2_transport* t = s->t; if (error == GRPC_ERROR_NONE) { - error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice); + error = s->fetching_send_message->Pull(&s->fetching_slice); if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(t, s); continue_fetching_send_locked(t, s); } } if (error != GRPC_ERROR_NONE) { - grpc_byte_stream_destroy(s->fetching_send_message); + s->fetching_send_message.reset(); grpc_chttp2_cancel_stream(t, s, error); } } @@ -1439,7 +1432,7 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->id != 0); grpc_chttp2_mark_stream_writable(t, s); if (!(op->send_message && - (op->payload->send_message.send_message->flags & + (op->payload->send_message.send_message->flags() & GRPC_WRITE_BUFFER_HINT))) { grpc_chttp2_initiate_write( t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); @@ -1466,7 +1459,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( - op->payload->send_message.send_message->length); + op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; s->fetching_send_message_finished = add_closure_barrier(op->on_complete); if (s->write_closed) { @@ -1475,7 +1468,7 @@ static void perform_stream_op_locked(void* stream_op, // streaming call might send another message before getting a // recv_message failure, breaking out of its loop, and then // starting recv_trailing_metadata. - grpc_byte_stream_destroy(op->payload->send_message.send_message); + op->payload->send_message.send_message.reset(); grpc_chttp2_complete_closure_step( t, s, &s->fetching_send_message_finished, t->is_client && s->received_trailing_metadata @@ -1488,14 +1481,15 @@ static void perform_stream_op_locked(void* stream_op, GPR_ASSERT(s->fetching_send_message == nullptr); uint8_t* frame_hdr = grpc_slice_buffer_tiny_add( &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); - uint32_t flags = op_payload->send_message.send_message->flags; + uint32_t flags = op_payload->send_message.send_message->flags(); frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; - size_t len = op_payload->send_message.send_message->length; + size_t len = op_payload->send_message.send_message->length(); frame_hdr[1] = static_cast(len >> 24); frame_hdr[2] = static_cast(len >> 16); frame_hdr[3] = static_cast(len >> 8); frame_hdr[4] = static_cast(len); - s->fetching_send_message = op_payload->send_message.send_message; + s->fetching_send_message = + std::move(op_payload->send_message.send_message); s->fetched_send_message_length = 0; s->next_message_end_offset = s->flow_controlled_bytes_written + @@ -1947,12 +1941,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id, } if (s->pending_byte_stream) { if (s->on_next != nullptr) { - grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame; + grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame; if (error == GRPC_ERROR_NONE) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } - incoming_byte_stream_publish_error(bs, error); - incoming_byte_stream_unref(bs); + bs->PublishError(error); + bs->Unref(); s->data_parser.parsing_frame = nullptr; } else { GRPC_ERROR_UNREF(s->byte_stream_error); @@ -2096,10 +2090,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t, GRPC_ERROR_REF(error), "send_trailing_metadata_finished"); - if (s->fetching_send_message != nullptr) { - grpc_byte_stream_destroy(s->fetching_send_message); - s->fetching_send_message = nullptr; - } + s->fetching_send_message.reset(); grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished, GRPC_ERROR_REF(error), "fetching_send_message_finished"); @@ -2715,7 +2706,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs, static void reset_byte_stream(void* arg, grpc_error* error) { grpc_chttp2_stream* s = static_cast(arg); - s->pending_byte_stream = false; if (error == GRPC_ERROR_NONE) { grpc_chttp2_maybe_complete_recv_message(s->t, s); @@ -2731,22 +2721,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) { } } -static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) { - if (gpr_unref(&bs->refs)) { - gpr_free(bs); +namespace grpc_core { + +Chttp2IncomingByteStream::Chttp2IncomingByteStream( + grpc_chttp2_transport* transport, grpc_chttp2_stream* stream, + uint32_t frame_size, uint32_t flags) + : ByteStream(frame_size, flags), + transport_(transport), + stream_(stream), + remaining_bytes_(frame_size) { + gpr_ref_init(&refs_, 2); + GRPC_ERROR_UNREF(stream->byte_stream_error); + stream->byte_stream_error = GRPC_ERROR_NONE; +} + +void Chttp2IncomingByteStream::OrphanLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast(arg); + grpc_chttp2_stream* s = bs->stream_; + grpc_chttp2_transport* t = s->t; + bs->Unref(); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); +} + +void Chttp2IncomingByteStream::Orphan() { + GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); + GRPC_CLOSURE_SCHED( + GRPC_CLOSURE_INIT(&destroy_action_, + &Chttp2IncomingByteStream::OrphanLocked, this, + grpc_combiner_scheduler(transport_->combiner)), + GRPC_ERROR_NONE); +} + +void Chttp2IncomingByteStream::Unref() { + if (gpr_unref(&refs_)) { + Delete(this); } } -static void incoming_byte_stream_next_locked(void* argp, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast(argp); - grpc_chttp2_transport* t = bs->transport; - grpc_chttp2_stream* s = bs->stream; +void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); } +void Chttp2IncomingByteStream::NextLocked(void* arg, + grpc_error* error_ignored) { + Chttp2IncomingByteStream* bs = static_cast(arg); + grpc_chttp2_transport* t = bs->transport_; + grpc_chttp2_stream* s = bs->stream_; size_t cur_length = s->frame_storage.length; if (!s->read_closed) { - s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint, + s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint, cur_length); grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s); } @@ -2755,22 +2779,22 @@ static void incoming_byte_stream_next_locked(void* argp, grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); s->unprocessed_incoming_frames_decompressed = false; - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE); } else if (s->byte_stream_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else if (s->read_closed) { - if (bs->remaining_bytes != 0) { + if (bs->remaining_bytes_ != 0) { s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(bs->next_action.on_complete, + GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_REF(s->byte_stream_error)); if (s->data_parser.parsing_frame != nullptr) { - incoming_byte_stream_unref(s->data_parser.parsing_frame); + s->data_parser.parsing_frame->Unref(); s->data_parser.parsing_frame = nullptr; } } else { @@ -2778,122 +2802,94 @@ static void incoming_byte_stream_next_locked(void* argp, GPR_ASSERT(false); } } else { - s->on_next = bs->next_action.on_complete; + s->on_next = bs->next_action_.on_complete; } - incoming_byte_stream_unref(bs); + bs->Unref(); } -static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { +bool Chttp2IncomingByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { GPR_TIMER_SCOPE("incoming_byte_stream_next", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast(byte_stream); - grpc_chttp2_stream* s = bs->stream; - if (s->unprocessed_incoming_frames_buffer.length > 0) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { return true; } else { - gpr_ref(&bs->refs); - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; + Ref(); + next_action_.max_size_hint = max_size_hint; + next_action_.on_complete = on_complete; GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->next_action.closure, - incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), + GRPC_CLOSURE_INIT(&next_action_.closure, + &Chttp2IncomingByteStream::NextLocked, this, + grpc_combiner_scheduler(transport_->combiner)), GRPC_ERROR_NONE); return false; } } -static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { +grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) { GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast(byte_stream); - grpc_chttp2_stream* s = bs->stream; grpc_error* error; - - if (s->unprocessed_incoming_frames_buffer.length > 0) { - if (!s->unprocessed_incoming_frames_decompressed) { + if (stream_->unprocessed_incoming_frames_buffer.length > 0) { + if (!stream_->unprocessed_incoming_frames_decompressed) { bool end_of_context; - if (!s->stream_decompression_ctx) { - s->stream_decompression_ctx = grpc_stream_compression_context_create( - s->stream_decompression_method); + if (!stream_->stream_decompression_ctx) { + stream_->stream_decompression_ctx = + grpc_stream_compression_context_create( + stream_->stream_decompression_method); } - if (!grpc_stream_decompress(s->stream_decompression_ctx, - &s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer, nullptr, + if (!grpc_stream_decompress(stream_->stream_decompression_ctx, + &stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer, nullptr, MAX_SIZE_T, &end_of_context)) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error."); return error; } - GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); - grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, - &s->decompressed_data_buffer); - s->unprocessed_incoming_frames_decompressed = true; + GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0); + grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer, + &stream_->decompressed_data_buffer); + stream_->unprocessed_incoming_frames_decompressed = true; if (end_of_context) { - grpc_stream_compression_context_destroy(s->stream_decompression_ctx); - s->stream_decompression_ctx = nullptr; + grpc_stream_compression_context_destroy( + stream_->stream_decompression_ctx); + stream_->stream_decompression_ctx = nullptr; } - if (s->unprocessed_incoming_frames_buffer.length == 0) { + if (stream_->unprocessed_incoming_frames_buffer.length == 0) { *slice = grpc_empty_slice(); } } error = grpc_deframe_unprocessed_incoming_frames( - &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice, - nullptr); + &stream_->data_parser, stream_, + &stream_->unprocessed_incoming_frames_buffer, slice, nullptr); if (error != GRPC_ERROR_NONE) { return error; } } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); return error; } return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored); - -static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) { - GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0); - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast(byte_stream); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&bs->destroy_action, - incoming_byte_stream_destroy_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner)), - GRPC_ERROR_NONE); -} - -static void incoming_byte_stream_publish_error( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) { - grpc_chttp2_stream* s = bs->stream; - +void Chttp2IncomingByteStream::PublishError(grpc_error* error) { GPR_ASSERT(error != GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error)); - s->on_next = nullptr; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_REF(error); - grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error)); + stream_->on_next = nullptr; + GRPC_ERROR_UNREF(stream_->byte_stream_error); + stream_->byte_stream_error = GRPC_ERROR_REF(error); + grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error)); } -grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out) { - grpc_chttp2_stream* s = bs->stream; - - if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { +grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice, + grpc_slice* slice_out) { + if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); - - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); grpc_slice_unref_internal(slice); return error; } else { - bs->remaining_bytes -= static_cast GRPC_SLICE_LENGTH(slice); + remaining_bytes_ -= static_cast GRPC_SLICE_LENGTH(slice); if (slice_out != nullptr) { *slice_out = slice; } @@ -2901,66 +2897,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push( } } -grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error) { - grpc_chttp2_stream* s = bs->stream; - +grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error, + bool reset_on_error) { if (error == GRPC_ERROR_NONE) { - if (bs->remaining_bytes != 0) { + if (remaining_bytes_ != 0) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); } } if (error != GRPC_ERROR_NONE && reset_on_error) { - GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error)); } - incoming_byte_stream_unref(bs); + Unref(); return error; } -static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_chttp2_incoming_byte_stream* bs = - reinterpret_cast(byte_stream); - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - bs, error, true /* reset_on_error */)); +void Chttp2IncomingByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */)); } -static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { - incoming_byte_stream_next, incoming_byte_stream_pull, - incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; - -static void incoming_byte_stream_destroy_locked(void* byte_stream, - grpc_error* error_ignored) { - grpc_chttp2_incoming_byte_stream* bs = - static_cast(byte_stream); - grpc_chttp2_stream* s = bs->stream; - grpc_chttp2_transport* t = s->t; - - GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); - incoming_byte_stream_unref(bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); -} - -grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags) { - grpc_chttp2_incoming_byte_stream* incoming_byte_stream = - static_cast( - gpr_malloc(sizeof(*incoming_byte_stream))); - incoming_byte_stream->base.length = frame_size; - incoming_byte_stream->remaining_bytes = frame_size; - incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; - gpr_ref_init(&incoming_byte_stream->refs, 2); - incoming_byte_stream->transport = t; - incoming_byte_stream->stream = s; - GRPC_ERROR_UNREF(s->byte_stream_error); - s->byte_stream_error = GRPC_ERROR_NONE; - return incoming_byte_stream; -} +} // namespace grpc_core /******************************************************************************* * RESOURCE QUOTAS diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index 0d37a494a23..f8f06f67891 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -27,6 +27,7 @@ #include #include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/transport/transport.h" @@ -39,8 +40,7 @@ grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) { void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) { if (parser->parsing_frame != nullptr) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - parser->parsing_frame, + GRPC_ERROR_UNREF(parser->parsing_frame->Finished( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); } GRPC_ERROR_UNREF(parser->error); @@ -100,7 +100,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_error* grpc_deframe_unprocessed_incoming_frames( grpc_chttp2_data_parser* p, grpc_chttp2_stream* s, grpc_slice_buffer* slices, grpc_slice* slice_out, - grpc_byte_stream** stream_out) { + grpc_core::OrphanablePtr* stream_out) { grpc_error* error = GRPC_ERROR_NONE; grpc_chttp2_transport* t = s->t; @@ -197,12 +197,11 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( if (p->is_frame_compressed) { message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( + p->parsing_frame = grpc_core::New( t, s, p->frame_size, message_flags); - *stream_out = &p->parsing_frame->base; - if (p->parsing_frame->remaining_bytes == 0) { - GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true)); + stream_out->reset(p->parsing_frame); + if (p->parsing_frame->remaining_bytes() == 0) { + GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true)); p->parsing_frame = nullptr; p->state = GRPC_CHTTP2_DATA_FH_0; } @@ -226,8 +225,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( if (remaining == p->frame_size) { s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + (error = p->parsing_frame->Push( grpc_slice_sub(slice, static_cast(cur - beg), static_cast(end - beg)), slice_out))) { @@ -235,8 +233,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( return error; } if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true))) { + (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { grpc_slice_unref_internal(slice); return error; } @@ -247,8 +244,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( } else if (remaining < p->frame_size) { s->stats.incoming.data_bytes += remaining; if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + (error = p->parsing_frame->Push( grpc_slice_sub(slice, static_cast(cur - beg), static_cast(end - beg)), slice_out))) { @@ -261,18 +257,16 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( GPR_ASSERT(remaining > p->frame_size); s->stats.incoming.data_bytes += p->frame_size; if (GRPC_ERROR_NONE != - (grpc_chttp2_incoming_byte_stream_push( - p->parsing_frame, + p->parsing_frame->Push( grpc_slice_sub( slice, static_cast(cur - beg), static_cast(cur + p->frame_size - beg)), - slice_out))) { + slice_out)) { grpc_slice_unref_internal(slice); return error; } if (GRPC_ERROR_NONE != - (error = grpc_chttp2_incoming_byte_stream_finished( - p->parsing_frame, GRPC_ERROR_NONE, true))) { + (error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) { grpc_slice_unref_internal(slice); return error; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 3efbbf9f76a..4b0f8736448 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -40,8 +40,9 @@ typedef enum { GRPC_CHTTP2_DATA_ERROR } grpc_chttp2_stream_state; -typedef struct grpc_chttp2_incoming_byte_stream - grpc_chttp2_incoming_byte_stream; +namespace grpc_core { +class Chttp2IncomingByteStream; +} // namespace grpc_core typedef struct { grpc_chttp2_stream_state state; @@ -50,7 +51,7 @@ typedef struct { grpc_error* error; bool is_frame_compressed; - grpc_chttp2_incoming_byte_stream* parsing_frame; + grpc_core::Chttp2IncomingByteStream* parsing_frame; } grpc_chttp2_data_parser; /* initialize per-stream state for data frame parsing */ @@ -79,6 +80,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf, grpc_error* grpc_deframe_unprocessed_incoming_frames( grpc_chttp2_data_parser* p, grpc_chttp2_stream* s, grpc_slice_buffer* slices, grpc_slice* slice_out, - grpc_byte_stream** stream_out); + grpc_core::OrphanablePtr* stream_out); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index b9431cd311b..6d11e5aa314 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -203,18 +203,58 @@ typedef struct grpc_chttp2_write_cb { struct grpc_chttp2_write_cb* next; } grpc_chttp2_write_cb; -/* forward declared in frame_data.h */ -struct grpc_chttp2_incoming_byte_stream { - grpc_byte_stream base; - gpr_refcount refs; +namespace grpc_core { + +class Chttp2IncomingByteStream : public ByteStream { + public: + Chttp2IncomingByteStream(grpc_chttp2_transport* transport, + grpc_chttp2_stream* stream, uint32_t frame_size, + uint32_t flags); + + void Orphan() override; + + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; + + // TODO(roth): When I converted this class to C++, I wanted to make it + // inherit from RefCounted or InternallyRefCounted instead of continuing + // to use its own custom ref-counting code. However, that would require + // using multiple inheritence, which sucks in general. And to make matters + // worse, it causes problems with our New<> and Delete<> wrappers. + // Specifically, unless RefCounted is first in the list of parent classes, + // it will see a different value of the address of the object than the one + // we actually allocated, in which case gpr_free() will be called on a + // different address than the one we got from gpr_malloc(), thus causing a + // crash. Given the fragility of depending on that, as well as a desire to + // avoid multiple inheritence in general, I've decided to leave this + // alone for now. We can revisit this once we're able to link against + // libc++, at which point we can eliminate New<> and Delete<> and + // switch to std::shared_ptr<>. + void Ref(); + void Unref(); + + void PublishError(grpc_error* error); + + grpc_error* Push(grpc_slice slice, grpc_slice* slice_out); - grpc_chttp2_transport* transport; /* immutable */ - grpc_chttp2_stream* stream; /* immutable */ + grpc_error* Finished(grpc_error* error, bool reset_on_error); + + uint32_t remaining_bytes() const { return remaining_bytes_; } + + private: + static void NextLocked(void* arg, grpc_error* error_ignored); + static void OrphanLocked(void* arg, grpc_error* error_ignored); + + grpc_chttp2_transport* transport_; // Immutable. + grpc_chttp2_stream* stream_; // Immutable. + + gpr_refcount refs_; /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == * true */ - uint32_t remaining_bytes; + uint32_t remaining_bytes_; /* Accessed only by transport thread when stream->pending_byte_stream == false * Accessed only by application thread when stream->pending_byte_stream == @@ -223,11 +263,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure closure; size_t max_size_hint; grpc_closure* on_complete; - } next_action; - grpc_closure destroy_action; - grpc_closure finished_action; + } next_action_; + grpc_closure destroy_action_; }; +} // namespace grpc_core + typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, @@ -456,7 +497,7 @@ struct grpc_chttp2_stream { grpc_metadata_batch* send_trailing_metadata; grpc_closure* send_trailing_metadata_finished; - grpc_byte_stream* fetching_send_message; + grpc_core::OrphanablePtr fetching_send_message; uint32_t fetched_send_message_length; grpc_slice fetching_slice; int64_t next_message_end_offset; @@ -468,7 +509,7 @@ struct grpc_chttp2_stream { grpc_metadata_batch* recv_initial_metadata; grpc_closure* recv_initial_metadata_ready; bool* trailing_metadata_available; - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr* recv_message; grpc_closure* recv_message_ready; grpc_metadata_batch* recv_trailing_metadata; grpc_closure* recv_trailing_metadata_finished; @@ -719,18 +760,6 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t); void grpc_chttp2_ref_transport(grpc_chttp2_transport* t); #endif -grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create( - grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size, - uint32_t flags); -grpc_error* grpc_chttp2_incoming_byte_stream_push( - grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice, - grpc_slice* slice_out); -grpc_error* grpc_chttp2_incoming_byte_stream_finished( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error, - bool reset_on_error); -void grpc_chttp2_incoming_byte_stream_notify( - grpc_chttp2_incoming_byte_stream* bs, grpc_error* error); - void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id); /** Add a new ping strike to ping_recv_state.ping_strikes. If diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index ff1c1aad622..8e3ea057063 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -31,6 +31,7 @@ #include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/gpr/host_port.h" #include "src/core/lib/gpr/string.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/slice/slice_internal.h" @@ -122,7 +123,7 @@ struct read_state { bool read_stream_closed; /* vars for holding data destined for the application */ - struct grpc_slice_buffer_stream sbs; + grpc_core::ManualConstructor sbs; grpc_slice_buffer read_slice_buffer; /* vars for trailing metadata */ @@ -1041,16 +1042,14 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer write_slice_buffer; grpc_slice slice; grpc_slice_buffer_init(&write_slice_buffer); - if (1 != grpc_byte_stream_next( - stream_op->payload->send_message.send_message, - stream_op->payload->send_message.send_message->length, + if (1 != stream_op->payload->send_message.send_message->Next( + stream_op->payload->send_message.send_message->length(), nullptr)) { /* Should never reach here */ GPR_ASSERT(false); } if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(stream_op->payload->send_message.send_message, - &slice)) { + stream_op->payload->send_message.send_message->Pull(&slice)) { /* Should never reach here */ GPR_ASSERT(false); } @@ -1062,9 +1061,10 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } if (write_slice_buffer.count > 0) { size_t write_buffer_size; - create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, - &write_buffer_size, - stream_op->payload->send_message.send_message->flags); + create_grpc_frame( + &write_slice_buffer, &stream_state->ws.write_buffer, + &write_buffer_size, + stream_op->payload->send_message.send_message->flags()); CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs, stream_state->ws.write_buffer); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; @@ -1089,6 +1089,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + stream_op->payload->send_message.send_message.reset(); } else if (stream_op->send_trailing_metadata && op_can_be_run(stream_op, s, &oas->state, OP_SEND_TRAILING_METADATA)) { @@ -1195,14 +1196,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer_destroy_internal( &stream_state->rs.read_slice_buffer); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); - grpc_slice_buffer_stream_init(&stream_state->rs.sbs, - &stream_state->rs.read_slice_buffer, 0); + uint32_t flags = 0; if (stream_state->rs.compressed) { - stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - *(reinterpret_cast( - stream_op->payload->recv_message.recv_message)) = - reinterpret_cast(&stream_state->rs.sbs); + stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); + stream_op->payload->recv_message.recv_message->reset( + stream_state->rs.sbs.get()); GRPC_CLOSURE_SCHED( stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); @@ -1252,14 +1252,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) { grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); - grpc_slice_buffer_stream_init(&stream_state->rs.sbs, - &stream_state->rs.read_slice_buffer, 0); + uint32_t flags = 0; if (stream_state->rs.compressed) { - stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS; + flags = GRPC_WRITE_INTERNAL_COMPRESS; } - *(reinterpret_cast( - stream_op->payload->recv_message.recv_message)) = - reinterpret_cast(&stream_state->rs.sbs); + stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags); + stream_op->payload->recv_message.recv_message->reset( + stream_state->rs.sbs.get()); GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 5f898bbf257..67a380077b4 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -25,6 +25,7 @@ #include #include "src/core/ext/transport/inproc/inproc_transport.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -99,7 +100,7 @@ typedef struct inproc_stream { grpc_transport_stream_op_batch* recv_trailing_md_op; grpc_slice_buffer recv_message; - grpc_slice_buffer_stream recv_stream; + grpc_core::ManualConstructor recv_stream; bool recv_inited; bool initial_md_sent; @@ -482,8 +483,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { s->recv_message_op = nullptr; } if (s->send_message_op) { - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, error, s->send_message_op, "fail_helper scheduling send-message-on-complete"); @@ -521,7 +521,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) { static void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) { size_t remaining = - sender->send_message_op->payload->send_message.send_message->length; + sender->send_message_op->payload->send_message.send_message->length(); if (receiver->recv_inited) { grpc_slice_buffer_destroy_internal(&receiver->recv_message); } @@ -530,12 +530,12 @@ static void message_transfer_locked(inproc_stream* sender, do { grpc_slice message_slice; grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next( - sender->send_message_op->payload->send_message.send_message, SIZE_MAX, - &unused)); - grpc_error* error = grpc_byte_stream_pull( - sender->send_message_op->payload->send_message.send_message, - &message_slice); + GPR_ASSERT( + sender->send_message_op->payload->send_message.send_message->Next( + SIZE_MAX, &unused)); + grpc_error* error = + sender->send_message_op->payload->send_message.send_message->Pull( + &message_slice); if (error != GRPC_ERROR_NONE) { cancel_stream_locked(sender, GRPC_ERROR_REF(error)); break; @@ -544,13 +544,11 @@ static void message_transfer_locked(inproc_stream* sender, remaining -= GRPC_SLICE_LENGTH(message_slice); grpc_slice_buffer_add(&receiver->recv_message, message_slice); } while (remaining > 0); - grpc_byte_stream_destroy( - sender->send_message_op->payload->send_message.send_message); + sender->send_message_op->payload->send_message.send_message.reset(); - grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, - 0); - *receiver->recv_message_op->payload->recv_message.recv_message = - &receiver->recv_stream.base; + receiver->recv_stream.Init(&receiver->recv_message, 0); + receiver->recv_message_op->payload->recv_message.recv_message->reset( + receiver->recv_stream.get()); INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", receiver); GRPC_CLOSURE_SCHED( @@ -606,8 +604,7 @@ static void op_state_machine(void* arg, grpc_error* error) { (s->trailing_md_sent || other->recv_trailing_md_op)) { // A server send will never be matched if the client is waiting // for trailing metadata already - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, GRPC_ERROR_NONE, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -744,8 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) { if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); @@ -803,8 +799,7 @@ static void op_state_machine(void* arg, grpc_error* error) { s->send_message_op) { // Nothing further will try to receive from this stream, so finish off // any outstanding send_message op - grpc_byte_stream_destroy( - s->send_message_op->payload->send_message.send_message); + s->send_message_op->payload->send_message.send_message.reset(); complete_if_batch_end_locked( s, new_err, s->send_message_op, "op_state_machine scheduling send-message-on-complete"); diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c4844da3188..adb6ee5a06e 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -37,6 +37,7 @@ #include "src/core/lib/gpr/arena.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -221,9 +222,9 @@ struct grpc_call { int send_extra_metadata_count; grpc_millis send_deadline; - grpc_slice_buffer_stream sending_stream; + grpc_core::ManualConstructor sending_stream; - grpc_byte_stream* receiving_stream; + grpc_core::OrphanablePtr receiving_stream; grpc_byte_buffer** receiving_buffer; grpc_slice receiving_slice; grpc_closure receiving_slice_ready; @@ -522,9 +523,7 @@ static void destroy_call(void* call, grpc_error* error) { grpc_metadata_batch_destroy( &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]); } - if (c->receiving_stream != nullptr) { - grpc_byte_stream_destroy(c->receiving_stream); - } + c->receiving_stream.reset(); parent_call* pc = get_parent_call(c); if (pc != nullptr) { gpr_mu_destroy(&pc->child_list_mu); @@ -1281,25 +1280,21 @@ static void continue_receiving_slices(batch_control* bctl) { grpc_error* error; grpc_call* call = bctl->call; for (;;) { - size_t remaining = call->receiving_stream->length - + size_t remaining = call->receiving_stream->length() - (*call->receiving_buffer)->data.raw.slice_buffer.length; if (remaining == 0) { call->receiving_message = 0; - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); finish_batch_step(bctl); return; } - if (grpc_byte_stream_next(call->receiving_stream, remaining, - &call->receiving_slice_ready)) { - error = - grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice); + if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) { + error = call->receiving_stream->Pull(&call->receiving_slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, call->receiving_slice); } else { - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; call->receiving_message = 0; @@ -1315,19 +1310,17 @@ static void continue_receiving_slices(batch_control* bctl) { static void receiving_slice_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; - grpc_byte_stream* bs = call->receiving_stream; bool release_error = false; if (error == GRPC_ERROR_NONE) { grpc_slice slice; - error = grpc_byte_stream_pull(bs, &slice); + error = call->receiving_stream->Pull(&slice); if (error == GRPC_ERROR_NONE) { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, slice); continue_receiving_slices(bctl); } else { - /* Error returned by grpc_byte_stream_pull needs to be released manually - */ + /* Error returned by ByteStream::Pull() needs to be released manually */ release_error = true; } } @@ -1336,8 +1329,7 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) { if (grpc_trace_operation_failures.enabled()) { GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); } - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; + call->receiving_stream.reset(); grpc_byte_buffer_destroy(*call->receiving_buffer); *call->receiving_buffer = nullptr; call->receiving_message = 0; @@ -1355,8 +1347,8 @@ static void process_data_after_md(batch_control* bctl) { call->receiving_message = 0; finish_batch_step(bctl); } else { - call->test_only_last_message_flags = call->receiving_stream->flags; - if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) && + call->test_only_last_message_flags = call->receiving_stream->flags(); + if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && (call->incoming_message_compression_algorithm > GRPC_MESSAGE_COMPRESS_NONE)) { grpc_compression_algorithm algo; @@ -1379,10 +1371,7 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) { batch_control* bctl = static_cast(bctlp); grpc_call* call = bctl->call; if (error != GRPC_ERROR_NONE) { - if (call->receiving_stream != nullptr) { - grpc_byte_stream_destroy(call->receiving_stream); - call->receiving_stream = nullptr; - } + call->receiving_stream.reset(); add_batch_error(bctl, GRPC_ERROR_REF(error), true); cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } @@ -1676,21 +1665,20 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops, error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - stream_op->send_message = true; - call->sending_message = true; - grpc_slice_buffer_stream_init( - &call->sending_stream, - &op->data.send_message.send_message->data.raw.slice_buffer, - op->flags); + uint32_t flags = op->flags; /* If the outgoing buffer is already compressed, mark it as so in the flags. These will be picked up by the compression filter and further (wasteful) attempts at compression skipped. */ if (op->data.send_message.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { - call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - stream_op_payload->send_message.send_message = - &call->sending_stream.base; + stream_op->send_message = true; + call->sending_message = true; + call->sending_stream.Init( + &op->data.send_message.send_message->data.raw.slice_buffer, flags); + stream_op_payload->send_message.send_message.reset( + call->sending_stream.get()); break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { @@ -1909,7 +1897,7 @@ done_with_error: } if (stream_op->send_message) { call->sending_message = false; - grpc_byte_stream_destroy(&call->sending_stream.base); + call->sending_stream->Orphan(); } if (stream_op->send_trailing_metadata) { call->sent_final_op = false; diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index e1751f80108..1aaf40fb997 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -25,160 +25,123 @@ #include +#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/slice/slice_internal.h" -bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete) { - return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete); -} +namespace grpc_core { -grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - return byte_stream->vtable->pull(byte_stream, slice); -} +// +// SliceBufferByteStream +// -void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - byte_stream->vtable->shutdown(byte_stream, error); +SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, + uint32_t flags) + : ByteStream(static_cast(slice_buffer->length), flags) { + GPR_ASSERT(slice_buffer->length <= UINT32_MAX); + grpc_slice_buffer_init(&backing_buffer_); + grpc_slice_buffer_swap(slice_buffer, &backing_buffer_); } -void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) { - byte_stream->vtable->destroy(byte_stream); -} +SliceBufferByteStream::~SliceBufferByteStream() {} -// grpc_slice_buffer_stream +void SliceBufferByteStream::Orphan() { + grpc_slice_buffer_destroy(&backing_buffer_); + GRPC_ERROR_UNREF(shutdown_error_); + // Note: We do not actually delete the object here, since + // SliceBufferByteStream is usually allocated as part of a larger + // object and has an OrphanablePtr of itself passed down through the + // filter stack. +} -static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { - grpc_slice_buffer_stream* stream = - reinterpret_cast(byte_stream); - GPR_ASSERT(stream->cursor < stream->backing_buffer.count); +bool SliceBufferByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { + GPR_ASSERT(cursor_ < backing_buffer_.count); return true; } -static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - grpc_slice_buffer_stream* stream = - reinterpret_cast(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); +grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) { + if (shutdown_error_ != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(shutdown_error_); } - GPR_ASSERT(stream->cursor < stream->backing_buffer.count); - *slice = - grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]); - stream->cursor++; + GPR_ASSERT(cursor_ < backing_buffer_.count); + *slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]); + ++cursor_; return GRPC_ERROR_NONE; } -static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_slice_buffer_stream* stream = - reinterpret_cast(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; +void SliceBufferByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(shutdown_error_); + shutdown_error_ = error; } -static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) { - grpc_slice_buffer_stream* stream = - reinterpret_cast(byte_stream); - grpc_slice_buffer_destroy(&stream->backing_buffer); - GRPC_ERROR_UNREF(stream->shutdown_error); +// +// ByteStreamCache +// + +ByteStreamCache::ByteStreamCache(OrphanablePtr underlying_stream) + : underlying_stream_(std::move(underlying_stream)) { + grpc_slice_buffer_init(&cache_buffer_); } -static const grpc_byte_stream_vtable slice_buffer_stream_vtable = { - slice_buffer_stream_next, slice_buffer_stream_pull, - slice_buffer_stream_shutdown, slice_buffer_stream_destroy}; +ByteStreamCache::~ByteStreamCache() { + if (underlying_stream_ != nullptr) Destroy(); +} -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, - grpc_slice_buffer* slice_buffer, - uint32_t flags) { - GPR_ASSERT(slice_buffer->length <= UINT32_MAX); - stream->base.length = static_cast(slice_buffer->length); - stream->base.flags = flags; - stream->base.vtable = &slice_buffer_stream_vtable; - grpc_slice_buffer_init(&stream->backing_buffer); - grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer); - stream->cursor = 0; - stream->shutdown_error = GRPC_ERROR_NONE; +void ByteStreamCache::Destroy() { + underlying_stream_.reset(); + grpc_slice_buffer_destroy_internal(&cache_buffer_); } -// grpc_caching_byte_stream +// +// ByteStreamCache::CachingByteStream +// -void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, - grpc_byte_stream* underlying_stream) { - cache->underlying_stream = underlying_stream; - grpc_slice_buffer_init(&cache->cache_buffer); -} +ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache) + : ByteStream(cache->underlying_stream_->length(), + cache->underlying_stream_->flags()), + cache_(cache) {} -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) { - grpc_byte_stream_destroy(cache->underlying_stream); - grpc_slice_buffer_destroy_internal(&cache->cache_buffer); +ByteStreamCache::CachingByteStream::~CachingByteStream() {} + +void ByteStreamCache::CachingByteStream::Orphan() { + GRPC_ERROR_UNREF(shutdown_error_); + // Note: We do not actually delete the object here, since + // CachingByteStream is usually allocated as part of a larger + // object and has an OrphanablePtr of itself passed down through the + // filter stack. } -static bool caching_byte_stream_next(grpc_byte_stream* byte_stream, - size_t max_size_hint, - grpc_closure* on_complete) { - grpc_caching_byte_stream* stream = - reinterpret_cast(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) return true; - if (stream->cursor < stream->cache->cache_buffer.count) return true; - return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint, - on_complete); +bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint, + grpc_closure* on_complete) { + if (shutdown_error_ != GRPC_ERROR_NONE) return true; + if (cursor_ < cache_->cache_buffer_.count) return true; + return cache_->underlying_stream_->Next(max_size_hint, on_complete); } -static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice) { - grpc_caching_byte_stream* stream = - reinterpret_cast(byte_stream); - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); +grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) { + if (shutdown_error_ != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(shutdown_error_); } - if (stream->cursor < stream->cache->cache_buffer.count) { - *slice = grpc_slice_ref_internal( - stream->cache->cache_buffer.slices[stream->cursor]); - ++stream->cursor; + if (cursor_ < cache_->cache_buffer_.count) { + *slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]); + ++cursor_; return GRPC_ERROR_NONE; } - grpc_error* error = - grpc_byte_stream_pull(stream->cache->underlying_stream, slice); + grpc_error* error = cache_->underlying_stream_->Pull(slice); if (error == GRPC_ERROR_NONE) { - ++stream->cursor; - grpc_slice_buffer_add(&stream->cache->cache_buffer, + ++cursor_; + grpc_slice_buffer_add(&cache_->cache_buffer_, grpc_slice_ref_internal(*slice)); } return error; } -static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error) { - grpc_caching_byte_stream* stream = - reinterpret_cast(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = GRPC_ERROR_REF(error); - grpc_byte_stream_shutdown(stream->cache->underlying_stream, error); +void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) { + GRPC_ERROR_UNREF(shutdown_error_); + shutdown_error_ = GRPC_ERROR_REF(error); + cache_->underlying_stream_->Shutdown(error); } -static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) { - grpc_caching_byte_stream* stream = - reinterpret_cast(byte_stream); - GRPC_ERROR_UNREF(stream->shutdown_error); -} +void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; } -static const grpc_byte_stream_vtable caching_byte_stream_vtable = { - caching_byte_stream_next, caching_byte_stream_pull, - caching_byte_stream_shutdown, caching_byte_stream_destroy}; - -void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, - grpc_byte_stream_cache* cache) { - memset(stream, 0, sizeof(*stream)); - stream->base.length = cache->underlying_stream->length; - stream->base.flags = cache->underlying_stream->flags; - stream->base.vtable = &caching_byte_stream_vtable; - stream->cache = cache; - stream->shutdown_error = GRPC_ERROR_NONE; -} - -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) { - stream->cursor = 0; -} +} // namespace grpc_core diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 4d3c3c131b0..f8243ac40d3 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -22,6 +22,8 @@ #include #include +#include "src/core/lib/gprpp/abstract.h" +#include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/iomgr/exec_ctx.h" /** Internal bit flag for grpc_begin_message's \a flags signaling the use of @@ -30,71 +32,82 @@ /** Mask of all valid internal flags. */ #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) -typedef struct grpc_byte_stream grpc_byte_stream; - -typedef struct { - bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete); - grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice); - void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error); - void (*destroy)(grpc_byte_stream* byte_stream); -} grpc_byte_stream_vtable; - -struct grpc_byte_stream { - uint32_t length; - uint32_t flags; - const grpc_byte_stream_vtable* vtable; +namespace grpc_core { + +class ByteStream : public Orphanable { + public: + virtual ~ByteStream() {} + + // Returns true if the bytes are available immediately (in which case + // on_complete will not be called), or false if the bytes will be available + // asynchronously (in which case on_complete will be called when they + // are available). + // + // max_size_hint can be set as a hint as to the maximum number + // of bytes that would be acceptable to read. + virtual bool Next(size_t max_size_hint, + grpc_closure* on_complete) GRPC_ABSTRACT; + + // Returns the next slice in the byte stream when it is available, as + // indicated by Next(). + // + // Once a slice is returned into *slice, it is owned by the caller. + virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT; + + // Shuts down the byte stream. + // + // If there is a pending call to on_complete from Next(), it will be + // invoked with the error passed to Shutdown(). + // + // The next call to Pull() (if any) will return the error passed to + // Shutdown(). + virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT; + + uint32_t length() const { return length_; } + uint32_t flags() const { return flags_; } + + void set_flags(uint32_t flags) { flags_ = flags; } + + GRPC_ABSTRACT_BASE_CLASS + + protected: + ByteStream(uint32_t length, uint32_t flags) + : length_(length), flags_(flags) {} + + private: + const uint32_t length_; + uint32_t flags_; }; -// Returns true if the bytes are available immediately (in which case -// on_complete will not be called), false if the bytes will be available -// asynchronously. // -// max_size_hint can be set as a hint as to the maximum number -// of bytes that would be acceptable to read. -bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint, - grpc_closure* on_complete); - -// Returns the next slice in the byte stream when it is ready (indicated by -// either grpc_byte_stream_next returning true or on_complete passed to -// grpc_byte_stream_next is called). +// SliceBufferByteStream // -// Once a slice is returned into *slice, it is owned by the caller. -grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream, - grpc_slice* slice); - -// Shuts down the byte stream. +// A ByteStream that wraps a slice buffer. // -// If there is a pending call to on_complete from grpc_byte_stream_next(), -// it will be invoked with the error passed to grpc_byte_stream_shutdown(). -// -// The next call to grpc_byte_stream_pull() (if any) will return the error -// passed to grpc_byte_stream_shutdown(). -void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream, - grpc_error* error); -void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream); +class SliceBufferByteStream : public ByteStream { + public: + // Removes all slices in slice_buffer, leaving it empty. + SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags); + + ~SliceBufferByteStream(); + + void Orphan() override; + + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; + + private: + grpc_slice_buffer backing_buffer_; + size_t cursor_ = 0; + grpc_error* shutdown_error_ = GRPC_ERROR_NONE; +}; -// grpc_slice_buffer_stream // -// A grpc_byte_stream that wraps a slice buffer. The stream takes -// ownership of the slices in the buffer, and on destruction will -// reset the contents of the buffer. - -typedef struct grpc_slice_buffer_stream { - grpc_byte_stream base; - grpc_slice_buffer backing_buffer; - size_t cursor; - grpc_error* shutdown_error; -} grpc_slice_buffer_stream; - -void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, - grpc_slice_buffer* slice_buffer, - uint32_t flags); - -// grpc_caching_byte_stream +// CachingByteStream // -// A grpc_byte_stream that that wraps an underlying byte stream but caches +// A ByteStream that that wraps an underlying byte stream but caches // the resulting slices in a slice buffer. If an initial attempt fails // without fully draining the underlying stream, a new caching stream // can be created from the same underlying cache, in which case it will @@ -102,32 +115,47 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream, // underlying stream. // // NOTE: No synchronization is done, so it is not safe to have multiple -// grpc_caching_byte_streams simultaneously drawing from the same underlying -// grpc_byte_stream_cache at the same time. +// CachingByteStreams simultaneously drawing from the same underlying +// ByteStreamCache at the same time. +// + +class ByteStreamCache { + public: + class CachingByteStream : public ByteStream { + public: + explicit CachingByteStream(ByteStreamCache* cache); + + ~CachingByteStream(); + + void Orphan() override; -typedef struct { - grpc_byte_stream* underlying_stream; - grpc_slice_buffer cache_buffer; -} grpc_byte_stream_cache; + bool Next(size_t max_size_hint, grpc_closure* on_complete) override; + grpc_error* Pull(grpc_slice* slice) override; + void Shutdown(grpc_error* error) override; -// Takes ownership of underlying_stream. -void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache, - grpc_byte_stream* underlying_stream); + // Resets the byte stream to the start of the underlying stream. + void Reset(); -// Must not be called while still in use by a grpc_caching_byte_stream. -void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache); + private: + ByteStreamCache* cache_; + size_t cursor_ = 0; + grpc_error* shutdown_error_ = GRPC_ERROR_NONE; + }; -typedef struct { - grpc_byte_stream base; - grpc_byte_stream_cache* cache; - size_t cursor; - grpc_error* shutdown_error; -} grpc_caching_byte_stream; + explicit ByteStreamCache(OrphanablePtr underlying_stream); -void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream, - grpc_byte_stream_cache* cache); + ~ByteStreamCache(); + + // Must not be destroyed while still in use by a CachingByteStream. + void Destroy(); + + grpc_slice_buffer* cache_buffer() { return &cache_buffer_; } + + private: + OrphanablePtr underlying_stream_; + grpc_slice_buffer cache_buffer_; +}; -// Resets the byte stream to the start of the underlying stream. -void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream); +} // namespace grpc_core #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index c90d16fc32e..6b41e4b37e5 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -209,7 +209,7 @@ void grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch* batch, grpc_error* error, grpc_call_combiner* call_combiner) { if (batch->send_message) { - grpc_byte_stream_destroy(batch->payload->send_message.send_message); + batch->payload->send_message.send_message.reset(); } if (batch->recv_message) { GRPC_CALL_COMBINER_START( diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 37e50344c48..10e9df0f7c7 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -184,11 +184,10 @@ struct grpc_transport_stream_op_batch_payload { struct { // The transport (or a filter that decides to return a failure before - // the op gets down to the transport) is responsible for calling - // grpc_byte_stream_destroy() on this. + // the op gets down to the transport) takes ownership. // The batch's on_complete will not be called until after the byte - // stream is destroyed. - grpc_byte_stream* send_message; + // stream is orphaned. + grpc_core::OrphanablePtr send_message; } send_message; struct { @@ -216,10 +215,8 @@ struct grpc_transport_stream_op_batch_payload { struct { // Will be set by the transport to point to the byte stream // containing a received message. - // The caller is responsible for calling grpc_byte_stream_destroy() - // on this byte stream. // Will be NULL if trailing metadata is received instead of a message. - grpc_byte_stream** recv_message; + grpc_core::OrphanablePtr* recv_message; /** Should be enqueued when one message is ready to be processed. */ grpc_closure* recv_message_ready; } recv_message; diff --git a/src/core/lib/transport/transport_op_string.cc b/src/core/lib/transport/transport_op_string.cc index 6898da17ed2..99af7c1931e 100644 --- a/src/core/lib/transport/transport_op_string.cc +++ b/src/core/lib/transport/transport_op_string.cc @@ -75,9 +75,16 @@ char* grpc_transport_stream_op_batch_string( if (op->send_message) { gpr_strvec_add(&b, gpr_strdup(" ")); - gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", - op->payload->send_message.send_message->flags, - op->payload->send_message.send_message->length); + if (op->payload->send_message.send_message != nullptr) { + gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d", + op->payload->send_message.send_message->flags(), + op->payload->send_message.send_message->length()); + } else { + // This can happen when we check a batch after the transport has + // processed and cleared the send_message op. + tmp = + gpr_strdup("SEND_MESSAGE(flag and length unknown, already orphaned)"); + } gpr_strvec_add(&b, tmp); } diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 60eefcb0d17..bd0ec969b4a 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -182,20 +182,22 @@ class TransportStreamOpBatch { op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure; } - grpc_byte_stream* send_message() const { - return op_->send_message ? op_->payload->send_message.send_message + grpc_core::OrphanablePtr* send_message() const { + return op_->send_message ? &op_->payload->send_message.send_message : nullptr; } - void set_send_message(grpc_byte_stream* send_message) { + void set_send_message( + grpc_core::OrphanablePtr send_message) { op_->send_message = true; - op_->payload->send_message.send_message = send_message; + op_->payload->send_message.send_message = std::move(send_message); } - grpc_byte_stream** recv_message() const { + grpc_core::OrphanablePtr* recv_message() const { return op_->recv_message ? op_->payload->recv_message.recv_message : nullptr; } - void set_recv_message(grpc_byte_stream** recv_message) { + void set_recv_message( + grpc_core::OrphanablePtr* recv_message) { op_->recv_message = true; op_->payload->recv_message.recv_message = recv_message; } diff --git a/test/core/transport/BUILD b/test/core/transport/BUILD index 2c2d05b9aea..84fb3a1421d 100644 --- a/test/core/transport/BUILD +++ b/test/core/transport/BUILD @@ -43,6 +43,9 @@ grpc_cc_test( "//test/core/util:gpr_test_util", "//test/core/util:grpc_test_util", ], + external_deps = [ + "gtest", + ], ) grpc_cc_test( diff --git a/test/core/transport/byte_stream_test.cc b/test/core/transport/byte_stream_test.cc index 6947d50976f..80692ec2da0 100644 --- a/test/core/transport/byte_stream_test.cc +++ b/test/core/transport/byte_stream_test.cc @@ -27,16 +27,18 @@ #include "test/core/util/test_config.h" +#include + +namespace grpc_core { +namespace { + // -// grpc_slice_buffer_stream tests +// SliceBufferByteStream tests // -static void not_called_closure(void* arg, grpc_error* error) { - GPR_ASSERT(false); -} +void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); } -static void test_slice_buffer_stream_basic(void) { - gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic"); +TEST(SliceBufferByteStream, Basic) { grpc_core::ExecCtx exec_ctx; // Create and populate slice buffer. grpc_slice_buffer buffer; @@ -49,28 +51,26 @@ static void test_slice_buffer_stream_basic(void) { grpc_slice_buffer_add(&buffer, input[i]); } // Create byte stream. - grpc_slice_buffer_stream stream; - grpc_slice_buffer_stream_init(&stream, &buffer, 0); - GPR_ASSERT(stream.base.length == 6); + SliceBufferByteStream stream(&buffer, 0); + grpc_slice_buffer_destroy_internal(&buffer); + EXPECT_EQ(6U, stream.length()); grpc_closure closure; - GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr, + GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, grpc_schedule_on_exec_ctx); - // Read each slice. Note that next() always returns synchronously. + // Read each slice. Note that Next() always returns synchronously. for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); grpc_slice output; - grpc_error* error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_error* error = stream.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[i], output)); grpc_slice_unref_internal(output); } // Clean up. - grpc_byte_stream_destroy(&stream.base); - grpc_slice_buffer_destroy_internal(&buffer); + stream.Orphan(); } -static void test_slice_buffer_stream_shutdown(void) { - gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown"); +TEST(SliceBufferByteStream, Shutdown) { grpc_core::ExecCtx exec_ctx; // Create and populate slice buffer. grpc_slice_buffer buffer; @@ -83,40 +83,38 @@ static void test_slice_buffer_stream_shutdown(void) { grpc_slice_buffer_add(&buffer, input[i]); } // Create byte stream. - grpc_slice_buffer_stream stream; - grpc_slice_buffer_stream_init(&stream, &buffer, 0); - GPR_ASSERT(stream.base.length == 6); + SliceBufferByteStream stream(&buffer, 0); + grpc_slice_buffer_destroy_internal(&buffer); + EXPECT_EQ(6U, stream.length()); grpc_closure closure; - GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr, + GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, grpc_schedule_on_exec_ctx); // Read the first slice. - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); grpc_slice output; - grpc_error* error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_error* error = stream.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[0], output)); grpc_slice_unref_internal(output); // Now shutdown. grpc_error* shutdown_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error"); - grpc_byte_stream_shutdown(&stream.base, GRPC_ERROR_REF(shutdown_error)); + stream.Shutdown(GRPC_ERROR_REF(shutdown_error)); // After shutdown, the next pull() should return the error. - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); - error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == shutdown_error); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); + error = stream.Pull(&output); + EXPECT_TRUE(error == shutdown_error); GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(shutdown_error); // Clean up. - grpc_byte_stream_destroy(&stream.base); - grpc_slice_buffer_destroy_internal(&buffer); + stream.Orphan(); } // -// grpc_caching_byte_stream tests +// CachingByteStream tests // -static void test_caching_byte_stream_basic(void) { - gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic"); +TEST(CachingByteStream, Basic) { grpc_core::ExecCtx exec_ctx; // Create and populate slice buffer byte stream. grpc_slice_buffer buffer; @@ -128,34 +126,30 @@ static void test_caching_byte_stream_basic(void) { for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { grpc_slice_buffer_add(&buffer, input[i]); } - grpc_slice_buffer_stream underlying_stream; - grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + SliceBufferByteStream underlying_stream(&buffer, 0); + grpc_slice_buffer_destroy_internal(&buffer); // Create cache and caching stream. - grpc_byte_stream_cache cache; - grpc_byte_stream_cache_init(&cache, &underlying_stream.base); - grpc_caching_byte_stream stream; - grpc_caching_byte_stream_init(&stream, &cache); + ByteStreamCache cache((OrphanablePtr(&underlying_stream))); + ByteStreamCache::CachingByteStream stream(&cache); grpc_closure closure; - GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr, + GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, grpc_schedule_on_exec_ctx); // Read each slice. Note that next() always returns synchronously, // because the underlying byte stream always does. for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); grpc_slice output; - grpc_error* error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[i], output)); + grpc_error* error = stream.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[i], output)); grpc_slice_unref_internal(output); } // Clean up. - grpc_byte_stream_destroy(&stream.base); - grpc_byte_stream_cache_destroy(&cache); - grpc_slice_buffer_destroy_internal(&buffer); + stream.Orphan(); + cache.Destroy(); } -static void test_caching_byte_stream_reset(void) { - gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset"); +TEST(CachingByteStream, Reset) { grpc_core::ExecCtx exec_ctx; // Create and populate slice buffer byte stream. grpc_slice_buffer buffer; @@ -167,41 +161,37 @@ static void test_caching_byte_stream_reset(void) { for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { grpc_slice_buffer_add(&buffer, input[i]); } - grpc_slice_buffer_stream underlying_stream; - grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + SliceBufferByteStream underlying_stream(&buffer, 0); + grpc_slice_buffer_destroy_internal(&buffer); // Create cache and caching stream. - grpc_byte_stream_cache cache; - grpc_byte_stream_cache_init(&cache, &underlying_stream.base); - grpc_caching_byte_stream stream; - grpc_caching_byte_stream_init(&stream, &cache); + ByteStreamCache cache((OrphanablePtr(&underlying_stream))); + ByteStreamCache::CachingByteStream stream(&cache); grpc_closure closure; - GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr, + GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, grpc_schedule_on_exec_ctx); // Read one slice. - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); grpc_slice output; - grpc_error* error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_error* error = stream.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[0], output)); grpc_slice_unref_internal(output); // Reset the caching stream. The reads should start over from the // first slice. - grpc_caching_byte_stream_reset(&stream); + stream.Reset(); for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { - GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure)); - error = grpc_byte_stream_pull(&stream.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[i], output)); + ASSERT_TRUE(stream.Next(~(size_t)0, &closure)); + error = stream.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[i], output)); grpc_slice_unref_internal(output); } // Clean up. - grpc_byte_stream_destroy(&stream.base); - grpc_byte_stream_cache_destroy(&cache); - grpc_slice_buffer_destroy_internal(&buffer); + stream.Orphan(); + cache.Destroy(); } -static void test_caching_byte_stream_shared_cache(void) { - gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache"); +TEST(CachingByteStream, SharedCache) { grpc_core::ExecCtx exec_ctx; // Create and populate slice buffer byte stream. grpc_slice_buffer buffer; @@ -213,54 +203,50 @@ static void test_caching_byte_stream_shared_cache(void) { for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { grpc_slice_buffer_add(&buffer, input[i]); } - grpc_slice_buffer_stream underlying_stream; - grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0); + SliceBufferByteStream underlying_stream(&buffer, 0); + grpc_slice_buffer_destroy_internal(&buffer); // Create cache and two caching streams. - grpc_byte_stream_cache cache; - grpc_byte_stream_cache_init(&cache, &underlying_stream.base); - grpc_caching_byte_stream stream1; - grpc_caching_byte_stream_init(&stream1, &cache); - grpc_caching_byte_stream stream2; - grpc_caching_byte_stream_init(&stream2, &cache); + ByteStreamCache cache((OrphanablePtr(&underlying_stream))); + ByteStreamCache::CachingByteStream stream1(&cache); + ByteStreamCache::CachingByteStream stream2(&cache); grpc_closure closure; - GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr, + GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr, grpc_schedule_on_exec_ctx); // Read one slice from stream1. - GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure)); + EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); grpc_slice output; - grpc_error* error = grpc_byte_stream_pull(&stream1.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[0], output)); + grpc_error* error = stream1.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[0], output)); grpc_slice_unref_internal(output); // Read all slices from stream2. for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) { - GPR_ASSERT(grpc_byte_stream_next(&stream2.base, ~(size_t)0, &closure)); - error = grpc_byte_stream_pull(&stream2.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[i], output)); + EXPECT_TRUE(stream2.Next(~(size_t)0, &closure)); + error = stream2.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[i], output)); grpc_slice_unref_internal(output); } // Now read the second slice from stream1. - GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure)); - error = grpc_byte_stream_pull(&stream1.base, &output); - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(grpc_slice_eq(input[1], output)); + EXPECT_TRUE(stream1.Next(~(size_t)0, &closure)); + error = stream1.Pull(&output); + EXPECT_TRUE(error == GRPC_ERROR_NONE); + EXPECT_TRUE(grpc_slice_eq(input[1], output)); grpc_slice_unref_internal(output); // Clean up. - grpc_byte_stream_destroy(&stream1.base); - grpc_byte_stream_destroy(&stream2.base); - grpc_byte_stream_cache_destroy(&cache); - grpc_slice_buffer_destroy_internal(&buffer); + stream1.Orphan(); + stream2.Orphan(); + cache.Destroy(); } +} // namespace +} // namespace grpc_core + int main(int argc, char** argv) { grpc_init(); grpc_test_init(argc, argv); - test_slice_buffer_stream_basic(); - test_slice_buffer_stream_shutdown(); - test_caching_byte_stream_basic(); - test_caching_byte_stream_reset(); - test_caching_byte_stream_shared_cache(); + ::testing::InitGoogleTest(&argc, argv); + int retval = RUN_ALL_TESTS(); grpc_shutdown(); - return 0; + return retval; } diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 259bef4fd1d..d00c79b610c 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -398,13 +398,13 @@ static void BM_TransportStreamSend(benchmark::State& state) { memset(&op, 0, sizeof(op)); op.payload = &op_payload; }; - grpc_slice_buffer_stream send_stream; - grpc_slice_buffer send_buffer; - grpc_slice_buffer_init(&send_buffer); - grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0))); - memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0, - GRPC_SLICE_LENGTH(send_buffer.slices[0])); - + // Create the send_message payload slice. + // Note: We use grpc_slice_malloc_large() instead of grpc_slice_malloc() + // to force the slice to be refcounted, so that it remains alive when it + // is unreffed after each send_message op. + grpc_slice send_slice = grpc_slice_malloc_large(state.range(0)); + memset(GRPC_SLICE_START_PTR(send_slice), 0, GRPC_SLICE_LENGTH(send_slice)); + grpc_core::ManualConstructor send_stream; grpc_metadata_batch b; grpc_metadata_batch_init(&b); b.deadline = GRPC_MILLIS_INF_FUTURE; @@ -424,14 +424,18 @@ static void BM_TransportStreamSend(benchmark::State& state) { gpr_event_set(bm_done, (void*)1); return; } + grpc_slice_buffer send_buffer; + grpc_slice_buffer_init(&send_buffer); + grpc_slice_buffer_add(&send_buffer, grpc_slice_ref(send_slice)); + send_stream.Init(&send_buffer, 0); + grpc_slice_buffer_destroy(&send_buffer); // force outgoing window to be yuge s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow(); f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow(); - grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0); reset_op(); op.on_complete = c.get(); op.send_message = true; - op.payload->send_message.send_message = &send_stream.base; + op.payload->send_message.send_message.reset(send_stream.get()); s->Op(&op); }); @@ -454,7 +458,7 @@ static void BM_TransportStreamSend(benchmark::State& state) { s.reset(); track_counters.Finish(state); grpc_metadata_batch_destroy(&b); - grpc_slice_buffer_destroy(&send_buffer); + grpc_slice_unref(send_slice); } BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024); @@ -524,7 +528,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) { grpc_transport_stream_op_batch_payload op_payload; memset(&op_payload, 0, sizeof(op_payload)); grpc_transport_stream_op_batch op; - grpc_byte_stream* recv_stream; + grpc_core::OrphanablePtr recv_stream; grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384); auto reset_op = [&]() { @@ -579,21 +583,20 @@ static void BM_TransportStreamRecv(benchmark::State& state) { drain = MakeClosure([&](grpc_error* error) { do { - if (received == recv_stream->length) { - grpc_byte_stream_destroy(recv_stream); + if (received == recv_stream->length()) { + recv_stream.reset(); GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE); return; } - } while (grpc_byte_stream_next(recv_stream, recv_stream->length - received, - drain_continue.get()) && - GRPC_ERROR_NONE == - grpc_byte_stream_pull(recv_stream, &recv_slice) && + } while (recv_stream->Next(recv_stream->length() - received, + drain_continue.get()) && + GRPC_ERROR_NONE == recv_stream->Pull(&recv_slice) && (received += GRPC_SLICE_LENGTH(recv_slice), grpc_slice_unref_internal(recv_slice), true)); }); drain_continue = MakeClosure([&](grpc_error* error) { - grpc_byte_stream_pull(recv_stream, &recv_slice); + recv_stream->Pull(&recv_slice); received += GRPC_SLICE_LENGTH(recv_slice); grpc_slice_unref_internal(recv_slice); GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE); diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index f5a9eb828ed..fb1a4fe0f02 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -146,23 +146,6 @@ "third_party": false, "type": "target" }, - { - "deps": [ - "gpr", - "gpr_test_util", - "grpc", - "grpc_test_util" - ], - "headers": [], - "is_filegroup": false, - "language": "c", - "name": "byte_stream_test", - "src": [ - "test/core/transport/byte_stream_test.cc" - ], - "third_party": false, - "type": "target" - }, { "deps": [ "gpr", @@ -3002,6 +2985,23 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c++", + "name": "byte_stream_test", + "src": [ + "test/core/transport/byte_stream_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 1406c4ac7ee..1e320fc15dc 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -195,30 +195,6 @@ ], "uses_polling": false }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": false, - "language": "c", - "name": "byte_stream_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": false - }, { "args": [], "benchmark": false, @@ -3601,6 +3577,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "byte_stream_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, { "args": [], "benchmark": false,