Convert byte_stream API to C++.

pull/14607/head
Mark D. Roth 7 years ago
parent 59ea0ae3eb
commit 3d8b32d8b3
  1. 1
      BUILD
  2. 65
      CMakeLists.txt
  3. 84
      Makefile
  4. 23
      build.yaml
  5. 38
      src/core/ext/filters/client_channel/client_channel.cc
  6. 48
      src/core/ext/filters/http/client/http_client_filter.cc
  7. 41
      src/core/ext/filters/http/message_compress/message_compress_filter.cc
  8. 17
      src/core/ext/filters/http/server/http_server_filter.cc
  9. 10
      src/core/ext/filters/message_size/message_size_filter.cc
  10. 4
      src/core/ext/filters/workarounds/workaround_cronet_compression_filter.cc
  11. 293
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  12. 32
      src/core/ext/transport/chttp2/transport/frame_data.cc
  13. 9
      src/core/ext/transport/chttp2/transport/frame_data.h
  14. 77
      src/core/ext/transport/chttp2/transport/internal.h
  15. 41
      src/core/ext/transport/cronet/transport/cronet_transport.cc
  16. 39
      src/core/ext/transport/inproc/inproc_transport.cc
  17. 60
      src/core/lib/surface/call.cc
  18. 197
      src/core/lib/transport/byte_stream.cc
  19. 182
      src/core/lib/transport/byte_stream.h
  20. 2
      src/core/lib/transport/transport.cc
  21. 11
      src/core/lib/transport/transport.h
  22. 13
      src/core/lib/transport/transport_op_string.cc
  23. 14
      src/cpp/common/channel_filter.h
  24. 3
      test/core/transport/BUILD
  25. 194
      test/core/transport/byte_stream_test.cc
  26. 39
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  27. 34
      tools/run_tests/generated/sources_and_headers.json
  28. 48
      tools/run_tests/generated/tests.json

@ -939,6 +939,7 @@ grpc_cc_library(
"grpc_codegen",
"grpc_trace",
"inlined_vector",
"orphanable",
"ref_counted",
"ref_counted_ptr",
],

@ -224,7 +224,6 @@ add_dependencies(buildtests_c avl_test)
add_dependencies(buildtests_c bad_server_response_test)
add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test)
add_dependencies(buildtests_c byte_stream_test)
add_dependencies(buildtests_c channel_create_test)
add_dependencies(buildtests_c chttp2_hpack_encoder_test)
add_dependencies(buildtests_c chttp2_stream_map_test)
@ -530,6 +529,7 @@ endif()
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx bm_pollset)
endif()
add_dependencies(buildtests_cxx byte_stream_test)
add_dependencies(buildtests_cxx channel_arguments_test)
add_dependencies(buildtests_cxx channel_filter_test)
add_dependencies(buildtests_cxx check_gcp_environment_linux_test)
@ -5436,33 +5436,6 @@ target_link_libraries(bin_encoder_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(byte_stream_test
test/core/transport/byte_stream_test.cc
)
target_include_directories(byte_stream_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
)
target_link_libraries(byte_stream_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(channel_create_test
test/core/surface/channel_create_test.cc
)
@ -9984,6 +9957,42 @@ endif()
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(byte_stream_test
test/core/transport/byte_stream_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(byte_stream_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${_gRPC_SSL_INCLUDE_DIR}
PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR}
PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR}
PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR}
PRIVATE ${_gRPC_CARES_INCLUDE_DIR}
PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR}
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(byte_stream_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(channel_arguments_test
test/cpp/common/channel_arguments_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -961,7 +961,6 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test
bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test
channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test
check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive
chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test
@ -1127,6 +1126,7 @@ bm_fullstack_trickle: $(BINDIR)/$(CONFIG)/bm_fullstack_trickle
bm_fullstack_unary_ping_pong: $(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong
bm_metadata: $(BINDIR)/$(CONFIG)/bm_metadata
bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset
byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test
channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
check_gcp_environment_linux_test: $(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test
@ -1399,7 +1399,6 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/bad_server_response_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \
$(BINDIR)/$(CONFIG)/byte_stream_test \
$(BINDIR)/$(CONFIG)/channel_create_test \
$(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \
$(BINDIR)/$(CONFIG)/chttp2_stream_map_test \
@ -1613,6 +1612,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong \
$(BINDIR)/$(CONFIG)/bm_metadata \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/byte_stream_test \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \
@ -1779,6 +1779,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_fullstack_unary_ping_pong \
$(BINDIR)/$(CONFIG)/bm_metadata \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/byte_stream_test \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/check_gcp_environment_linux_test \
@ -1885,8 +1886,6 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
$(E) "[RUN] Testing bin_encoder_test"
$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
$(E) "[RUN] Testing byte_stream_test"
$(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 )
$(E) "[RUN] Testing channel_create_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 )
$(E) "[RUN] Testing chttp2_hpack_encoder_test"
@ -2203,6 +2202,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/bm_metadata || ( echo test bm_metadata failed ; exit 1 )
$(E) "[RUN] Testing bm_pollset"
$(Q) $(BINDIR)/$(CONFIG)/bm_pollset || ( echo test bm_pollset failed ; exit 1 )
$(E) "[RUN] Testing byte_stream_test"
$(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 )
$(E) "[RUN] Testing channel_arguments_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
$(E) "[RUN] Testing channel_filter_test"
@ -10128,38 +10129,6 @@ endif
endif
BYTE_STREAM_TEST_SRC = \
test/core/transport/byte_stream_test.cc \
BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BYTE_STREAM_TEST_OBJS:.o=.dep)
endif
endif
CHANNEL_CREATE_TEST_SRC = \
test/core/surface/channel_create_test.cc \
@ -15830,6 +15799,49 @@ endif
endif
BYTE_STREAM_TEST_SRC = \
test/core/transport/byte_stream_test.cc \
BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/byte_stream_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/byte_stream_test: $(PROTOBUF_DEP) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/byte_stream_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BYTE_STREAM_TEST_OBJS:.o=.dep)
endif
endif
CHANNEL_ARGUMENTS_TEST_SRC = \
test/cpp/common/channel_arguments_test.cc \

@ -1986,17 +1986,6 @@ targets:
- grpc_test_util
- grpc
uses_polling: false
- name: byte_stream_test
build: test
language: c
src:
- test/core/transport/byte_stream_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
uses_polling: false
- name: channel_create_test
build: test
language: c
@ -4071,6 +4060,18 @@ targets:
- mac
- linux
- posix
- name: byte_stream_test
gtest: true
build: test
language: c++
src:
- test/core/transport/byte_stream_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
uses_polling: false
- name: channel_arguments_test
gtest: true
build: test

@ -798,7 +798,8 @@ typedef struct {
grpc_linked_mdelem* send_initial_metadata_storage;
grpc_metadata_batch send_initial_metadata;
// For send_message.
grpc_caching_byte_stream send_message;
grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
send_message;
// For send_trailing_metadata.
grpc_linked_mdelem* send_trailing_metadata_storage;
grpc_metadata_batch send_trailing_metadata;
@ -808,7 +809,7 @@ typedef struct {
bool trailing_metadata_available;
// For intercepting recv_message.
grpc_closure recv_message_ready;
grpc_byte_stream* recv_message;
grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
// For intercepting recv_trailing_metadata.
grpc_metadata_batch recv_trailing_metadata;
grpc_transport_stream_stats collect_stats;
@ -914,12 +915,12 @@ typedef struct client_channel_call_data {
gpr_atm* peer_string;
// send_message
// When we get a send_message op, we replace the original byte stream
// with a grpc_caching_byte_stream that caches the slices to a
// local buffer for use in retries.
// with a CachingByteStream that caches the slices to a local buffer for
// use in retries.
// Note: We inline the cache for the first 3 send_message ops and use
// dynamic allocation after that. This number was essentially picked
// at random; it could be changed in the future to tune performance.
grpc_core::InlinedVector<grpc_byte_stream_cache*, 3> send_messages;
grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages;
// send_trailing_metadata
bool seen_send_trailing_metadata;
grpc_linked_mdelem* send_trailing_metadata_storage;
@ -964,10 +965,11 @@ static void maybe_cache_send_ops_for_batch(call_data* calld,
}
// Set up cache for send_message ops.
if (batch->send_message) {
grpc_byte_stream_cache* cache = (grpc_byte_stream_cache*)gpr_arena_alloc(
calld->arena, sizeof(grpc_byte_stream_cache));
grpc_byte_stream_cache_init(cache,
batch->payload->send_message.send_message);
grpc_core::ByteStreamCache* cache =
static_cast<grpc_core::ByteStreamCache*>(
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
new (cache) grpc_core::ByteStreamCache(
std::move(batch->payload->send_message.send_message));
calld->send_messages.push_back(cache);
}
// Save metadata batch for send_trailing_metadata ops.
@ -1002,7 +1004,7 @@ static void free_cached_send_op_data_after_commit(
"]",
chand, calld, i);
}
grpc_byte_stream_cache_destroy(calld->send_messages[i]);
calld->send_messages[i]->Destroy();
}
if (retry_state->completed_send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@ -1026,8 +1028,8 @@ static void free_cached_send_op_data_for_completed_batch(
"]",
chand, calld, retry_state->completed_send_message_count - 1);
}
grpc_byte_stream_cache_destroy(
calld->send_messages[retry_state->completed_send_message_count - 1]);
calld->send_messages[retry_state->completed_send_message_count - 1]
->Destroy();
}
if (batch_data->batch.send_trailing_metadata) {
grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
@ -1079,7 +1081,7 @@ static void pending_batches_add(grpc_call_element* elem,
if (batch->send_message) {
calld->pending_send_message = true;
calld->bytes_buffered_for_retry +=
batch->payload->send_message.send_message->length;
batch->payload->send_message.send_message->length();
}
if (batch->send_trailing_metadata) {
calld->pending_send_trailing_metadata = true;
@ -1680,7 +1682,7 @@ static void invoke_recv_message_callback(void* arg, grpc_error* error) {
GPR_ASSERT(pending != nullptr);
// Return payload.
*pending->batch->payload->recv_message.recv_message =
batch_data->recv_message;
std::move(batch_data->recv_message);
// Update bookkeeping.
// Note: Need to do this before invoking the callback, since invoking
// the callback will result in yielding the call combiner.
@ -2124,13 +2126,13 @@ static void add_retriable_send_message_op(
"chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
chand, calld, retry_state->started_send_message_count);
}
grpc_byte_stream_cache* cache =
grpc_core::ByteStreamCache* cache =
calld->send_messages[retry_state->started_send_message_count];
++retry_state->started_send_message_count;
grpc_caching_byte_stream_init(&batch_data->send_message, cache);
batch_data->send_message.Init(cache);
batch_data->batch.send_message = true;
batch_data->batch.payload->send_message.send_message =
&batch_data->send_message.base;
batch_data->batch.payload->send_message.send_message.reset(
batch_data->send_message.get());
}
// Adds retriable send_trailing_metadata op to batch_data.

@ -20,9 +20,11 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <stdint.h>
#include <string.h>
#include "src/core/ext/filters/http/client/http_client_filter.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
@ -58,8 +60,9 @@ struct call_data {
// State for handling send_message ops.
grpc_transport_stream_op_batch* send_message_batch;
size_t send_message_bytes_read;
grpc_byte_stream_cache send_message_cache;
grpc_caching_byte_stream send_message_caching_stream;
grpc_core::ManualConstructor<grpc_core::ByteStreamCache> send_message_cache;
grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
send_message_caching_stream;
grpc_closure on_send_message_next_done;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
@ -166,7 +169,7 @@ static void recv_trailing_metadata_on_complete(void* user_data,
static void send_message_on_complete(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
grpc_byte_stream_cache_destroy(&calld->send_message_cache);
calld->send_message_cache.Destroy();
GRPC_CLOSURE_RUN(calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
}
@ -175,8 +178,7 @@ static void send_message_on_complete(void* arg, grpc_error* error) {
// calld->send_message_bytes_read.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
grpc_error* error = grpc_byte_stream_pull(
&calld->send_message_caching_stream.base, &incoming_slice);
grpc_error* error = calld->send_message_caching_stream->Pull(&incoming_slice);
if (error == GRPC_ERROR_NONE) {
calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
grpc_slice_unref_internal(incoming_slice);
@ -186,24 +188,23 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// Upon successful return, if calld->send_message_bytes_read ==
// calld->send_message_caching_stream.base.length, then we have completed
// calld->send_message_caching_stream->length(), then we have completed
// reading from the byte stream; otherwise, an async read has been dispatched
// and on_send_message_next_done() will be invoked when it is complete.
static grpc_error* read_all_available_send_message_data(call_data* calld) {
while (grpc_byte_stream_next(&calld->send_message_caching_stream.base,
~static_cast<size_t>(0),
&calld->on_send_message_next_done)) {
while (calld->send_message_caching_stream->Next(
SIZE_MAX, &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) return error;
if (calld->send_message_bytes_read ==
calld->send_message_caching_stream.base.length) {
calld->send_message_caching_stream->length()) {
break;
}
}
return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@ -222,7 +223,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
// here, then we know that all of the data was not available
// synchronously, so we were not able to do a cached call. Instead,
// we just reset the byte stream and then send down the batch as-is.
grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
calld->send_message_caching_stream->Reset();
grpc_call_next_op(elem, calld->send_message_batch);
}
@ -253,7 +254,7 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
batch->payload->send_message.send_message->length, true /* url_safe */,
batch->payload->send_message.send_message->length(), true /* url_safe */,
false /* multi_line */);
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
@ -265,9 +266,9 @@ static grpc_error* update_path_for_get(grpc_call_element* elem,
write_ptr += GRPC_SLICE_LENGTH(path_slice);
*write_ptr++ = '?';
char* payload_bytes =
slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
slice_buffer_to_string(calld->send_message_cache->cache_buffer());
grpc_base64_encode_core(write_ptr, payload_bytes,
batch->payload->send_message.send_message->length,
batch->payload->send_message.send_message->length(),
true /* url_safe */, false /* multi_line */);
gpr_free(payload_bytes);
/* remove trailing unused memory and add trailing 0 to terminate string */
@ -326,15 +327,14 @@ static void hc_start_transport_stream_op_batch(
if (batch->send_message &&
(batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
batch->payload->send_message.send_message->length <
batch->payload->send_message.send_message->length() <
channeld->max_payload_size_for_get) {
calld->send_message_bytes_read = 0;
grpc_byte_stream_cache_init(&calld->send_message_cache,
batch->payload->send_message.send_message);
grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
&calld->send_message_cache);
batch->payload->send_message.send_message =
&calld->send_message_caching_stream.base;
calld->send_message_cache.Init(
std::move(batch->payload->send_message.send_message));
calld->send_message_caching_stream.Init(calld->send_message_cache.get());
batch->payload->send_message.send_message.reset(
calld->send_message_caching_stream.get());
calld->original_send_message_on_complete = batch->on_complete;
batch->on_complete = &calld->send_message_on_complete;
calld->send_message_batch = batch;
@ -342,12 +342,12 @@ static void hc_start_transport_stream_op_batch(
if (error != GRPC_ERROR_NONE) goto done;
// If all the data has been read, then we can use GET.
if (calld->send_message_bytes_read ==
calld->send_message_caching_stream.base.length) {
calld->send_message_caching_stream->length()) {
method = GRPC_MDELEM_METHOD_GET;
error = update_path_for_get(elem, batch);
if (error != GRPC_ERROR_NONE) goto done;
batch->send_message = false;
grpc_byte_stream_destroy(&calld->send_message_caching_stream.base);
calld->send_message_caching_stream->Orphan();
} else {
// Not all data is available. The batch will be sent down
// asynchronously in on_send_message_next_done().

@ -32,6 +32,7 @@
#include "src/core/lib/compression/compression_internal.h"
#include "src/core/lib/compression/message_compress.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@ -62,7 +63,8 @@ struct call_data {
grpc_closure start_send_message_batch_in_call_combiner;
grpc_transport_stream_op_batch* send_message_batch;
grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */
grpc_slice_buffer_stream replacement_stream;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream>
replacement_stream;
grpc_closure* original_send_message_on_complete;
grpc_closure send_message_on_complete;
grpc_closure on_send_message_next_done;
@ -220,7 +222,7 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp);
uint32_t send_flags =
calld->send_message_batch->payload->send_message.send_message->flags;
calld->send_message_batch->payload->send_message.send_message->flags();
bool did_compress = grpc_msg_compress(calld->message_compression_algorithm,
&calld->slices, &tmp);
if (did_compress) {
@ -253,12 +255,9 @@ static void finish_send_message(grpc_call_element* elem) {
grpc_slice_buffer_destroy_internal(&tmp);
// Swap out the original byte stream with our new one and send the
// batch down.
grpc_byte_stream_destroy(
calld->send_message_batch->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
send_flags);
calld->send_message_batch->payload->send_message.send_message =
&calld->replacement_stream.base;
calld->replacement_stream.Init(&calld->slices, send_flags);
calld->send_message_batch->payload->send_message.send_message.reset(
calld->replacement_stream.get());
calld->original_send_message_on_complete =
calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
@ -278,9 +277,9 @@ static void fail_send_message_batch_in_call_combiner(void* arg,
// Pulls a slice from the send_message byte stream and adds it to calld->slices.
static grpc_error* pull_slice_from_send_message(call_data* calld) {
grpc_slice incoming_slice;
grpc_error* error = grpc_byte_stream_pull(
calld->send_message_batch->payload->send_message.send_message,
&incoming_slice);
grpc_error* error =
calld->send_message_batch->payload->send_message.send_message->Pull(
&incoming_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
@ -289,12 +288,11 @@ static grpc_error* pull_slice_from_send_message(call_data* calld) {
// Reads as many slices as possible from the send_message byte stream.
// If all data has been read, invokes finish_send_message(). Otherwise,
// an async call to grpc_byte_stream_next() has been started, which will
// an async call to ByteStream::Next() has been started, which will
// eventually result in calling on_send_message_next_done().
static void continue_reading_send_message(grpc_call_element* elem) {
call_data* calld = static_cast<call_data*>(elem->call_data);
while (grpc_byte_stream_next(
calld->send_message_batch->payload->send_message.send_message,
while (calld->send_message_batch->payload->send_message.send_message->Next(
~static_cast<size_t>(0), &calld->on_send_message_next_done)) {
grpc_error* error = pull_slice_from_send_message(calld);
if (error != GRPC_ERROR_NONE) {
@ -303,15 +301,15 @@ static void continue_reading_send_message(grpc_call_element* elem) {
GRPC_ERROR_UNREF(error);
return;
}
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
if (calld->slices.length == calld->send_message_batch->payload->send_message
.send_message->length()) {
finish_send_message(elem);
break;
}
}
}
// Async callback for grpc_byte_stream_next().
// Async callback for ByteStream::Next().
static void on_send_message_next_done(void* arg, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
call_data* calld = static_cast<call_data*>(elem->call_data);
@ -328,7 +326,7 @@ static void on_send_message_next_done(void* arg, grpc_error* error) {
return;
}
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
calld->send_message_batch->payload->send_message.send_message->length()) {
finish_send_message(elem);
} else {
continue_reading_send_message(elem);
@ -340,7 +338,8 @@ static void start_send_message_batch(void* arg, grpc_error* unused) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (skip_compression(
elem,
calld->send_message_batch->payload->send_message.send_message->flags,
calld->send_message_batch->payload->send_message.send_message
->flags(),
calld->send_initial_metadata_state == HAS_COMPRESSION_ALGORITHM)) {
send_message_batch_continue(elem);
} else {
@ -365,9 +364,7 @@ static void compress_start_transport_stream_op_batch(
grpc_schedule_on_exec_ctx),
GRPC_ERROR_REF(calld->cancel_error), "failing send_message op");
} else {
grpc_byte_stream_shutdown(
calld->send_message_batch->payload->send_message.send_message,
calld->send_message_batch->payload->send_message.send_message->Shutdown(
GRPC_ERROR_REF(calld->cancel_error));
}
}

@ -23,6 +23,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <string.h>
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/percent_encoding.h"
@ -53,8 +54,8 @@ struct call_data {
*/
grpc_closure* recv_message_ready;
grpc_closure* on_complete;
grpc_byte_stream** pp_recv_message;
grpc_slice_buffer_stream read_stream;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* pp_recv_message;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> read_stream;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
@ -232,7 +233,7 @@ static grpc_error* server_filter_incoming_metadata(grpc_call_element* elem,
grpc_base64_decode_with_len(
reinterpret_cast<const char*> GRPC_SLICE_START_PTR(query_slice),
GRPC_SLICE_LENGTH(query_slice), k_url_safe));
grpc_slice_buffer_stream_init(&calld->read_stream, &read_slice_buffer, 0);
calld->read_stream.Init(&read_slice_buffer, 0);
grpc_slice_buffer_destroy_internal(&read_slice_buffer);
calld->seen_path_with_query = true;
grpc_slice_unref_internal(query_slice);
@ -281,10 +282,10 @@ static void hs_on_complete(void* user_data, grpc_error* err) {
call_data* calld = static_cast<call_data*>(elem->call_data);
/* Call recv_message_ready if we got the payload via the path field */
if (calld->seen_path_with_query && calld->recv_message_ready != nullptr) {
*calld->pp_recv_message =
calld->payload_bin_delivered
? nullptr
: reinterpret_cast<grpc_byte_stream*>(&calld->read_stream);
calld->pp_recv_message->reset(
calld->payload_bin_delivered ? nullptr
: reinterpret_cast<grpc_core::ByteStream*>(
calld->read_stream.get()));
// Re-enter call combiner for recv_message_ready, since the surface
// code will release the call combiner for each callback it receives.
GRPC_CALL_COMBINER_START(calld->call_combiner, calld->recv_message_ready,
@ -405,7 +406,7 @@ static void destroy_call_elem(grpc_call_element* elem,
grpc_closure* ignored) {
call_data* calld = static_cast<call_data*>(elem->call_data);
if (calld->seen_path_with_query && !calld->payload_bin_delivered) {
grpc_byte_stream_destroy(&calld->read_stream.base);
calld->read_stream->Orphan();
}
}

@ -100,7 +100,7 @@ struct call_data {
// call our next_recv_message_ready member after handling it.
grpc_closure recv_message_ready;
// Used by recv_message_ready.
grpc_byte_stream** recv_message;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
// Original recv_message_ready callback, invoked after our own.
grpc_closure* next_recv_message_ready;
};
@ -121,12 +121,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
call_data* calld = static_cast<call_data*>(elem->call_data);
if (*calld->recv_message != nullptr && calld->limits.max_recv_size >= 0 &&
(*calld->recv_message)->length >
(*calld->recv_message)->length() >
static_cast<size_t>(calld->limits.max_recv_size)) {
char* message_string;
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, calld->limits.max_recv_size);
(*calld->recv_message)->length(), calld->limits.max_recv_size);
grpc_error* new_error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
@ -150,11 +150,11 @@ static void start_transport_stream_op_batch(
call_data* calld = static_cast<call_data*>(elem->call_data);
// Check max send message size.
if (op->send_message && calld->limits.max_send_size >= 0 &&
op->payload->send_message.send_message->length >
op->payload->send_message.send_message->length() >
static_cast<size_t>(calld->limits.max_send_size)) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->payload->send_message.send_message->length,
op->payload->send_message.send_message->length(),
calld->limits.max_send_size);
grpc_transport_stream_op_batch_finish_with_failure(
op,

@ -93,7 +93,9 @@ static void start_transport_stream_op_batch(
/* Send message happens after client's user-agent (initial metadata) is
* received, so workaround_active must be set already */
if (calld->workaround_active) {
op->payload->send_message.send_message->flags |= GRPC_WRITE_NO_COMPRESS;
op->payload->send_message.send_message->set_flags(
op->payload->send_message.send_message->flags() |
GRPC_WRITE_NO_COMPRESS);
}
}

@ -39,6 +39,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
@ -117,12 +118,6 @@ static void connectivity_state_set(grpc_chttp2_transport* t,
grpc_connectivity_state state,
grpc_error* error, const char* reason);
static void incoming_byte_stream_destroy_locked(void* byte_stream,
grpc_error* error_ignored);
static void incoming_byte_stream_publish_error(
grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs);
static void benign_reclaimer_locked(void* t, grpc_error* error);
static void destructive_reclaimer_locked(void* t, grpc_error* error);
@ -662,8 +657,8 @@ static int init_stream(grpc_transport* gt, grpc_stream* gs,
s->t = t;
s->refcount = refcount;
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
read-closed. The others are for Chttp2IncomingByteStreams that are
actively reading */
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@ -1256,8 +1251,7 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
abort(); /* TODO(ctiller): what cleanup here? */
return; /* early out */
}
if (s->fetched_send_message_length == s->fetching_send_message->length) {
grpc_byte_stream_destroy(s->fetching_send_message);
if (s->fetched_send_message_length == s->fetching_send_message->length()) {
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(
@ -1274,20 +1268,19 @@ static void continue_fetching_send_locked(grpc_chttp2_transport* t,
cb->closure = s->fetching_send_message_finished;
s->fetching_send_message_finished = nullptr;
grpc_chttp2_write_cb** list =
s->fetching_send_message->flags & GRPC_WRITE_THROUGH
s->fetching_send_message->flags() & GRPC_WRITE_THROUGH
? &s->on_write_finished_cbs
: &s->on_flow_controlled_cbs;
cb->next = *list;
*list = cb;
}
s->fetching_send_message = nullptr;
s->fetching_send_message.reset();
return; /* early out */
} else if (grpc_byte_stream_next(s->fetching_send_message, UINT32_MAX,
&s->complete_fetch_locked)) {
grpc_error* error =
grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
} else if (s->fetching_send_message->Next(UINT32_MAX,
&s->complete_fetch_locked)) {
grpc_error* error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error != GRPC_ERROR_NONE) {
grpc_byte_stream_destroy(s->fetching_send_message);
s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
} else {
add_fetched_slice_locked(t, s);
@ -1300,14 +1293,14 @@ static void complete_fetch_locked(void* gs, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(gs);
grpc_chttp2_transport* t = s->t;
if (error == GRPC_ERROR_NONE) {
error = grpc_byte_stream_pull(s->fetching_send_message, &s->fetching_slice);
error = s->fetching_send_message->Pull(&s->fetching_slice);
if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(t, s);
continue_fetching_send_locked(t, s);
}
}
if (error != GRPC_ERROR_NONE) {
grpc_byte_stream_destroy(s->fetching_send_message);
s->fetching_send_message.reset();
grpc_chttp2_cancel_stream(t, s, error);
}
}
@ -1439,7 +1432,7 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->id != 0);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
(op->payload->send_message.send_message->flags &
(op->payload->send_message.send_message->flags() &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
@ -1466,7 +1459,7 @@ static void perform_stream_op_locked(void* stream_op,
if (op->send_message) {
GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
op->payload->send_message.send_message->length);
op->payload->send_message.send_message->length());
on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
if (s->write_closed) {
@ -1475,7 +1468,7 @@ static void perform_stream_op_locked(void* stream_op,
// streaming call might send another message before getting a
// recv_message failure, breaking out of its loop, and then
// starting recv_trailing_metadata.
grpc_byte_stream_destroy(op->payload->send_message.send_message);
op->payload->send_message.send_message.reset();
grpc_chttp2_complete_closure_step(
t, s, &s->fetching_send_message_finished,
t->is_client && s->received_trailing_metadata
@ -1488,14 +1481,15 @@ static void perform_stream_op_locked(void* stream_op,
GPR_ASSERT(s->fetching_send_message == nullptr);
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
&s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
uint32_t flags = op_payload->send_message.send_message->flags;
uint32_t flags = op_payload->send_message.send_message->flags();
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->length;
size_t len = op_payload->send_message.send_message->length();
frame_hdr[1] = static_cast<uint8_t>(len >> 24);
frame_hdr[2] = static_cast<uint8_t>(len >> 16);
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
s->fetching_send_message = op_payload->send_message.send_message;
s->fetching_send_message =
std::move(op_payload->send_message.send_message);
s->fetched_send_message_length = 0;
s->next_message_end_offset =
s->flow_controlled_bytes_written +
@ -1947,12 +1941,12 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
}
if (s->pending_byte_stream) {
if (s->on_next != nullptr) {
grpc_chttp2_incoming_byte_stream* bs = s->data_parser.parsing_frame;
grpc_core::Chttp2IncomingByteStream* bs = s->data_parser.parsing_frame;
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
incoming_byte_stream_publish_error(bs, error);
incoming_byte_stream_unref(bs);
bs->PublishError(error);
bs->Unref();
s->data_parser.parsing_frame = nullptr;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
@ -2096,10 +2090,7 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
GRPC_ERROR_REF(error),
"send_trailing_metadata_finished");
if (s->fetching_send_message != nullptr) {
grpc_byte_stream_destroy(s->fetching_send_message);
s->fetching_send_message = nullptr;
}
s->fetching_send_message.reset();
grpc_chttp2_complete_closure_step(t, s, &s->fetching_send_message_finished,
GRPC_ERROR_REF(error),
"fetching_send_message_finished");
@ -2715,7 +2706,6 @@ static void set_pollset_set(grpc_transport* gt, grpc_stream* gs,
static void reset_byte_stream(void* arg, grpc_error* error) {
grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(arg);
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(s->t, s);
@ -2731,22 +2721,56 @@ static void reset_byte_stream(void* arg, grpc_error* error) {
}
}
static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream* bs) {
if (gpr_unref(&bs->refs)) {
gpr_free(bs);
namespace grpc_core {
Chttp2IncomingByteStream::Chttp2IncomingByteStream(
grpc_chttp2_transport* transport, grpc_chttp2_stream* stream,
uint32_t frame_size, uint32_t flags)
: ByteStream(frame_size, flags),
transport_(transport),
stream_(stream),
remaining_bytes_(frame_size) {
gpr_ref_init(&refs_, 2);
GRPC_ERROR_UNREF(stream->byte_stream_error);
stream->byte_stream_error = GRPC_ERROR_NONE;
}
void Chttp2IncomingByteStream::OrphanLocked(void* arg,
grpc_error* error_ignored) {
Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
grpc_chttp2_stream* s = bs->stream_;
grpc_chttp2_transport* t = s->t;
bs->Unref();
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
}
void Chttp2IncomingByteStream::Orphan() {
GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&destroy_action_,
&Chttp2IncomingByteStream::OrphanLocked, this,
grpc_combiner_scheduler(transport_->combiner)),
GRPC_ERROR_NONE);
}
void Chttp2IncomingByteStream::Unref() {
if (gpr_unref(&refs_)) {
Delete(this);
}
}
static void incoming_byte_stream_next_locked(void* argp,
grpc_error* error_ignored) {
grpc_chttp2_incoming_byte_stream* bs =
static_cast<grpc_chttp2_incoming_byte_stream*>(argp);
grpc_chttp2_transport* t = bs->transport;
grpc_chttp2_stream* s = bs->stream;
void Chttp2IncomingByteStream::Ref() { gpr_ref(&refs_); }
void Chttp2IncomingByteStream::NextLocked(void* arg,
grpc_error* error_ignored) {
Chttp2IncomingByteStream* bs = static_cast<Chttp2IncomingByteStream*>(arg);
grpc_chttp2_transport* t = bs->transport_;
grpc_chttp2_stream* s = bs->stream_;
size_t cur_length = s->frame_storage.length;
if (!s->read_closed) {
s->flow_control->IncomingByteStreamUpdate(bs->next_action.max_size_hint,
s->flow_control->IncomingByteStreamUpdate(bs->next_action_.max_size_hint,
cur_length);
grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
}
@ -2755,22 +2779,22 @@ static void incoming_byte_stream_next_locked(void* argp,
grpc_slice_buffer_swap(&s->frame_storage,
&s->unprocessed_incoming_frames_buffer);
s->unprocessed_incoming_frames_decompressed = false;
GRPC_CLOSURE_SCHED(bs->next_action.on_complete, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete, GRPC_ERROR_NONE);
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
incoming_byte_stream_unref(s->data_parser.parsing_frame);
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
if (bs->remaining_bytes_ != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(bs->next_action.on_complete,
GRPC_CLOSURE_SCHED(bs->next_action_.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != nullptr) {
incoming_byte_stream_unref(s->data_parser.parsing_frame);
s->data_parser.parsing_frame->Unref();
s->data_parser.parsing_frame = nullptr;
}
} else {
@ -2778,122 +2802,94 @@ static void incoming_byte_stream_next_locked(void* argp,
GPR_ASSERT(false);
}
} else {
s->on_next = bs->next_action.on_complete;
s->on_next = bs->next_action_.on_complete;
}
incoming_byte_stream_unref(bs);
bs->Unref();
}
static bool incoming_byte_stream_next(grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
bool Chttp2IncomingByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
GPR_TIMER_SCOPE("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream* bs =
reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
grpc_chttp2_stream* s = bs->stream;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
return true;
} else {
gpr_ref(&bs->refs);
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
Ref();
next_action_.max_size_hint = max_size_hint;
next_action_.on_complete = on_complete;
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&bs->next_action.closure,
incoming_byte_stream_next_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_CLOSURE_INIT(&next_action_.closure,
&Chttp2IncomingByteStream::NextLocked, this,
grpc_combiner_scheduler(transport_->combiner)),
GRPC_ERROR_NONE);
return false;
}
}
static grpc_error* incoming_byte_stream_pull(grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_error* Chttp2IncomingByteStream::Pull(grpc_slice* slice) {
GPR_TIMER_SCOPE("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream* bs =
reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
grpc_chttp2_stream* s = bs->stream;
grpc_error* error;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
if (!s->unprocessed_incoming_frames_decompressed) {
if (stream_->unprocessed_incoming_frames_buffer.length > 0) {
if (!stream_->unprocessed_incoming_frames_decompressed) {
bool end_of_context;
if (!s->stream_decompression_ctx) {
s->stream_decompression_ctx = grpc_stream_compression_context_create(
s->stream_decompression_method);
if (!stream_->stream_decompression_ctx) {
stream_->stream_decompression_ctx =
grpc_stream_compression_context_create(
stream_->stream_decompression_method);
}
if (!grpc_stream_decompress(s->stream_decompression_ctx,
&s->unprocessed_incoming_frames_buffer,
&s->decompressed_data_buffer, nullptr,
if (!grpc_stream_decompress(stream_->stream_decompression_ctx,
&stream_->unprocessed_incoming_frames_buffer,
&stream_->decompressed_data_buffer, nullptr,
MAX_SIZE_T, &end_of_context)) {
error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Stream decompression error.");
return error;
}
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->decompressed_data_buffer);
s->unprocessed_incoming_frames_decompressed = true;
GPR_ASSERT(stream_->unprocessed_incoming_frames_buffer.length == 0);
grpc_slice_buffer_swap(&stream_->unprocessed_incoming_frames_buffer,
&stream_->decompressed_data_buffer);
stream_->unprocessed_incoming_frames_decompressed = true;
if (end_of_context) {
grpc_stream_compression_context_destroy(s->stream_decompression_ctx);
s->stream_decompression_ctx = nullptr;
grpc_stream_compression_context_destroy(
stream_->stream_decompression_ctx);
stream_->stream_decompression_ctx = nullptr;
}
if (s->unprocessed_incoming_frames_buffer.length == 0) {
if (stream_->unprocessed_incoming_frames_buffer.length == 0) {
*slice = grpc_empty_slice();
}
}
error = grpc_deframe_unprocessed_incoming_frames(
&s->data_parser, s, &s->unprocessed_incoming_frames_buffer, slice,
nullptr);
&stream_->data_parser, stream_,
&stream_->unprocessed_incoming_frames_buffer, slice, nullptr);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
return GRPC_ERROR_NONE;
}
static void incoming_byte_stream_destroy_locked(void* byte_stream,
grpc_error* error_ignored);
static void incoming_byte_stream_destroy(grpc_byte_stream* byte_stream) {
GPR_TIMER_SCOPE("incoming_byte_stream_destroy", 0);
grpc_chttp2_incoming_byte_stream* bs =
reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_INIT(&bs->destroy_action,
incoming_byte_stream_destroy_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner)),
GRPC_ERROR_NONE);
}
static void incoming_byte_stream_publish_error(
grpc_chttp2_incoming_byte_stream* bs, grpc_error* error) {
grpc_chttp2_stream* s = bs->stream;
void Chttp2IncomingByteStream::PublishError(grpc_error* error) {
GPR_ASSERT(error != GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(s->on_next, GRPC_ERROR_REF(error));
s->on_next = nullptr;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(bs->transport, bs->stream, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(stream_->on_next, GRPC_ERROR_REF(error));
stream_->on_next = nullptr;
GRPC_ERROR_UNREF(stream_->byte_stream_error);
stream_->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(transport_, stream_, GRPC_ERROR_REF(error));
}
grpc_error* grpc_chttp2_incoming_byte_stream_push(
grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
grpc_slice* slice_out) {
grpc_chttp2_stream* s = bs->stream;
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
grpc_error* Chttp2IncomingByteStream::Push(grpc_slice slice,
grpc_slice* slice_out) {
if (remaining_bytes_ < GRPC_SLICE_LENGTH(slice)) {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(slice);
return error;
} else {
bs->remaining_bytes -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
remaining_bytes_ -= static_cast<uint32_t> GRPC_SLICE_LENGTH(slice);
if (slice_out != nullptr) {
*slice_out = slice;
}
@ -2901,66 +2897,25 @@ grpc_error* grpc_chttp2_incoming_byte_stream_push(
}
}
grpc_error* grpc_chttp2_incoming_byte_stream_finished(
grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
bool reset_on_error) {
grpc_chttp2_stream* s = bs->stream;
grpc_error* Chttp2IncomingByteStream::Finished(grpc_error* error,
bool reset_on_error) {
if (error == GRPC_ERROR_NONE) {
if (bs->remaining_bytes != 0) {
if (remaining_bytes_ != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
}
if (error != GRPC_ERROR_NONE && reset_on_error) {
GRPC_CLOSURE_SCHED(&s->reset_byte_stream, GRPC_ERROR_REF(error));
GRPC_CLOSURE_SCHED(&stream_->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(bs);
Unref();
return error;
}
static void incoming_byte_stream_shutdown(grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_chttp2_incoming_byte_stream* bs =
reinterpret_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
bs, error, true /* reset_on_error */));
void Chttp2IncomingByteStream::Shutdown(grpc_error* error) {
GRPC_ERROR_UNREF(Finished(error, true /* reset_on_error */));
}
static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
incoming_byte_stream_next, incoming_byte_stream_pull,
incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
static void incoming_byte_stream_destroy_locked(void* byte_stream,
grpc_error* error_ignored) {
grpc_chttp2_incoming_byte_stream* bs =
static_cast<grpc_chttp2_incoming_byte_stream*>(byte_stream);
grpc_chttp2_stream* s = bs->stream;
grpc_chttp2_transport* t = s->t;
GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
incoming_byte_stream_unref(bs);
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
}
grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
uint32_t flags) {
grpc_chttp2_incoming_byte_stream* incoming_byte_stream =
static_cast<grpc_chttp2_incoming_byte_stream*>(
gpr_malloc(sizeof(*incoming_byte_stream)));
incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
return incoming_byte_stream;
}
} // namespace grpc_core
/*******************************************************************************
* RESOURCE QUOTAS

@ -27,6 +27,7 @@
#include <grpc/support/string_util.h>
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/transport.h"
@ -39,8 +40,7 @@ grpc_error* grpc_chttp2_data_parser_init(grpc_chttp2_data_parser* parser) {
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser* parser) {
if (parser->parsing_frame != nullptr) {
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
parser->parsing_frame,
GRPC_ERROR_UNREF(parser->parsing_frame->Finished(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
}
GRPC_ERROR_UNREF(parser->error);
@ -100,7 +100,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
grpc_byte_stream** stream_out) {
grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_chttp2_transport* t = s->t;
@ -197,12 +197,11 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
p->parsing_frame = grpc_core::New<grpc_core::Chttp2IncomingByteStream>(
t, s, p->frame_size, message_flags);
*stream_out = &p->parsing_frame->base;
if (p->parsing_frame->remaining_bytes == 0) {
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
p->parsing_frame, GRPC_ERROR_NONE, true));
stream_out->reset(p->parsing_frame);
if (p->parsing_frame->remaining_bytes() == 0) {
GRPC_ERROR_UNREF(p->parsing_frame->Finished(GRPC_ERROR_NONE, true));
p->parsing_frame = nullptr;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
@ -226,8 +225,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
if (remaining == p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_push(
p->parsing_frame,
(error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@ -235,8 +233,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
p->parsing_frame, GRPC_ERROR_NONE, true))) {
(error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}
@ -247,8 +244,7 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
} else if (remaining < p->frame_size) {
s->stats.incoming.data_bytes += remaining;
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_push(
p->parsing_frame,
(error = p->parsing_frame->Push(
grpc_slice_sub(slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(end - beg)),
slice_out))) {
@ -261,18 +257,16 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames(
GPR_ASSERT(remaining > p->frame_size);
s->stats.incoming.data_bytes += p->frame_size;
if (GRPC_ERROR_NONE !=
(grpc_chttp2_incoming_byte_stream_push(
p->parsing_frame,
p->parsing_frame->Push(
grpc_slice_sub(
slice, static_cast<size_t>(cur - beg),
static_cast<size_t>(cur + p->frame_size - beg)),
slice_out))) {
slice_out)) {
grpc_slice_unref_internal(slice);
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
p->parsing_frame, GRPC_ERROR_NONE, true))) {
(error = p->parsing_frame->Finished(GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(slice);
return error;
}

@ -40,8 +40,9 @@ typedef enum {
GRPC_CHTTP2_DATA_ERROR
} grpc_chttp2_stream_state;
typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream;
namespace grpc_core {
class Chttp2IncomingByteStream;
} // namespace grpc_core
typedef struct {
grpc_chttp2_stream_state state;
@ -50,7 +51,7 @@ typedef struct {
grpc_error* error;
bool is_frame_compressed;
grpc_chttp2_incoming_byte_stream* parsing_frame;
grpc_core::Chttp2IncomingByteStream* parsing_frame;
} grpc_chttp2_data_parser;
/* initialize per-stream state for data frame parsing */
@ -79,6 +80,6 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
grpc_error* grpc_deframe_unprocessed_incoming_frames(
grpc_chttp2_data_parser* p, grpc_chttp2_stream* s,
grpc_slice_buffer* slices, grpc_slice* slice_out,
grpc_byte_stream** stream_out);
grpc_core::OrphanablePtr<grpc_core::ByteStream>* stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */

@ -203,18 +203,58 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_write_cb* next;
} grpc_chttp2_write_cb;
/* forward declared in frame_data.h */
struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
namespace grpc_core {
class Chttp2IncomingByteStream : public ByteStream {
public:
Chttp2IncomingByteStream(grpc_chttp2_transport* transport,
grpc_chttp2_stream* stream, uint32_t frame_size,
uint32_t flags);
void Orphan() override;
bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
grpc_error* Pull(grpc_slice* slice) override;
void Shutdown(grpc_error* error) override;
// TODO(roth): When I converted this class to C++, I wanted to make it
// inherit from RefCounted or InternallyRefCounted instead of continuing
// to use its own custom ref-counting code. However, that would require
// using multiple inheritence, which sucks in general. And to make matters
// worse, it causes problems with our New<> and Delete<> wrappers.
// Specifically, unless RefCounted is first in the list of parent classes,
// it will see a different value of the address of the object than the one
// we actually allocated, in which case gpr_free() will be called on a
// different address than the one we got from gpr_malloc(), thus causing a
// crash. Given the fragility of depending on that, as well as a desire to
// avoid multiple inheritence in general, I've decided to leave this
// alone for now. We can revisit this once we're able to link against
// libc++, at which point we can eliminate New<> and Delete<> and
// switch to std::shared_ptr<>.
void Ref();
void Unref();
void PublishError(grpc_error* error);
grpc_error* Push(grpc_slice slice, grpc_slice* slice_out);
grpc_chttp2_transport* transport; /* immutable */
grpc_chttp2_stream* stream; /* immutable */
grpc_error* Finished(grpc_error* error, bool reset_on_error);
uint32_t remaining_bytes() const { return remaining_bytes_; }
private:
static void NextLocked(void* arg, grpc_error* error_ignored);
static void OrphanLocked(void* arg, grpc_error* error_ignored);
grpc_chttp2_transport* transport_; // Immutable.
grpc_chttp2_stream* stream_; // Immutable.
gpr_refcount refs_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
uint32_t remaining_bytes;
uint32_t remaining_bytes_;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
@ -223,11 +263,12 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_closure closure;
size_t max_size_hint;
grpc_closure* on_complete;
} next_action;
grpc_closure destroy_action;
grpc_closure finished_action;
} next_action_;
grpc_closure destroy_action_;
};
} // namespace grpc_core
typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
@ -456,7 +497,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* send_trailing_metadata;
grpc_closure* send_trailing_metadata_finished;
grpc_byte_stream* fetching_send_message;
grpc_core::OrphanablePtr<grpc_core::ByteStream> fetching_send_message;
uint32_t fetched_send_message_length;
grpc_slice fetching_slice;
int64_t next_message_end_offset;
@ -468,7 +509,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* recv_initial_metadata;
grpc_closure* recv_initial_metadata_ready;
bool* trailing_metadata_available;
grpc_byte_stream** recv_message;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
grpc_closure* recv_message_ready;
grpc_metadata_batch* recv_trailing_metadata;
grpc_closure* recv_trailing_metadata_finished;
@ -719,18 +760,6 @@ void grpc_chttp2_unref_transport(grpc_chttp2_transport* t);
void grpc_chttp2_ref_transport(grpc_chttp2_transport* t);
#endif
grpc_chttp2_incoming_byte_stream* grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, uint32_t frame_size,
uint32_t flags);
grpc_error* grpc_chttp2_incoming_byte_stream_push(
grpc_chttp2_incoming_byte_stream* bs, grpc_slice slice,
grpc_slice* slice_out);
grpc_error* grpc_chttp2_incoming_byte_stream_finished(
grpc_chttp2_incoming_byte_stream* bs, grpc_error* error,
bool reset_on_error);
void grpc_chttp2_incoming_byte_stream_notify(
grpc_chttp2_incoming_byte_stream* bs, grpc_error* error);
void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
/** Add a new ping strike to ping_recv_state.ping_strikes. If

@ -31,6 +31,7 @@
#include "src/core/ext/transport/cronet/transport/cronet_transport.h"
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice_internal.h"
@ -122,7 +123,7 @@ struct read_state {
bool read_stream_closed;
/* vars for holding data destined for the application */
struct grpc_slice_buffer_stream sbs;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sbs;
grpc_slice_buffer read_slice_buffer;
/* vars for trailing metadata */
@ -1041,16 +1042,14 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer write_slice_buffer;
grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer);
if (1 != grpc_byte_stream_next(
stream_op->payload->send_message.send_message,
stream_op->payload->send_message.send_message->length,
if (1 != stream_op->payload->send_message.send_message->Next(
stream_op->payload->send_message.send_message->length(),
nullptr)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(stream_op->payload->send_message.send_message,
&slice)) {
stream_op->payload->send_message.send_message->Pull(&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
@ -1062,9 +1061,10 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
if (write_slice_buffer.count > 0) {
size_t write_buffer_size;
create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size,
stream_op->payload->send_message.send_message->flags);
create_grpc_frame(
&write_slice_buffer, &stream_state->ws.write_buffer,
&write_buffer_size,
stream_op->payload->send_message.send_message->flags());
CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
@ -1089,6 +1089,7 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
stream_op->payload->send_message.send_message.reset();
} else if (stream_op->send_trailing_metadata &&
op_can_be_run(stream_op, s, &oas->state,
OP_SEND_TRAILING_METADATA)) {
@ -1195,14 +1196,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_destroy_internal(
&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
uint32_t flags = 0;
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
*(reinterpret_cast<grpc_byte_buffer**>(
stream_op->payload->recv_message.recv_message)) =
reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(
stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
@ -1252,14 +1252,13 @@ static enum e_op_result execute_stream_op(struct op_and_state* oas) {
grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
read_data_slice);
grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
&stream_state->rs.read_slice_buffer, 0);
uint32_t flags = 0;
if (stream_state->rs.compressed) {
stream_state->rs.sbs.base.flags = GRPC_WRITE_INTERNAL_COMPRESS;
flags = GRPC_WRITE_INTERNAL_COMPRESS;
}
*(reinterpret_cast<grpc_byte_buffer**>(
stream_op->payload->recv_message.recv_message)) =
reinterpret_cast<grpc_byte_buffer*>(&stream_state->rs.sbs);
stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
stream_op->payload->recv_message.recv_message->reset(
stream_state->rs.sbs.get());
GRPC_CLOSURE_SCHED(stream_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;

@ -25,6 +25,7 @@
#include <string.h>
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@ -99,7 +100,7 @@ typedef struct inproc_stream {
grpc_transport_stream_op_batch* recv_trailing_md_op;
grpc_slice_buffer recv_message;
grpc_slice_buffer_stream recv_stream;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> recv_stream;
bool recv_inited;
bool initial_md_sent;
@ -482,8 +483,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
s->recv_message_op = nullptr;
}
if (s->send_message_op) {
grpc_byte_stream_destroy(
s->send_message_op->payload->send_message.send_message);
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, error, s->send_message_op,
"fail_helper scheduling send-message-on-complete");
@ -521,7 +521,7 @@ static void fail_helper_locked(inproc_stream* s, grpc_error* error) {
static void message_transfer_locked(inproc_stream* sender,
inproc_stream* receiver) {
size_t remaining =
sender->send_message_op->payload->send_message.send_message->length;
sender->send_message_op->payload->send_message.send_message->length();
if (receiver->recv_inited) {
grpc_slice_buffer_destroy_internal(&receiver->recv_message);
}
@ -530,12 +530,12 @@ static void message_transfer_locked(inproc_stream* sender,
do {
grpc_slice message_slice;
grpc_closure unused;
GPR_ASSERT(grpc_byte_stream_next(
sender->send_message_op->payload->send_message.send_message, SIZE_MAX,
&unused));
grpc_error* error = grpc_byte_stream_pull(
sender->send_message_op->payload->send_message.send_message,
&message_slice);
GPR_ASSERT(
sender->send_message_op->payload->send_message.send_message->Next(
SIZE_MAX, &unused));
grpc_error* error =
sender->send_message_op->payload->send_message.send_message->Pull(
&message_slice);
if (error != GRPC_ERROR_NONE) {
cancel_stream_locked(sender, GRPC_ERROR_REF(error));
break;
@ -544,13 +544,11 @@ static void message_transfer_locked(inproc_stream* sender,
remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(&receiver->recv_message, message_slice);
} while (remaining > 0);
grpc_byte_stream_destroy(
sender->send_message_op->payload->send_message.send_message);
sender->send_message_op->payload->send_message.send_message.reset();
grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message,
0);
*receiver->recv_message_op->payload->recv_message.recv_message =
&receiver->recv_stream.base;
receiver->recv_stream.Init(&receiver->recv_message, 0);
receiver->recv_message_op->payload->recv_message.recv_message->reset(
receiver->recv_stream.get());
INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready",
receiver);
GRPC_CLOSURE_SCHED(
@ -606,8 +604,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
(s->trailing_md_sent || other->recv_trailing_md_op)) {
// A server send will never be matched if the client is waiting
// for trailing metadata already
grpc_byte_stream_destroy(
s->send_message_op->payload->send_message.send_message);
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@ -744,8 +741,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
grpc_byte_stream_destroy(
s->send_message_op->payload->send_message.send_message);
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
@ -803,8 +799,7 @@ static void op_state_machine(void* arg, grpc_error* error) {
s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
grpc_byte_stream_destroy(
s->send_message_op->payload->send_message.send_message);
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");

@ -37,6 +37,7 @@
#include "src/core/lib/gpr/arena.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -221,9 +222,9 @@ struct grpc_call {
int send_extra_metadata_count;
grpc_millis send_deadline;
grpc_slice_buffer_stream sending_stream;
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
grpc_byte_stream* receiving_stream;
grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
grpc_byte_buffer** receiving_buffer;
grpc_slice receiving_slice;
grpc_closure receiving_slice_ready;
@ -522,9 +523,7 @@ static void destroy_call(void* call, grpc_error* error) {
grpc_metadata_batch_destroy(
&c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
}
if (c->receiving_stream != nullptr) {
grpc_byte_stream_destroy(c->receiving_stream);
}
c->receiving_stream.reset();
parent_call* pc = get_parent_call(c);
if (pc != nullptr) {
gpr_mu_destroy(&pc->child_list_mu);
@ -1281,25 +1280,21 @@ static void continue_receiving_slices(batch_control* bctl) {
grpc_error* error;
grpc_call* call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
size_t remaining = call->receiving_stream->length() -
(*call->receiving_buffer)->data.raw.slice_buffer.length;
if (remaining == 0) {
call->receiving_message = 0;
grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
call->receiving_stream.reset();
finish_batch_step(bctl);
return;
}
if (grpc_byte_stream_next(call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
error =
grpc_byte_stream_pull(call->receiving_stream, &call->receiving_slice);
if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
error = call->receiving_stream->Pull(&call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@ -1315,19 +1310,17 @@ static void continue_receiving_slices(batch_control* bctl) {
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
grpc_byte_stream* bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) {
grpc_slice slice;
error = grpc_byte_stream_pull(bs, &slice);
error = call->receiving_stream->Pull(&slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
continue_receiving_slices(bctl);
} else {
/* Error returned by grpc_byte_stream_pull needs to be released manually
*/
/* Error returned by ByteStream::Pull() needs to be released manually */
release_error = true;
}
}
@ -1336,8 +1329,7 @@ static void receiving_slice_ready(void* bctlp, grpc_error* error) {
if (grpc_trace_operation_failures.enabled()) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
}
grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
call->receiving_stream.reset();
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = nullptr;
call->receiving_message = 0;
@ -1355,8 +1347,8 @@ static void process_data_after_md(batch_control* bctl) {
call->receiving_message = 0;
finish_batch_step(bctl);
} else {
call->test_only_last_message_flags = call->receiving_stream->flags;
if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
call->test_only_last_message_flags = call->receiving_stream->flags();
if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
(call->incoming_message_compression_algorithm >
GRPC_MESSAGE_COMPRESS_NONE)) {
grpc_compression_algorithm algo;
@ -1379,10 +1371,7 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
batch_control* bctl = static_cast<batch_control*>(bctlp);
grpc_call* call = bctl->call;
if (error != GRPC_ERROR_NONE) {
if (call->receiving_stream != nullptr) {
grpc_byte_stream_destroy(call->receiving_stream);
call->receiving_stream = nullptr;
}
call->receiving_stream.reset();
add_batch_error(bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
}
@ -1676,21 +1665,20 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
goto done_with_error;
}
stream_op->send_message = true;
call->sending_message = true;
grpc_slice_buffer_stream_init(
&call->sending_stream,
&op->data.send_message.send_message->data.raw.slice_buffer,
op->flags);
uint32_t flags = op->flags;
/* If the outgoing buffer is already compressed, mark it as so in the
flags. These will be picked up by the compression filter and further
(wasteful) attempts at compression skipped. */
if (op->data.send_message.send_message->data.raw.compression >
GRPC_COMPRESS_NONE) {
call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS;
flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
stream_op_payload->send_message.send_message =
&call->sending_stream.base;
stream_op->send_message = true;
call->sending_message = true;
call->sending_stream.Init(
&op->data.send_message.send_message->data.raw.slice_buffer, flags);
stream_op_payload->send_message.send_message.reset(
call->sending_stream.get());
break;
}
case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
@ -1909,7 +1897,7 @@ done_with_error:
}
if (stream_op->send_message) {
call->sending_message = false;
grpc_byte_stream_destroy(&call->sending_stream.base);
call->sending_stream->Orphan();
}
if (stream_op->send_trailing_metadata) {
call->sent_final_op = false;

@ -25,160 +25,123 @@
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/slice/slice_internal.h"
bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete) {
return byte_stream->vtable->next(byte_stream, max_size_hint, on_complete);
}
namespace grpc_core {
grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
grpc_slice* slice) {
return byte_stream->vtable->pull(byte_stream, slice);
}
//
// SliceBufferByteStream
//
void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
grpc_error* error) {
byte_stream->vtable->shutdown(byte_stream, error);
SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
uint32_t flags)
: ByteStream(static_cast<uint32_t>(slice_buffer->length), flags) {
GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
grpc_slice_buffer_init(&backing_buffer_);
grpc_slice_buffer_swap(slice_buffer, &backing_buffer_);
}
void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream) {
byte_stream->vtable->destroy(byte_stream);
}
SliceBufferByteStream::~SliceBufferByteStream() {}
// grpc_slice_buffer_stream
void SliceBufferByteStream::Orphan() {
grpc_slice_buffer_destroy(&backing_buffer_);
GRPC_ERROR_UNREF(shutdown_error_);
// Note: We do not actually delete the object here, since
// SliceBufferByteStream is usually allocated as part of a larger
// object and has an OrphanablePtr of itself passed down through the
// filter stack.
}
static bool slice_buffer_stream_next(grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_slice_buffer_stream* stream =
reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
bool SliceBufferByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
GPR_ASSERT(cursor_ < backing_buffer_.count);
return true;
}
static grpc_error* slice_buffer_stream_pull(grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_slice_buffer_stream* stream =
reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
grpc_error* SliceBufferByteStream::Pull(grpc_slice* slice) {
if (shutdown_error_ != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(shutdown_error_);
}
GPR_ASSERT(stream->cursor < stream->backing_buffer.count);
*slice =
grpc_slice_ref_internal(stream->backing_buffer.slices[stream->cursor]);
stream->cursor++;
GPR_ASSERT(cursor_ < backing_buffer_.count);
*slice = grpc_slice_ref_internal(backing_buffer_.slices[cursor_]);
++cursor_;
return GRPC_ERROR_NONE;
}
static void slice_buffer_stream_shutdown(grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_slice_buffer_stream* stream =
reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
void SliceBufferByteStream::Shutdown(grpc_error* error) {
GRPC_ERROR_UNREF(shutdown_error_);
shutdown_error_ = error;
}
static void slice_buffer_stream_destroy(grpc_byte_stream* byte_stream) {
grpc_slice_buffer_stream* stream =
reinterpret_cast<grpc_slice_buffer_stream*>(byte_stream);
grpc_slice_buffer_destroy(&stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
//
// ByteStreamCache
//
ByteStreamCache::ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream)
: underlying_stream_(std::move(underlying_stream)) {
grpc_slice_buffer_init(&cache_buffer_);
}
static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
slice_buffer_stream_next, slice_buffer_stream_pull,
slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
ByteStreamCache::~ByteStreamCache() {
if (underlying_stream_ != nullptr) Destroy();
}
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
grpc_slice_buffer* slice_buffer,
uint32_t flags) {
GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
stream->base.length = static_cast<uint32_t>(slice_buffer->length);
stream->base.flags = flags;
stream->base.vtable = &slice_buffer_stream_vtable;
grpc_slice_buffer_init(&stream->backing_buffer);
grpc_slice_buffer_swap(slice_buffer, &stream->backing_buffer);
stream->cursor = 0;
stream->shutdown_error = GRPC_ERROR_NONE;
void ByteStreamCache::Destroy() {
underlying_stream_.reset();
grpc_slice_buffer_destroy_internal(&cache_buffer_);
}
// grpc_caching_byte_stream
//
// ByteStreamCache::CachingByteStream
//
void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
grpc_byte_stream* underlying_stream) {
cache->underlying_stream = underlying_stream;
grpc_slice_buffer_init(&cache->cache_buffer);
}
ByteStreamCache::CachingByteStream::CachingByteStream(ByteStreamCache* cache)
: ByteStream(cache->underlying_stream_->length(),
cache->underlying_stream_->flags()),
cache_(cache) {}
void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache) {
grpc_byte_stream_destroy(cache->underlying_stream);
grpc_slice_buffer_destroy_internal(&cache->cache_buffer);
ByteStreamCache::CachingByteStream::~CachingByteStream() {}
void ByteStreamCache::CachingByteStream::Orphan() {
GRPC_ERROR_UNREF(shutdown_error_);
// Note: We do not actually delete the object here, since
// CachingByteStream is usually allocated as part of a larger
// object and has an OrphanablePtr of itself passed down through the
// filter stack.
}
static bool caching_byte_stream_next(grpc_byte_stream* byte_stream,
size_t max_size_hint,
grpc_closure* on_complete) {
grpc_caching_byte_stream* stream =
reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
if (stream->cursor < stream->cache->cache_buffer.count) return true;
return grpc_byte_stream_next(stream->cache->underlying_stream, max_size_hint,
on_complete);
bool ByteStreamCache::CachingByteStream::Next(size_t max_size_hint,
grpc_closure* on_complete) {
if (shutdown_error_ != GRPC_ERROR_NONE) return true;
if (cursor_ < cache_->cache_buffer_.count) return true;
return cache_->underlying_stream_->Next(max_size_hint, on_complete);
}
static grpc_error* caching_byte_stream_pull(grpc_byte_stream* byte_stream,
grpc_slice* slice) {
grpc_caching_byte_stream* stream =
reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
grpc_error* ByteStreamCache::CachingByteStream::Pull(grpc_slice* slice) {
if (shutdown_error_ != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(shutdown_error_);
}
if (stream->cursor < stream->cache->cache_buffer.count) {
*slice = grpc_slice_ref_internal(
stream->cache->cache_buffer.slices[stream->cursor]);
++stream->cursor;
if (cursor_ < cache_->cache_buffer_.count) {
*slice = grpc_slice_ref_internal(cache_->cache_buffer_.slices[cursor_]);
++cursor_;
return GRPC_ERROR_NONE;
}
grpc_error* error =
grpc_byte_stream_pull(stream->cache->underlying_stream, slice);
grpc_error* error = cache_->underlying_stream_->Pull(slice);
if (error == GRPC_ERROR_NONE) {
++stream->cursor;
grpc_slice_buffer_add(&stream->cache->cache_buffer,
++cursor_;
grpc_slice_buffer_add(&cache_->cache_buffer_,
grpc_slice_ref_internal(*slice));
}
return error;
}
static void caching_byte_stream_shutdown(grpc_byte_stream* byte_stream,
grpc_error* error) {
grpc_caching_byte_stream* stream =
reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = GRPC_ERROR_REF(error);
grpc_byte_stream_shutdown(stream->cache->underlying_stream, error);
void ByteStreamCache::CachingByteStream::Shutdown(grpc_error* error) {
GRPC_ERROR_UNREF(shutdown_error_);
shutdown_error_ = GRPC_ERROR_REF(error);
cache_->underlying_stream_->Shutdown(error);
}
static void caching_byte_stream_destroy(grpc_byte_stream* byte_stream) {
grpc_caching_byte_stream* stream =
reinterpret_cast<grpc_caching_byte_stream*>(byte_stream);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
void ByteStreamCache::CachingByteStream::Reset() { cursor_ = 0; }
static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
caching_byte_stream_next, caching_byte_stream_pull,
caching_byte_stream_shutdown, caching_byte_stream_destroy};
void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
grpc_byte_stream_cache* cache) {
memset(stream, 0, sizeof(*stream));
stream->base.length = cache->underlying_stream->length;
stream->base.flags = cache->underlying_stream->flags;
stream->base.vtable = &caching_byte_stream_vtable;
stream->cache = cache;
stream->shutdown_error = GRPC_ERROR_NONE;
}
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream) {
stream->cursor = 0;
}
} // namespace grpc_core

@ -22,6 +22,8 @@
#include <grpc/support/port_platform.h>
#include <grpc/slice_buffer.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/exec_ctx.h"
/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
@ -30,71 +32,82 @@
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
typedef struct grpc_byte_stream grpc_byte_stream;
typedef struct {
bool (*next)(grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete);
grpc_error* (*pull)(grpc_byte_stream* byte_stream, grpc_slice* slice);
void (*shutdown)(grpc_byte_stream* byte_stream, grpc_error* error);
void (*destroy)(grpc_byte_stream* byte_stream);
} grpc_byte_stream_vtable;
struct grpc_byte_stream {
uint32_t length;
uint32_t flags;
const grpc_byte_stream_vtable* vtable;
namespace grpc_core {
class ByteStream : public Orphanable {
public:
virtual ~ByteStream() {}
// Returns true if the bytes are available immediately (in which case
// on_complete will not be called), or false if the bytes will be available
// asynchronously (in which case on_complete will be called when they
// are available).
//
// max_size_hint can be set as a hint as to the maximum number
// of bytes that would be acceptable to read.
virtual bool Next(size_t max_size_hint,
grpc_closure* on_complete) GRPC_ABSTRACT;
// Returns the next slice in the byte stream when it is available, as
// indicated by Next().
//
// Once a slice is returned into *slice, it is owned by the caller.
virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;
// Shuts down the byte stream.
//
// If there is a pending call to on_complete from Next(), it will be
// invoked with the error passed to Shutdown().
//
// The next call to Pull() (if any) will return the error passed to
// Shutdown().
virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;
uint32_t length() const { return length_; }
uint32_t flags() const { return flags_; }
void set_flags(uint32_t flags) { flags_ = flags; }
GRPC_ABSTRACT_BASE_CLASS
protected:
ByteStream(uint32_t length, uint32_t flags)
: length_(length), flags_(flags) {}
private:
const uint32_t length_;
uint32_t flags_;
};
// Returns true if the bytes are available immediately (in which case
// on_complete will not be called), false if the bytes will be available
// asynchronously.
//
// max_size_hint can be set as a hint as to the maximum number
// of bytes that would be acceptable to read.
bool grpc_byte_stream_next(grpc_byte_stream* byte_stream, size_t max_size_hint,
grpc_closure* on_complete);
// Returns the next slice in the byte stream when it is ready (indicated by
// either grpc_byte_stream_next returning true or on_complete passed to
// grpc_byte_stream_next is called).
// SliceBufferByteStream
//
// Once a slice is returned into *slice, it is owned by the caller.
grpc_error* grpc_byte_stream_pull(grpc_byte_stream* byte_stream,
grpc_slice* slice);
// Shuts down the byte stream.
// A ByteStream that wraps a slice buffer.
//
// If there is a pending call to on_complete from grpc_byte_stream_next(),
// it will be invoked with the error passed to grpc_byte_stream_shutdown().
//
// The next call to grpc_byte_stream_pull() (if any) will return the error
// passed to grpc_byte_stream_shutdown().
void grpc_byte_stream_shutdown(grpc_byte_stream* byte_stream,
grpc_error* error);
void grpc_byte_stream_destroy(grpc_byte_stream* byte_stream);
class SliceBufferByteStream : public ByteStream {
public:
// Removes all slices in slice_buffer, leaving it empty.
SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);
~SliceBufferByteStream();
void Orphan() override;
bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
grpc_error* Pull(grpc_slice* slice) override;
void Shutdown(grpc_error* error) override;
private:
grpc_slice_buffer backing_buffer_;
size_t cursor_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
// grpc_slice_buffer_stream
//
// A grpc_byte_stream that wraps a slice buffer. The stream takes
// ownership of the slices in the buffer, and on destruction will
// reset the contents of the buffer.
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
grpc_slice_buffer backing_buffer;
size_t cursor;
grpc_error* shutdown_error;
} grpc_slice_buffer_stream;
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
grpc_slice_buffer* slice_buffer,
uint32_t flags);
// grpc_caching_byte_stream
// CachingByteStream
//
// A grpc_byte_stream that that wraps an underlying byte stream but caches
// A ByteStream that that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer. If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
@ -102,32 +115,47 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream* stream,
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
// grpc_caching_byte_streams simultaneously drawing from the same underlying
// grpc_byte_stream_cache at the same time.
// CachingByteStreams simultaneously drawing from the same underlying
// ByteStreamCache at the same time.
//
class ByteStreamCache {
public:
class CachingByteStream : public ByteStream {
public:
explicit CachingByteStream(ByteStreamCache* cache);
~CachingByteStream();
void Orphan() override;
typedef struct {
grpc_byte_stream* underlying_stream;
grpc_slice_buffer cache_buffer;
} grpc_byte_stream_cache;
bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
grpc_error* Pull(grpc_slice* slice) override;
void Shutdown(grpc_error* error) override;
// Takes ownership of underlying_stream.
void grpc_byte_stream_cache_init(grpc_byte_stream_cache* cache,
grpc_byte_stream* underlying_stream);
// Resets the byte stream to the start of the underlying stream.
void Reset();
// Must not be called while still in use by a grpc_caching_byte_stream.
void grpc_byte_stream_cache_destroy(grpc_byte_stream_cache* cache);
private:
ByteStreamCache* cache_;
size_t cursor_ = 0;
grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
};
typedef struct {
grpc_byte_stream base;
grpc_byte_stream_cache* cache;
size_t cursor;
grpc_error* shutdown_error;
} grpc_caching_byte_stream;
explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);
void grpc_caching_byte_stream_init(grpc_caching_byte_stream* stream,
grpc_byte_stream_cache* cache);
~ByteStreamCache();
// Must not be destroyed while still in use by a CachingByteStream.
void Destroy();
grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }
private:
OrphanablePtr<ByteStream> underlying_stream_;
grpc_slice_buffer cache_buffer_;
};
// Resets the byte stream to the start of the underlying stream.
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream* stream);
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */

@ -209,7 +209,7 @@ void grpc_transport_stream_op_batch_finish_with_failure(
grpc_transport_stream_op_batch* batch, grpc_error* error,
grpc_call_combiner* call_combiner) {
if (batch->send_message) {
grpc_byte_stream_destroy(batch->payload->send_message.send_message);
batch->payload->send_message.send_message.reset();
}
if (batch->recv_message) {
GRPC_CALL_COMBINER_START(

@ -184,11 +184,10 @@ struct grpc_transport_stream_op_batch_payload {
struct {
// The transport (or a filter that decides to return a failure before
// the op gets down to the transport) is responsible for calling
// grpc_byte_stream_destroy() on this.
// the op gets down to the transport) takes ownership.
// The batch's on_complete will not be called until after the byte
// stream is destroyed.
grpc_byte_stream* send_message;
// stream is orphaned.
grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message;
} send_message;
struct {
@ -216,10 +215,8 @@ struct grpc_transport_stream_op_batch_payload {
struct {
// Will be set by the transport to point to the byte stream
// containing a received message.
// The caller is responsible for calling grpc_byte_stream_destroy()
// on this byte stream.
// Will be NULL if trailing metadata is received instead of a message.
grpc_byte_stream** recv_message;
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
/** Should be enqueued when one message is ready to be processed. */
grpc_closure* recv_message_ready;
} recv_message;

@ -75,9 +75,16 @@ char* grpc_transport_stream_op_batch_string(
if (op->send_message) {
gpr_strvec_add(&b, gpr_strdup(" "));
gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
op->payload->send_message.send_message->flags,
op->payload->send_message.send_message->length);
if (op->payload->send_message.send_message != nullptr) {
gpr_asprintf(&tmp, "SEND_MESSAGE:flags=0x%08x:len=%d",
op->payload->send_message.send_message->flags(),
op->payload->send_message.send_message->length());
} else {
// This can happen when we check a batch after the transport has
// processed and cleared the send_message op.
tmp =
gpr_strdup("SEND_MESSAGE(flag and length unknown, already orphaned)");
}
gpr_strvec_add(&b, tmp);
}

@ -182,20 +182,22 @@ class TransportStreamOpBatch {
op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure;
}
grpc_byte_stream* send_message() const {
return op_->send_message ? op_->payload->send_message.send_message
grpc_core::OrphanablePtr<grpc_core::ByteStream>* send_message() const {
return op_->send_message ? &op_->payload->send_message.send_message
: nullptr;
}
void set_send_message(grpc_byte_stream* send_message) {
void set_send_message(
grpc_core::OrphanablePtr<grpc_core::ByteStream> send_message) {
op_->send_message = true;
op_->payload->send_message.send_message = send_message;
op_->payload->send_message.send_message = std::move(send_message);
}
grpc_byte_stream** recv_message() const {
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message() const {
return op_->recv_message ? op_->payload->recv_message.recv_message
: nullptr;
}
void set_recv_message(grpc_byte_stream** recv_message) {
void set_recv_message(
grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message) {
op_->recv_message = true;
op_->payload->recv_message.recv_message = recv_message;
}

@ -43,6 +43,9 @@ grpc_cc_test(
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
external_deps = [
"gtest",
],
)
grpc_cc_test(

@ -27,16 +27,18 @@
#include "test/core/util/test_config.h"
#include <gtest/gtest.h>
namespace grpc_core {
namespace {
//
// grpc_slice_buffer_stream tests
// SliceBufferByteStream tests
//
static void not_called_closure(void* arg, grpc_error* error) {
GPR_ASSERT(false);
}
void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); }
static void test_slice_buffer_stream_basic(void) {
gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
TEST(SliceBufferByteStream, Basic) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
@ -49,28 +51,26 @@ static void test_slice_buffer_stream_basic(void) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
grpc_slice_buffer_stream stream;
grpc_slice_buffer_stream_init(&stream, &buffer, 0);
GPR_ASSERT(stream.base.length == 6);
SliceBufferByteStream stream(&buffer, 0);
grpc_slice_buffer_destroy_internal(&buffer);
EXPECT_EQ(6U, stream.length());
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read each slice. Note that next() always returns synchronously.
// Read each slice. Note that Next() always returns synchronously.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_error* error = stream.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
grpc_byte_stream_destroy(&stream.base);
grpc_slice_buffer_destroy_internal(&buffer);
stream.Orphan();
}
static void test_slice_buffer_stream_shutdown(void) {
gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
TEST(SliceBufferByteStream, Shutdown) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
@ -83,40 +83,38 @@ static void test_slice_buffer_stream_shutdown(void) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
grpc_slice_buffer_stream stream;
grpc_slice_buffer_stream_init(&stream, &buffer, 0);
GPR_ASSERT(stream.base.length == 6);
SliceBufferByteStream stream(&buffer, 0);
grpc_slice_buffer_destroy_internal(&buffer);
EXPECT_EQ(6U, stream.length());
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read the first slice.
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_error* error = stream.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Now shutdown.
grpc_error* shutdown_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
grpc_byte_stream_shutdown(&stream.base, GRPC_ERROR_REF(shutdown_error));
stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
// After shutdown, the next pull() should return the error.
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == shutdown_error);
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
error = stream.Pull(&output);
EXPECT_TRUE(error == shutdown_error);
GRPC_ERROR_UNREF(error);
GRPC_ERROR_UNREF(shutdown_error);
// Clean up.
grpc_byte_stream_destroy(&stream.base);
grpc_slice_buffer_destroy_internal(&buffer);
stream.Orphan();
}
//
// grpc_caching_byte_stream tests
// CachingByteStream tests
//
static void test_caching_byte_stream_basic(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
TEST(CachingByteStream, Basic) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@ -128,34 +126,30 @@ static void test_caching_byte_stream_basic(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
SliceBufferByteStream underlying_stream(&buffer, 0);
grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and caching stream.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream;
grpc_caching_byte_stream_init(&stream, &cache);
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
ByteStreamCache::CachingByteStream stream(&cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read each slice. Note that next() always returns synchronously,
// because the underlying byte stream always does.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_error* error = stream.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
grpc_byte_stream_destroy(&stream.base);
grpc_byte_stream_cache_destroy(&cache);
grpc_slice_buffer_destroy_internal(&buffer);
stream.Orphan();
cache.Destroy();
}
static void test_caching_byte_stream_reset(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
TEST(CachingByteStream, Reset) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@ -167,41 +161,37 @@ static void test_caching_byte_stream_reset(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
SliceBufferByteStream underlying_stream(&buffer, 0);
grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and caching stream.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream;
grpc_caching_byte_stream_init(&stream, &cache);
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
ByteStreamCache::CachingByteStream stream(&cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read one slice.
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
grpc_slice output;
grpc_error* error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_error* error = stream.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Reset the caching stream. The reads should start over from the
// first slice.
grpc_caching_byte_stream_reset(&stream);
stream.Reset();
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(grpc_byte_stream_next(&stream.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
error = stream.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Clean up.
grpc_byte_stream_destroy(&stream.base);
grpc_byte_stream_cache_destroy(&cache);
grpc_slice_buffer_destroy_internal(&buffer);
stream.Orphan();
cache.Destroy();
}
static void test_caching_byte_stream_shared_cache(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
TEST(CachingByteStream, SharedCache) {
grpc_core::ExecCtx exec_ctx;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
@ -213,54 +203,50 @@ static void test_caching_byte_stream_shared_cache(void) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
SliceBufferByteStream underlying_stream(&buffer, 0);
grpc_slice_buffer_destroy_internal(&buffer);
// Create cache and two caching streams.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream1;
grpc_caching_byte_stream_init(&stream1, &cache);
grpc_caching_byte_stream stream2;
grpc_caching_byte_stream_init(&stream2, &cache);
ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
ByteStreamCache::CachingByteStream stream1(&cache);
ByteStreamCache::CachingByteStream stream2(&cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, nullptr,
GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
grpc_schedule_on_exec_ctx);
// Read one slice from stream1.
GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure));
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
grpc_slice output;
grpc_error* error = grpc_byte_stream_pull(&stream1.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_error* error = stream1.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(output);
// Read all slices from stream2.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(grpc_byte_stream_next(&stream2.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&stream2.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
error = stream2.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(output);
}
// Now read the second slice from stream1.
GPR_ASSERT(grpc_byte_stream_next(&stream1.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&stream1.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[1], output));
EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
error = stream1.Pull(&output);
EXPECT_TRUE(error == GRPC_ERROR_NONE);
EXPECT_TRUE(grpc_slice_eq(input[1], output));
grpc_slice_unref_internal(output);
// Clean up.
grpc_byte_stream_destroy(&stream1.base);
grpc_byte_stream_destroy(&stream2.base);
grpc_byte_stream_cache_destroy(&cache);
grpc_slice_buffer_destroy_internal(&buffer);
stream1.Orphan();
stream2.Orphan();
cache.Destroy();
}
} // namespace
} // namespace grpc_core
int main(int argc, char** argv) {
grpc_init();
grpc_test_init(argc, argv);
test_slice_buffer_stream_basic();
test_slice_buffer_stream_shutdown();
test_caching_byte_stream_basic();
test_caching_byte_stream_reset();
test_caching_byte_stream_shared_cache();
::testing::InitGoogleTest(&argc, argv);
int retval = RUN_ALL_TESTS();
grpc_shutdown();
return 0;
return retval;
}

@ -398,13 +398,13 @@ static void BM_TransportStreamSend(benchmark::State& state) {
memset(&op, 0, sizeof(op));
op.payload = &op_payload;
};
grpc_slice_buffer_stream send_stream;
grpc_slice_buffer send_buffer;
grpc_slice_buffer_init(&send_buffer);
grpc_slice_buffer_add(&send_buffer, gpr_slice_malloc(state.range(0)));
memset(GRPC_SLICE_START_PTR(send_buffer.slices[0]), 0,
GRPC_SLICE_LENGTH(send_buffer.slices[0]));
// Create the send_message payload slice.
// Note: We use grpc_slice_malloc_large() instead of grpc_slice_malloc()
// to force the slice to be refcounted, so that it remains alive when it
// is unreffed after each send_message op.
grpc_slice send_slice = grpc_slice_malloc_large(state.range(0));
memset(GRPC_SLICE_START_PTR(send_slice), 0, GRPC_SLICE_LENGTH(send_slice));
grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> send_stream;
grpc_metadata_batch b;
grpc_metadata_batch_init(&b);
b.deadline = GRPC_MILLIS_INF_FUTURE;
@ -424,14 +424,18 @@ static void BM_TransportStreamSend(benchmark::State& state) {
gpr_event_set(bm_done, (void*)1);
return;
}
grpc_slice_buffer send_buffer;
grpc_slice_buffer_init(&send_buffer);
grpc_slice_buffer_add(&send_buffer, grpc_slice_ref(send_slice));
send_stream.Init(&send_buffer, 0);
grpc_slice_buffer_destroy(&send_buffer);
// force outgoing window to be yuge
s->chttp2_stream()->flow_control->TestOnlyForceHugeWindow();
f.chttp2_transport()->flow_control->TestOnlyForceHugeWindow();
grpc_slice_buffer_stream_init(&send_stream, &send_buffer, 0);
reset_op();
op.on_complete = c.get();
op.send_message = true;
op.payload->send_message.send_message = &send_stream.base;
op.payload->send_message.send_message.reset(send_stream.get());
s->Op(&op);
});
@ -454,7 +458,7 @@ static void BM_TransportStreamSend(benchmark::State& state) {
s.reset();
track_counters.Finish(state);
grpc_metadata_batch_destroy(&b);
grpc_slice_buffer_destroy(&send_buffer);
grpc_slice_unref(send_slice);
}
BENCHMARK(BM_TransportStreamSend)->Range(0, 128 * 1024 * 1024);
@ -524,7 +528,7 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
grpc_transport_stream_op_batch_payload op_payload;
memset(&op_payload, 0, sizeof(op_payload));
grpc_transport_stream_op_batch op;
grpc_byte_stream* recv_stream;
grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_stream;
grpc_slice incoming_data = CreateIncomingDataSlice(state.range(0), 16384);
auto reset_op = [&]() {
@ -579,21 +583,20 @@ static void BM_TransportStreamRecv(benchmark::State& state) {
drain = MakeClosure([&](grpc_error* error) {
do {
if (received == recv_stream->length) {
grpc_byte_stream_destroy(recv_stream);
if (received == recv_stream->length()) {
recv_stream.reset();
GRPC_CLOSURE_SCHED(c.get(), GRPC_ERROR_NONE);
return;
}
} while (grpc_byte_stream_next(recv_stream, recv_stream->length - received,
drain_continue.get()) &&
GRPC_ERROR_NONE ==
grpc_byte_stream_pull(recv_stream, &recv_slice) &&
} while (recv_stream->Next(recv_stream->length() - received,
drain_continue.get()) &&
GRPC_ERROR_NONE == recv_stream->Pull(&recv_slice) &&
(received += GRPC_SLICE_LENGTH(recv_slice),
grpc_slice_unref_internal(recv_slice), true));
});
drain_continue = MakeClosure([&](grpc_error* error) {
grpc_byte_stream_pull(recv_stream, &recv_slice);
recv_stream->Pull(&recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(recv_slice);
GRPC_CLOSURE_RUN(drain.get(), GRPC_ERROR_NONE);

@ -146,23 +146,6 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "byte_stream_test",
"src": [
"test/core/transport/byte_stream_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
@ -3002,6 +2985,23 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "byte_stream_test",
"src": [
"test/core/transport/byte_stream_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -195,30 +195,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "byte_stream_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
@ -3601,6 +3577,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "byte_stream_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save