Merge pull request #11905 from markdroth/byte_stream_shutdown

Improvements to grpc_byte_stream API and handling.
pull/11947/head
Mark D. Roth 8 years ago committed by GitHub
commit c44f036ee2
  1. 32
      CMakeLists.txt
  2. 36
      Makefile
  3. 10
      build.yaml
  4. 514
      src/core/ext/filters/http/client/http_client_filter.c
  5. 202
      src/core/ext/filters/http/message_compress/message_compress_filter.c
  6. 63
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  7. 33
      src/core/ext/transport/inproc/inproc_transport.c
  8. 128
      src/core/lib/transport/byte_stream.c
  9. 99
      src/core/lib/transport/byte_stream.h
  10. 24
      src/core/lib/transport/transport.c
  11. 9
      src/core/lib/transport/transport.h
  12. 12
      test/core/transport/BUILD
  13. 279
      test/core/transport/byte_stream_test.c
  14. 17
      tools/run_tests/generated/sources_and_headers.json
  15. 22
      tools/run_tests/generated/tests.json
  16. 27
      vsprojects/buildtests_c.sln
  17. 199
      vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj
  18. 21
      vsprojects/vcxproj/test/byte_stream_test/byte_stream_test.vcxproj.filters

@ -394,6 +394,7 @@ add_dependencies(buildtests_c bad_server_response_test)
add_dependencies(buildtests_c bdp_estimator_test) add_dependencies(buildtests_c bdp_estimator_test)
add_dependencies(buildtests_c bin_decoder_test) add_dependencies(buildtests_c bin_decoder_test)
add_dependencies(buildtests_c bin_encoder_test) add_dependencies(buildtests_c bin_encoder_test)
add_dependencies(buildtests_c byte_stream_test)
add_dependencies(buildtests_c census_context_test) add_dependencies(buildtests_c census_context_test)
add_dependencies(buildtests_c census_intrusive_hash_map_test) add_dependencies(buildtests_c census_intrusive_hash_map_test)
add_dependencies(buildtests_c census_resource_test) add_dependencies(buildtests_c census_resource_test)
@ -4785,6 +4786,37 @@ target_link_libraries(bin_encoder_test
endif (gRPC_BUILD_TESTS) endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS)
add_executable(byte_stream_test
test/core/transport/byte_stream_test.c
)
target_include_directories(byte_stream_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${BORINGSSL_ROOT_DIR}/include
PRIVATE ${PROTOBUF_ROOT_DIR}/src
PRIVATE ${BENCHMARK_ROOT_DIR}/include
PRIVATE ${ZLIB_ROOT_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
PRIVATE ${CARES_BUILD_INCLUDE_DIR}
PRIVATE ${CARES_INCLUDE_DIR}
PRIVATE ${CARES_PLATFORM_INCLUDE_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
)
target_link_libraries(byte_stream_test
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(census_context_test add_executable(census_context_test
test/core/census/context_test.c test/core/census/context_test.c
) )

@ -954,6 +954,7 @@ bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test
bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test bdp_estimator_test: $(BINDIR)/$(CONFIG)/bdp_estimator_test
bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test
bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test
byte_stream_test: $(BINDIR)/$(CONFIG)/byte_stream_test
census_context_test: $(BINDIR)/$(CONFIG)/census_context_test census_context_test: $(BINDIR)/$(CONFIG)/census_context_test
census_intrusive_hash_map_test: $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test census_intrusive_hash_map_test: $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test
census_resource_test: $(BINDIR)/$(CONFIG)/census_resource_test census_resource_test: $(BINDIR)/$(CONFIG)/census_resource_test
@ -1345,6 +1346,7 @@ buildtests_c: privatelibs_c \
$(BINDIR)/$(CONFIG)/bdp_estimator_test \ $(BINDIR)/$(CONFIG)/bdp_estimator_test \
$(BINDIR)/$(CONFIG)/bin_decoder_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \
$(BINDIR)/$(CONFIG)/bin_encoder_test \ $(BINDIR)/$(CONFIG)/bin_encoder_test \
$(BINDIR)/$(CONFIG)/byte_stream_test \
$(BINDIR)/$(CONFIG)/census_context_test \ $(BINDIR)/$(CONFIG)/census_context_test \
$(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test \ $(BINDIR)/$(CONFIG)/census_intrusive_hash_map_test \
$(BINDIR)/$(CONFIG)/census_resource_test \ $(BINDIR)/$(CONFIG)/census_resource_test \
@ -1746,6 +1748,8 @@ test_c: buildtests_c
$(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 )
$(E) "[RUN] Testing bin_encoder_test" $(E) "[RUN] Testing bin_encoder_test"
$(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 )
$(E) "[RUN] Testing byte_stream_test"
$(Q) $(BINDIR)/$(CONFIG)/byte_stream_test || ( echo test byte_stream_test failed ; exit 1 )
$(E) "[RUN] Testing census_context_test" $(E) "[RUN] Testing census_context_test"
$(Q) $(BINDIR)/$(CONFIG)/census_context_test || ( echo test census_context_test failed ; exit 1 ) $(Q) $(BINDIR)/$(CONFIG)/census_context_test || ( echo test census_context_test failed ; exit 1 )
$(E) "[RUN] Testing census_intrusive_hash_map_test" $(E) "[RUN] Testing census_intrusive_hash_map_test"
@ -8411,6 +8415,38 @@ endif
endif endif
BYTE_STREAM_TEST_SRC = \
test/core/transport/byte_stream_test.c \
BYTE_STREAM_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BYTE_STREAM_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/byte_stream_test: openssl_dep_error
else
$(BINDIR)/$(CONFIG)/byte_stream_test: $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LD) $(LDFLAGS) $(BYTE_STREAM_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/byte_stream_test
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/byte_stream_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_byte_stream_test: $(BYTE_STREAM_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(BYTE_STREAM_TEST_OBJS:.o=.dep)
endif
endif
CENSUS_CONTEXT_TEST_SRC = \ CENSUS_CONTEXT_TEST_SRC = \
test/core/census/context_test.c \ test/core/census/context_test.c \

@ -1702,6 +1702,16 @@ targets:
deps: deps:
- grpc_test_util - grpc_test_util
- grpc - grpc
- name: byte_stream_test
build: test
language: c
src:
- test/core/transport/byte_stream_test.c
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
- name: census_context_test - name: census_context_test
build: test build: test
language: c language: c

@ -36,41 +36,29 @@
static const size_t kMaxPayloadSizeForGet = 2048; static const size_t kMaxPayloadSizeForGet = 2048;
typedef struct call_data { typedef struct call_data {
// State for handling send_initial_metadata ops.
grpc_linked_mdelem method; grpc_linked_mdelem method;
grpc_linked_mdelem scheme; grpc_linked_mdelem scheme;
grpc_linked_mdelem authority; grpc_linked_mdelem authority;
grpc_linked_mdelem te_trailers; grpc_linked_mdelem te_trailers;
grpc_linked_mdelem content_type; grpc_linked_mdelem content_type;
grpc_linked_mdelem user_agent; grpc_linked_mdelem user_agent;
// State for handling recv_initial_metadata ops.
grpc_metadata_batch *recv_initial_metadata; grpc_metadata_batch *recv_initial_metadata;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
// State for handling recv_trailing_metadata ops.
grpc_metadata_batch *recv_trailing_metadata; grpc_metadata_batch *recv_trailing_metadata;
uint8_t *payload_bytes; grpc_closure *original_recv_trailing_metadata_on_complete;
grpc_closure recv_trailing_metadata_on_complete;
/* Vars to read data off of send_message */ // State for handling send_message ops.
grpc_transport_stream_op_batch *send_op; grpc_transport_stream_op_batch *send_message_batch;
uint32_t send_length; size_t send_message_bytes_read;
uint32_t send_flags; grpc_byte_stream_cache send_message_cache;
grpc_slice incoming_slice; grpc_caching_byte_stream send_message_caching_stream;
grpc_slice_buffer_stream replacement_stream; grpc_closure on_send_message_next_done;
grpc_slice_buffer slices; grpc_closure *original_send_message_on_complete;
/* flag that indicates that all slices of send_messages aren't availble */ grpc_closure send_message_on_complete;
bool send_message_blocked;
/** Closure to call when finished with the hc_on_recv hook */
grpc_closure *on_done_recv_initial_metadata;
grpc_closure *on_done_recv_trailing_metadata;
grpc_closure *on_complete;
grpc_closure *post_send;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
grpc_closure hc_on_recv_initial_metadata;
grpc_closure hc_on_recv_trailing_metadata;
grpc_closure hc_on_complete;
grpc_closure got_slice;
grpc_closure send_done;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
void *user_data, grpc_error *error) { void *user_data, grpc_error *error) {
grpc_call_element *elem = user_data; grpc_call_element *elem = user_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
} else { } else {
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
} }
GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready,
error);
} }
static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx,
void *user_data, grpc_error *error) { void *user_data,
grpc_error *error) {
grpc_call_element *elem = user_data; grpc_call_element *elem = user_data;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
} else { } else {
GRPC_ERROR_REF(error); GRPC_ERROR_REF(error);
} }
GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error); GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete,
error);
} }
static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) { grpc_error *error) {
grpc_call_element *elem = user_data; grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = elem->call_data; call_data *calld = (call_data *)elem->call_data;
if (calld->payload_bytes) { grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache);
gpr_free(calld->payload_bytes); GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
calld->payload_bytes = NULL; GRPC_ERROR_REF(error));
}
// Pulls a slice from the send_message byte stream, updating
// calld->send_message_bytes_read.
static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld) {
grpc_slice incoming_slice;
grpc_error *error = grpc_byte_stream_pull(
exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice);
if (error == GRPC_ERROR_NONE) {
calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice);
grpc_slice_unref_internal(exec_ctx, incoming_slice);
} }
calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error); return error;
} }
static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { // Reads as many slices as possible from the send_message byte stream.
grpc_call_element *elem = elemp; // Upon successful return, if calld->send_message_bytes_read ==
call_data *calld = elem->call_data; // calld->send_message_caching_stream.base.length, then we have completed
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); // reading from the byte stream; otherwise, an async read has been dispatched
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); // and on_send_message_next_done() will be invoked when it is complete.
static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx,
call_data *calld) {
while (grpc_byte_stream_next(exec_ctx,
&calld->send_message_caching_stream.base,
~(size_t)0, &calld->on_send_message_next_done)) {
grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) return error;
if (calld->send_message_bytes_read ==
calld->send_message_caching_stream.base.length) {
break;
}
}
return GRPC_ERROR_NONE;
}
// Async callback for grpc_byte_stream_next().
static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->send_message_batch, error);
return;
}
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->send_message_batch, error);
return;
}
// There may or may not be more to read, but we don't care. If we got
// here, then we know that all of the data was not available
// synchronously, so we were not able to do a cached call. Instead,
// we just reset the byte stream and then send down the batch as-is.
grpc_caching_byte_stream_reset(&calld->send_message_caching_stream);
grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
}
static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) {
char *payload_bytes = gpr_malloc(slice_buffer->length + 1);
size_t offset = 0;
for (size_t i = 0; i < slice_buffer->count; ++i) {
memcpy(payload_bytes + offset,
GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
}
*(payload_bytes + offset) = '\0';
return payload_bytes;
}
// Modifies the path entry in the batch's send_initial_metadata to
// append the base64-encoded query for a GET request.
static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = (call_data *)elem->call_data;
grpc_slice path_slice =
GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata
->idx.named.path->md);
/* sum up individual component's lengths and allocate enough memory to
* hold combined path+query */
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
batch->payload->send_message.send_message->length, true /* url_safe */,
false /* multi_line */);
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice);
memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
write_ptr += GRPC_SLICE_LENGTH(path_slice);
*write_ptr++ = '?';
char *payload_bytes =
slice_buffer_to_string(&calld->send_message_cache.cache_buffer);
grpc_base64_encode_core((char *)write_ptr, payload_bytes,
batch->payload->send_message.send_message->length,
true /* url_safe */, false /* multi_line */);
gpr_free(payload_bytes);
/* remove trailing unused memory and add trailing 0 to terminate string */
char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
/* safe to use strlen since base64_encode will always add '\0' */
path_with_query_slice =
grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query =
grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
grpc_metadata_batch *b =
batch->payload->send_initial_metadata.send_initial_metadata;
return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
mdelem_path_and_query);
} }
static void remove_if_present(grpc_exec_ctx *exec_ctx, static void remove_if_present(grpc_exec_ctx *exec_ctx,
@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx,
} }
} }
static void continue_send_message(grpc_exec_ctx *exec_ctx, static void hc_start_transport_stream_op_batch(
grpc_call_element *elem) { grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes; channel_data *channeld = elem->channel_data;
while (grpc_byte_stream_next( GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0);
exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
&calld->got_slice)) {
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice);
if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) {
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice));
}
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
calld->send_message_blocked = false;
break;
}
}
}
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { if (batch->recv_initial_metadata) {
grpc_call_element *elem = elemp; /* substitute our callback for the higher callback */
call_data *calld = elem->call_data; calld->recv_initial_metadata =
calld->send_message_blocked = false; batch->payload->recv_initial_metadata.recv_initial_metadata;
if (GRPC_ERROR_NONE != calld->original_recv_initial_metadata_ready =
grpc_byte_stream_pull(exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
calld->send_op->payload->send_message.send_message, batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->incoming_slice)) { &calld->recv_initial_metadata_ready;
/* Should never reach here */
abort();
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags);
calld->send_op->payload->send_message.send_message =
&calld->replacement_stream.base;
calld->post_send = calld->send_op->on_complete;
calld->send_op->on_complete = &calld->send_done;
grpc_call_next_op(exec_ctx, elem, calld->send_op);
} else {
continue_send_message(exec_ctx, elem);
} }
}
static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, if (batch->recv_trailing_metadata) {
grpc_call_element *elem, /* substitute our callback for the higher callback */
grpc_transport_stream_op_batch *op) { calld->recv_trailing_metadata =
/* grab pointers to our data from the call element */ batch->payload->recv_trailing_metadata.recv_trailing_metadata;
call_data *calld = elem->call_data; calld->original_recv_trailing_metadata_on_complete = batch->on_complete;
channel_data *channeld = elem->channel_data; batch->on_complete = &calld->recv_trailing_metadata_on_complete;
grpc_error *error; }
if (op->send_initial_metadata) { grpc_error *error = GRPC_ERROR_NONE;
/* Decide which HTTP VERB to use. We use GET if the request is marked bool batch_will_be_handled_asynchronously = false;
cacheable, and the operation contains both initial metadata and send if (batch->send_initial_metadata) {
message, and the payload is below the size threshold, and all the data // Decide which HTTP VERB to use. We use GET if the request is marked
for this request is immediately available. */ // cacheable, and the operation contains both initial metadata and send
// message, and the payload is below the size threshold, and all the data
// for this request is immediately available.
grpc_mdelem method = GRPC_MDELEM_METHOD_POST; grpc_mdelem method = GRPC_MDELEM_METHOD_POST;
if (op->send_message && if (batch->send_message &&
(op->payload->send_initial_metadata.send_initial_metadata_flags & (batch->payload->send_initial_metadata.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) &&
op->payload->send_message.send_message->length < batch->payload->send_message.send_message->length <
channeld->max_payload_size_for_get) { channeld->max_payload_size_for_get) {
method = GRPC_MDELEM_METHOD_GET; calld->send_message_bytes_read = 0;
/* The following write to calld->send_message_blocked isn't racy with grpc_byte_stream_cache_init(&calld->send_message_cache,
reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because batch->payload->send_message.send_message);
being here means ops->send_message is not NULL, which is primarily grpc_caching_byte_stream_init(&calld->send_message_caching_stream,
guarding the read there. */ &calld->send_message_cache);
calld->send_message_blocked = true; batch->payload->send_message.send_message =
} else if (op->payload->send_initial_metadata.send_initial_metadata_flags & &calld->send_message_caching_stream.base;
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { calld->original_send_message_on_complete = batch->on_complete;
method = GRPC_MDELEM_METHOD_PUT; batch->on_complete = &calld->send_message_on_complete;
} calld->send_message_batch = batch;
error = read_all_available_send_message_data(exec_ctx, calld);
/* Attempt to read the data from send_message and create a header field. */ if (error != GRPC_ERROR_NONE) goto done;
if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { // If all the data has been read, then we can use GET.
/* allocate memory to hold the entire payload */ if (calld->send_message_bytes_read ==
calld->payload_bytes = calld->send_message_caching_stream.base.length) {
gpr_malloc(op->payload->send_message.send_message->length); method = GRPC_MDELEM_METHOD_GET;
error = update_path_for_get(exec_ctx, elem, batch);
/* read slices of send_message and copy into payload_bytes */ if (error != GRPC_ERROR_NONE) goto done;
calld->send_op = op; batch->send_message = false;
calld->send_length = op->payload->send_message.send_message->length; grpc_byte_stream_destroy(exec_ctx,
calld->send_flags = op->payload->send_message.send_message->flags; &calld->send_message_caching_stream.base);
continue_send_message(exec_ctx, elem);
if (calld->send_message_blocked == false) {
/* when all the send_message data is available, then modify the path
* MDELEM by appending base64 encoded query to the path */
const int k_url_safe = 1;
const int k_multi_line = 0;
const unsigned char k_query_separator = '?';
grpc_slice path_slice =
GRPC_MDVALUE(op->payload->send_initial_metadata
.send_initial_metadata->idx.named.path->md);
/* sum up individual component's lengths and allocate enough memory to
* hold combined path+query */
size_t estimated_len = GRPC_SLICE_LENGTH(path_slice);
estimated_len++; /* for the '?' */
estimated_len += grpc_base64_estimate_encoded_size(
op->payload->send_message.send_message->length, k_url_safe,
k_multi_line);
grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len);
/* memcopy individual pieces into this slice */
uint8_t *write_ptr =
(uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice);
uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice);
memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice));
write_ptr += GRPC_SLICE_LENGTH(path_slice);
*write_ptr = k_query_separator;
write_ptr++; /* for the '?' */
grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes,
op->payload->send_message.send_message->length,
k_url_safe, k_multi_line);
/* remove trailing unused memory and add trailing 0 to terminate string
*/
char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice);
/* safe to use strlen since base64_encode will always add '\0' */
path_with_query_slice =
grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t));
/* substitute previous path with the new path+query */
grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices(
exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice);
grpc_metadata_batch *b =
op->payload->send_initial_metadata.send_initial_metadata;
error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path,
mdelem_path_and_query);
if (error != GRPC_ERROR_NONE) return error;
calld->on_complete = op->on_complete;
op->on_complete = &calld->hc_on_complete;
op->send_message = false;
} else { } else {
/* Not all data is available. Fall back to POST. */ // Not all data is available. The batch will be sent down
// asynchronously in on_send_message_next_done().
batch_will_be_handled_asynchronously = true;
// Fall back to POST.
gpr_log(GPR_DEBUG, gpr_log(GPR_DEBUG,
"Request is marked Cacheable but not all data is available.\ "Request is marked Cacheable but not all data is available. "
Falling back to POST"); "Falling back to POST");
method = GRPC_MDELEM_METHOD_POST;
} }
} else if (batch->payload->send_initial_metadata
.send_initial_metadata_flags &
GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
method = GRPC_MDELEM_METHOD_PUT;
} }
remove_if_present(exec_ctx, remove_if_present(
op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_METHOD); GRPC_BATCH_METHOD);
remove_if_present(exec_ctx, remove_if_present(
op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_SCHEME); GRPC_BATCH_SCHEME);
remove_if_present(exec_ctx, remove_if_present(
op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_TE); GRPC_BATCH_TE);
remove_if_present(exec_ctx, remove_if_present(
op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_CONTENT_TYPE); GRPC_BATCH_CONTENT_TYPE);
remove_if_present(exec_ctx, remove_if_present(
op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
GRPC_BATCH_USER_AGENT); GRPC_BATCH_USER_AGENT);
/* Send : prefixed headers, which have to be before any application /* Send : prefixed headers, which have to be before any application
layer headers. */ layer headers. */
error = grpc_metadata_batch_add_head( error = grpc_metadata_batch_add_head(
exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->method, method); &calld->method, method);
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_head( error = grpc_metadata_batch_add_head(
exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->scheme, channeld->static_scheme); &calld->scheme, channeld->static_scheme);
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail( error = grpc_metadata_batch_add_tail(
exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS);
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail( error = grpc_metadata_batch_add_tail(
exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC);
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) goto done;
error = grpc_metadata_batch_add_tail( error = grpc_metadata_batch_add_tail(
exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata,
&calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent));
if (error != GRPC_ERROR_NONE) return error; if (error != GRPC_ERROR_NONE) goto done;
} }
if (op->recv_initial_metadata) { done:
/* substitute our callback for the higher callback */
calld->recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata;
calld->on_done_recv_initial_metadata =
op->payload->recv_initial_metadata.recv_initial_metadata_ready;
op->payload->recv_initial_metadata.recv_initial_metadata_ready =
&calld->hc_on_recv_initial_metadata;
}
if (op->recv_trailing_metadata) {
/* substitute our callback for the higher callback */
calld->recv_trailing_metadata =
op->payload->recv_trailing_metadata.recv_trailing_metadata;
calld->on_done_recv_trailing_metadata = op->on_complete;
op->on_complete = &calld->hc_on_recv_trailing_metadata;
}
return GRPC_ERROR_NONE;
}
static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
GPR_TIMER_BEGIN("hc_start_transport_op", 0);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_error *error = hc_mutate_op(exec_ctx, elem, op);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); grpc_transport_stream_op_batch_finish_with_failure(
} else { exec_ctx, calld->send_message_batch, error);
call_data *calld = elem->call_data; } else if (!batch_will_be_handled_asynchronously) {
if (op->send_message && calld->send_message_blocked) { grpc_call_next_op(exec_ctx, elem, batch);
/* Don't forward the op. send_message contains slices that aren't ready
yet. The call will be forwarded by the op_complete of slice read call.
*/
} else {
grpc_call_next_op(exec_ctx, elem, op);
}
} }
GPR_TIMER_END("hc_start_transport_op", 0); GPR_TIMER_END("hc_start_transport_stream_op_batch", 0);
} }
/* Constructor for call_data */ /* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem, grpc_call_element *elem,
const grpc_call_element_args *args) { const grpc_call_element_args *args) {
call_data *calld = elem->call_data; call_data *calld = (call_data *)elem->call_data;
calld->on_done_recv_initial_metadata = NULL; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
calld->on_done_recv_trailing_metadata = NULL; recv_initial_metadata_ready, elem,
calld->on_complete = NULL;
calld->payload_bytes = NULL;
calld->send_message_blocked = false;
grpc_slice_buffer_init(&calld->slices);
GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata,
hc_on_recv_initial_metadata, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata,
hc_on_recv_trailing_metadata, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem, GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete,
grpc_schedule_on_exec_ctx); recv_trailing_metadata_on_complete, elem,
GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
/* Destructor for call_data */ /* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info, const grpc_call_final_info *final_info,
grpc_closure *ignored) { grpc_closure *ignored) {}
call_data *calld = elem->call_data;
grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices);
}
static grpc_mdelem scheme_from_args(const grpc_channel_args *args) { static grpc_mdelem scheme_from_args(const grpc_channel_args *args) {
unsigned i; unsigned i;
@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
} }
const grpc_channel_filter grpc_http_client_filter = { const grpc_channel_filter grpc_http_client_filter = {
hc_start_transport_op, hc_start_transport_stream_op_batch,
grpc_channel_next_op, grpc_channel_next_op,
sizeof(call_data), sizeof(call_data),
init_call_elem, init_call_elem,

@ -61,14 +61,11 @@ typedef struct call_data {
pointer | CANCELLED_BIT - request was cancelled with error pointed to */ pointer | CANCELLED_BIT - request was cancelled with error pointed to */
gpr_atm send_initial_metadata_state; gpr_atm send_initial_metadata_state;
grpc_transport_stream_op_batch *send_op; grpc_transport_stream_op_batch *send_message_batch;
uint32_t send_length;
uint32_t send_flags;
grpc_slice incoming_slice;
grpc_slice_buffer_stream replacement_stream; grpc_slice_buffer_stream replacement_stream;
grpc_closure *post_send; grpc_closure *original_send_message_on_complete;
grpc_closure send_done; grpc_closure send_message_on_complete;
grpc_closure got_slice; grpc_closure on_send_message_next_done;
} call_data; } call_data;
typedef struct channel_data { typedef struct channel_data {
@ -164,24 +161,25 @@ static grpc_error *process_send_initial_metadata(
return error; return error;
} }
static void continue_send_message(grpc_exec_ctx *exec_ctx, static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *elem); grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { call_data *calld = (call_data *)elem->call_data;
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices);
calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete,
GRPC_ERROR_REF(error));
} }
static void finish_send_message(grpc_exec_ctx *exec_ctx, static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) { grpc_call_element *elem) {
call_data *calld = elem->call_data; call_data *calld = (call_data *)elem->call_data;
int did_compress; // Compress the data if appropriate.
grpc_slice_buffer tmp; grpc_slice_buffer tmp;
grpc_slice_buffer_init(&tmp); grpc_slice_buffer_init(&tmp);
did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, uint32_t send_flags =
&calld->slices, &tmp); calld->send_message_batch->payload->send_message.send_message->flags;
const bool did_compress = grpc_msg_compress(
exec_ctx, calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) { if (did_compress) {
if (GRPC_TRACER_ON(grpc_compression_trace)) { if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name; char *algo_name;
@ -195,7 +193,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, before_size, after_size, 100 * savings_ratio); algo_name, before_size, after_size, 100 * savings_ratio);
} }
grpc_slice_buffer_swap(&calld->slices, &tmp); grpc_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else { } else {
if (GRPC_TRACER_ON(grpc_compression_trace)) { if (GRPC_TRACER_ON(grpc_compression_trace)) {
char *algo_name; char *algo_name;
@ -207,83 +205,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
algo_name, calld->slices.length); algo_name, calld->slices.length);
} }
} }
grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); grpc_slice_buffer_destroy_internal(exec_ctx, &tmp);
// Swap out the original byte stream with our new one and send the
// batch down.
grpc_byte_stream_destroy(
exec_ctx, calld->send_message_batch->payload->send_message.send_message);
grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
calld->send_flags); send_flags);
calld->send_op->payload->send_message.send_message = calld->send_message_batch->payload->send_message.send_message =
&calld->replacement_stream.base; &calld->replacement_stream.base;
calld->post_send = calld->send_op->on_complete; calld->original_send_message_on_complete =
calld->send_op->on_complete = &calld->send_done; calld->send_message_batch->on_complete;
calld->send_message_batch->on_complete = &calld->send_message_on_complete;
grpc_call_next_op(exec_ctx, elem, calld->send_op); grpc_call_next_op(exec_ctx, elem, calld->send_message_batch);
} }
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { // Pulls a slice from the send_message byte stream and adds it to calld->slices.
grpc_call_element *elem = elemp; static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data; call_data *calld) {
if (GRPC_ERROR_NONE != grpc_slice incoming_slice;
grpc_byte_stream_pull(exec_ctx, grpc_error *error = grpc_byte_stream_pull(
calld->send_op->payload->send_message.send_message, exec_ctx, calld->send_message_batch->payload->send_message.send_message,
&calld->incoming_slice)) { &incoming_slice);
/* Should never reach here */ if (error == GRPC_ERROR_NONE) {
abort(); grpc_slice_buffer_add(&calld->slices, incoming_slice);
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem);
} else {
continue_send_message(exec_ctx, elem);
} }
return error;
} }
static void continue_send_message(grpc_exec_ctx *exec_ctx, // Reads as many slices as possible from the send_message byte stream.
grpc_call_element *elem) { // If all data has been read, invokes finish_send_message(). Otherwise,
call_data *calld = elem->call_data; // an async call to grpc_byte_stream_next() has been started, which will
// eventually result in calling on_send_message_next_done().
static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) {
call_data *calld = (call_data *)elem->call_data;
while (grpc_byte_stream_next( while (grpc_byte_stream_next(
exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, exec_ctx, calld->send_message_batch->payload->send_message.send_message,
&calld->got_slice)) { ~(size_t)0, &calld->on_send_message_next_done)) {
grpc_byte_stream_pull(exec_ctx, grpc_error *error = pull_slice_from_send_message(exec_ctx, calld);
calld->send_op->payload->send_message.send_message, if (error != GRPC_ERROR_NONE) return error;
&calld->incoming_slice); if (calld->slices.length ==
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); calld->send_message_batch->payload->send_message.send_message->length) {
if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem); finish_send_message(exec_ctx, elem);
break; break;
} }
} }
return GRPC_ERROR_NONE;
} }
static void handle_send_message_batch(grpc_exec_ctx *exec_ctx, // Async callback for grpc_byte_stream_next().
grpc_call_element *elem, static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *op, grpc_error *error) {
bool has_compression_algorithm) { grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = elem->call_data; call_data *calld = (call_data *)elem->call_data;
if (!skip_compression(elem, op->payload->send_message.send_message->flags, if (error != GRPC_ERROR_NONE) goto fail;
error = pull_slice_from_send_message(exec_ctx, calld);
if (error != GRPC_ERROR_NONE) goto fail;
if (calld->slices.length ==
calld->send_message_batch->payload->send_message.send_message->length) {
finish_send_message(exec_ctx, elem);
} else {
// This will either finish reading all of the data and invoke
// finish_send_message(), or else it will make an async call to
// grpc_byte_stream_next(), which will eventually result in calling
// this function again.
error = continue_reading_send_message(exec_ctx, elem);
if (error != GRPC_ERROR_NONE) goto fail;
}
return;
fail:
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->send_message_batch, error);
}
static void start_send_message_batch(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op_batch *batch,
bool has_compression_algorithm) {
call_data *calld = (call_data *)elem->call_data;
if (!skip_compression(elem, batch->payload->send_message.send_message->flags,
has_compression_algorithm)) { has_compression_algorithm)) {
calld->send_op = op; calld->send_message_batch = batch;
calld->send_length = op->payload->send_message.send_message->length; // This will either finish reading all of the data and invoke
calld->send_flags = op->payload->send_message.send_message->flags; // finish_send_message(), or else it will make an async call to
continue_send_message(exec_ctx, elem); // grpc_byte_stream_next(), which will eventually result in calling
// on_send_message_next_done().
grpc_error *error = continue_reading_send_message(exec_ctx, elem);
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, calld->send_message_batch, error);
}
} else { } else {
/* pass control down the stack */ /* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, batch);
} }
} }
static void compress_start_transport_stream_op_batch( static void compress_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) { grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0);
if (op->cancel_stream) { if (batch->cancel_stream) {
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); // TODO(roth): As part of the upcoming call combiner work, change
// this to call grpc_byte_stream_shutdown() on the incoming byte
// stream, to cancel any in-flight calls to grpc_byte_stream_next().
GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
gpr_atm cur = gpr_atm_full_xchg( gpr_atm cur = gpr_atm_full_xchg(
&calld->send_initial_metadata_state, &calld->send_initial_metadata_state,
CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error); CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error);
switch (cur) { switch (cur) {
case HAS_COMPRESSION_ALGORITHM: case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM:
@ -293,7 +326,7 @@ static void compress_start_transport_stream_op_batch(
if ((cur & CANCELLED_BIT) == 0) { if ((cur & CANCELLED_BIT) == 0) {
grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, (grpc_transport_stream_op_batch *)cur, exec_ctx, (grpc_transport_stream_op_batch *)cur,
GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error));
} else { } else {
GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT));
} }
@ -301,14 +334,15 @@ static void compress_start_transport_stream_op_batch(
} }
} }
if (op->send_initial_metadata) { if (batch->send_initial_metadata) {
bool has_compression_algorithm; bool has_compression_algorithm;
grpc_error *error = process_send_initial_metadata( grpc_error *error = process_send_initial_metadata(
exec_ctx, elem, exec_ctx, elem,
op->payload->send_initial_metadata.send_initial_metadata, batch->payload->send_initial_metadata.send_initial_metadata,
&has_compression_algorithm); &has_compression_algorithm);
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
error);
return; return;
} }
gpr_atm cur; gpr_atm cur;
@ -324,32 +358,32 @@ static void compress_start_transport_stream_op_batch(
goto retry_send_im; goto retry_send_im;
} }
if (cur != INITIAL_METADATA_UNSEEN) { if (cur != INITIAL_METADATA_UNSEEN) {
handle_send_message_batch(exec_ctx, elem, start_send_message_batch(exec_ctx, elem,
(grpc_transport_stream_op_batch *)cur, (grpc_transport_stream_op_batch *)cur,
has_compression_algorithm); has_compression_algorithm);
} }
} }
} }
if (op->send_message) { if (batch->send_message) {
gpr_atm cur; gpr_atm cur;
retry_send: retry_send:
cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); cur = gpr_atm_acq_load(&calld->send_initial_metadata_state);
switch (cur) { switch (cur) {
case INITIAL_METADATA_UNSEEN: case INITIAL_METADATA_UNSEEN:
if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur,
(gpr_atm)op)) { (gpr_atm)batch)) {
goto retry_send; goto retry_send;
} }
break; break;
case HAS_COMPRESSION_ALGORITHM: case HAS_COMPRESSION_ALGORITHM:
case NO_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM:
handle_send_message_batch(exec_ctx, elem, op, start_send_message_batch(exec_ctx, elem, batch,
cur == HAS_COMPRESSION_ALGORITHM); cur == HAS_COMPRESSION_ALGORITHM);
break; break;
default: default:
if (cur & CANCELLED_BIT) { if (cur & CANCELLED_BIT) {
grpc_transport_stream_op_batch_finish_with_failure( grpc_transport_stream_op_batch_finish_with_failure(
exec_ctx, op, exec_ctx, batch,
GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT)));
} else { } else {
/* >1 send_message concurrently */ /* >1 send_message concurrently */
@ -358,7 +392,7 @@ static void compress_start_transport_stream_op_batch(
} }
} else { } else {
/* pass control down the stack */ /* pass control down the stack */
grpc_call_next_op(exec_ctx, elem, op); grpc_call_next_op(exec_ctx, elem, batch);
} }
GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); GPR_TIMER_END("compress_start_transport_stream_op_batch", 0);
@ -373,10 +407,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
/* initialize members */ /* initialize members */
grpc_slice_buffer_init(&calld->slices); grpc_slice_buffer_init(&calld->slices);
GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, GRPC_CLOSURE_INIT(&calld->on_send_message_next_done,
grpc_schedule_on_exec_ctx); on_send_message_next_done, elem, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete,
grpc_schedule_on_exec_ctx); elem, grpc_schedule_on_exec_ctx);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }

@ -1173,6 +1173,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
return; /* early out */ return; /* early out */
} }
if (s->fetched_send_message_length == s->fetching_send_message->length) { if (s->fetched_send_message_length == s->fetching_send_message->length) {
grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
int64_t notify_offset = s->next_message_end_offset; int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) { if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step( grpc_chttp2_complete_closure_step(
@ -1195,9 +1196,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
return; /* early out */ return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
UINT32_MAX, &s->complete_fetch_locked)) { UINT32_MAX, &s->complete_fetch_locked)) {
grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, grpc_error *error = grpc_byte_stream_pull(
&s->fetching_slice); exec_ctx, s->fetching_send_message, &s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s); if (error != GRPC_ERROR_NONE) {
grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
} else {
add_fetched_slice_locked(exec_ctx, t, s);
}
} }
} }
} }
@ -1214,10 +1220,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
continue_fetching_send_locked(exec_ctx, t, s); continue_fetching_send_locked(exec_ctx, t, s);
} }
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */ grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message);
abort(); grpc_chttp2_cancel_stream(exec_ctx, t, s, error);
} }
} }
@ -2686,22 +2691,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored) { grpc_error *error_ignored);
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
grpc_chttp2_stream *s = bs->stream;
grpc_chttp2_transport *t = s->t;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
incoming_byte_stream_unref(exec_ctx, bs);
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) { grpc_byte_stream *byte_stream) {
@ -2768,6 +2760,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished(
return error; return error;
} }
static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_error *error) {
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, bs, error, true /* reset_on_error */));
}
static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = {
incoming_byte_stream_next, incoming_byte_stream_pull,
incoming_byte_stream_shutdown, incoming_byte_stream_destroy};
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
grpc_chttp2_stream *s = bs->stream;
grpc_chttp2_transport *t = s->t;
GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable);
incoming_byte_stream_unref(exec_ctx, bs);
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags) { uint32_t frame_size, uint32_t flags) {
@ -2776,9 +2795,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->base.length = frame_size; incoming_byte_stream->base.length = frame_size;
incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next; incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable;
incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
gpr_ref_init(&incoming_byte_stream->refs, 2); gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->transport = t; incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s; incoming_byte_stream->stream = s;

@ -72,6 +72,7 @@ typedef struct sb_list_entry {
typedef struct { typedef struct {
grpc_byte_stream base; grpc_byte_stream base;
sb_list_entry *le; sb_list_entry *le;
grpc_error *shutdown_error;
} inproc_slice_byte_stream; } inproc_slice_byte_stream;
typedef struct { typedef struct {
@ -201,24 +202,39 @@ static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs, grpc_byte_stream *bs,
grpc_slice *slice) { grpc_slice *slice) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
*slice = grpc_slice_buffer_take_first(&stream->le->sb); *slice = grpc_slice_buffer_take_first(&stream->le->sb);
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs,
grpc_error *error) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
}
static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs) { grpc_byte_stream *bs) {
inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
sb_list_entry_destroy(exec_ctx, stream->le); sb_list_entry_destroy(exec_ctx, stream->le);
GRPC_ERROR_UNREF(stream->shutdown_error);
} }
static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = {
inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull,
inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy};
void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s,
sb_list_entry *le) { sb_list_entry *le) {
s->base.length = (uint32_t)le->sb.length; s->base.length = (uint32_t)le->sb.length;
s->base.flags = 0; s->base.flags = 0;
s->base.next = inproc_slice_byte_stream_next; s->base.vtable = &inproc_slice_byte_stream_vtable;
s->base.pull = inproc_slice_byte_stream_pull;
s->base.destroy = inproc_slice_byte_stream_destroy;
s->le = le; s->le = le;
s->shutdown_error = GRPC_ERROR_NONE;
} }
static void ref_transport(inproc_transport *t) { static void ref_transport(inproc_transport *t) {
@ -956,11 +972,18 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(grpc_byte_stream_next(exec_ctx, GPR_ASSERT(grpc_byte_stream_next(exec_ctx,
op->payload->send_message.send_message, op->payload->send_message.send_message,
SIZE_MAX, &unused)); SIZE_MAX, &unused));
grpc_byte_stream_pull(exec_ctx, op->payload->send_message.send_message, error = grpc_byte_stream_pull(
&message_slice); exec_ctx, op->payload->send_message.send_message, &message_slice);
if (error != GRPC_ERROR_NONE) {
cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error));
break;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
remaining -= GRPC_SLICE_LENGTH(message_slice); remaining -= GRPC_SLICE_LENGTH(message_slice);
grpc_slice_buffer_add(dest, message_slice); grpc_slice_buffer_add(dest, message_slice);
} while (remaining != 0); } while (remaining != 0);
grpc_byte_stream_destroy(exec_ctx,
op->payload->send_message.send_message);
} }
if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) {
grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md

@ -19,29 +19,37 @@
#include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/byte_stream.h"
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_byte_stream *byte_stream, size_t max_size_hint,
grpc_closure *on_complete) { grpc_closure *on_complete) {
return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); return byte_stream->vtable->next(exec_ctx, byte_stream, max_size_hint,
on_complete);
} }
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
grpc_slice *slice) { grpc_slice *slice) {
return byte_stream->pull(exec_ctx, byte_stream, slice); return byte_stream->vtable->pull(exec_ctx, byte_stream, slice);
}
void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_error *error) {
byte_stream->vtable->shutdown(exec_ctx, byte_stream, error);
} }
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) { grpc_byte_stream *byte_stream) {
byte_stream->destroy(exec_ctx, byte_stream); byte_stream->vtable->destroy(exec_ctx, byte_stream);
} }
/* slice_buffer_stream */ // grpc_slice_buffer_stream
static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
@ -56,6 +64,9 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
grpc_slice *slice) { grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
GPR_ASSERT(stream->cursor < stream->backing_buffer->count); GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice = *slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
@ -63,8 +74,23 @@ static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_error *error) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = error;
}
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {} grpc_byte_stream *byte_stream) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
}
static const grpc_byte_stream_vtable slice_buffer_stream_vtable = {
slice_buffer_stream_next, slice_buffer_stream_pull,
slice_buffer_stream_shutdown, slice_buffer_stream_destroy};
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer, grpc_slice_buffer *slice_buffer,
@ -72,9 +98,89 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
GPR_ASSERT(slice_buffer->length <= UINT32_MAX); GPR_ASSERT(slice_buffer->length <= UINT32_MAX);
stream->base.length = (uint32_t)slice_buffer->length; stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags; stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next; stream->base.vtable = &slice_buffer_stream_vtable;
stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer; stream->backing_buffer = slice_buffer;
stream->cursor = 0; stream->cursor = 0;
stream->shutdown_error = GRPC_ERROR_NONE;
}
// grpc_caching_byte_stream
void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
grpc_byte_stream *underlying_stream) {
cache->underlying_stream = underlying_stream;
grpc_slice_buffer_init(&cache->cache_buffer);
}
void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream_cache *cache) {
grpc_byte_stream_destroy(exec_ctx, cache->underlying_stream);
grpc_slice_buffer_destroy_internal(exec_ctx, &cache->cache_buffer);
}
static bool caching_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
size_t max_size_hint,
grpc_closure *on_complete) {
grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) return true;
if (stream->cursor < stream->cache->cache_buffer.count) return true;
return grpc_byte_stream_next(exec_ctx, stream->cache->underlying_stream,
max_size_hint, on_complete);
}
static grpc_error *caching_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
if (stream->shutdown_error != GRPC_ERROR_NONE) {
return GRPC_ERROR_REF(stream->shutdown_error);
}
if (stream->cursor < stream->cache->cache_buffer.count) {
*slice = grpc_slice_ref_internal(
stream->cache->cache_buffer.slices[stream->cursor]);
++stream->cursor;
return GRPC_ERROR_NONE;
}
grpc_error *error =
grpc_byte_stream_pull(exec_ctx, stream->cache->underlying_stream, slice);
if (error == GRPC_ERROR_NONE) {
++stream->cursor;
grpc_slice_buffer_add(&stream->cache->cache_buffer,
grpc_slice_ref_internal(*slice));
}
return error;
}
static void caching_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_error *error) {
grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
stream->shutdown_error = GRPC_ERROR_REF(error);
grpc_byte_stream_shutdown(exec_ctx, stream->cache->underlying_stream, error);
}
static void caching_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_caching_byte_stream *stream = (grpc_caching_byte_stream *)byte_stream;
GRPC_ERROR_UNREF(stream->shutdown_error);
}
static const grpc_byte_stream_vtable caching_byte_stream_vtable = {
caching_byte_stream_next, caching_byte_stream_pull,
caching_byte_stream_shutdown, caching_byte_stream_destroy};
void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
grpc_byte_stream_cache *cache) {
memset(stream, 0, sizeof(*stream));
stream->base.length = cache->underlying_stream->length;
stream->base.flags = cache->underlying_stream->flags;
stream->base.vtable = &caching_byte_stream_vtable;
stream->cache = cache;
stream->shutdown_error = GRPC_ERROR_NONE;
}
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream) {
stream->cursor = 0;
} }

@ -28,52 +28,109 @@
/** Mask of all valid internal flags. */ /** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS) #define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
struct grpc_byte_stream;
typedef struct grpc_byte_stream grpc_byte_stream; typedef struct grpc_byte_stream grpc_byte_stream;
struct grpc_byte_stream { typedef struct {
uint32_t length;
uint32_t flags;
bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete); size_t max_size_hint, grpc_closure *on_complete);
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice); grpc_slice *slice);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_error *error);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
} grpc_byte_stream_vtable;
struct grpc_byte_stream {
uint32_t length;
uint32_t flags;
const grpc_byte_stream_vtable *vtable;
}; };
/* returns 1 if the bytes are available immediately (in which case // Returns true if the bytes are available immediately (in which case
* on_complete will not be called), 0 if the bytes will be available // on_complete will not be called), false if the bytes will be available
* asynchronously. // asynchronously.
* //
* max_size_hint can be set as a hint as to the maximum number // max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read. // of bytes that would be acceptable to read.
*/ bool grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, size_t max_size_hint,
grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_closure *on_complete);
grpc_closure *on_complete);
/* returns the next slice in the byte stream when it is ready (indicated by // Returns the next slice in the byte stream when it is ready (indicated by
* either grpc_byte_stream_next returning 1 or on_complete passed to // either grpc_byte_stream_next returning true or on_complete passed to
* grpc_byte_stream_next is called). // grpc_byte_stream_next is called).
* //
* once a slice is returned into *slice, it is owned by the caller. // Once a slice is returned into *slice, it is owned by the caller.
*/
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
grpc_slice *slice); grpc_slice *slice);
// Shuts down the byte stream.
//
// If there is a pending call to on_complete from grpc_byte_stream_next(),
// it will be invoked with the error passed to grpc_byte_stream_shutdown().
//
// The next call to grpc_byte_stream_pull() (if any) will return the error
// passed to grpc_byte_stream_shutdown().
void grpc_byte_stream_shutdown(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_error *error);
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream); grpc_byte_stream *byte_stream);
/* grpc_byte_stream that wraps a slice buffer */ // grpc_slice_buffer_stream
//
// A grpc_byte_stream that wraps a slice buffer.
typedef struct grpc_slice_buffer_stream { typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base; grpc_byte_stream base;
grpc_slice_buffer *backing_buffer; grpc_slice_buffer *backing_buffer;
size_t cursor; size_t cursor;
grpc_error *shutdown_error;
} grpc_slice_buffer_stream; } grpc_slice_buffer_stream;
void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream, void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
grpc_slice_buffer *slice_buffer, grpc_slice_buffer *slice_buffer,
uint32_t flags); uint32_t flags);
// grpc_caching_byte_stream
//
// A grpc_byte_stream that that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer. If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
// return whatever is in the backing buffer before continuing to read the
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
// grpc_caching_byte_streams simultaneously drawing from the same underlying
// grpc_byte_stream_cache at the same time.
typedef struct {
grpc_byte_stream *underlying_stream;
grpc_slice_buffer cache_buffer;
} grpc_byte_stream_cache;
// Takes ownership of underlying_stream.
void grpc_byte_stream_cache_init(grpc_byte_stream_cache *cache,
grpc_byte_stream *underlying_stream);
// Must not be called while still in use by a grpc_caching_byte_stream.
void grpc_byte_stream_cache_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream_cache *cache);
typedef struct {
grpc_byte_stream base;
grpc_byte_stream_cache *cache;
size_t cursor;
grpc_error *shutdown_error;
} grpc_caching_byte_stream;
void grpc_caching_byte_stream_init(grpc_caching_byte_stream *stream,
grpc_byte_stream_cache *cache);
// Resets the byte stream to the start of the underlying stream.
void grpc_caching_byte_stream_reset(grpc_caching_byte_stream *stream);
#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */ #endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */

@ -207,27 +207,35 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
return transport->vtable->get_endpoint(exec_ctx, transport); return transport->vtable->get_endpoint(exec_ctx, transport);
} }
// This comment should be sung to the tune of
// "Supercalifragilisticexpialidocious":
//
// grpc_transport_stream_op_batch_finish_with_failure // grpc_transport_stream_op_batch_finish_with_failure
// is a function that must always unref cancel_error // is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure // though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure // it's grpc_transport_stream_op_batch_finish_with_failure
void grpc_transport_stream_op_batch_finish_with_failure( void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
grpc_error *error) { grpc_error *error) {
if (op->recv_message) { if (batch->send_message) {
GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, grpc_byte_stream_destroy(exec_ctx,
batch->payload->send_message.send_message);
}
if (batch->recv_message) {
GRPC_CLOSURE_SCHED(exec_ctx,
batch->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
if (op->recv_initial_metadata) { if (batch->recv_initial_metadata) {
GRPC_CLOSURE_SCHED( GRPC_CLOSURE_SCHED(
exec_ctx, exec_ctx,
op->payload->recv_initial_metadata.recv_initial_metadata_ready, batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
} }
GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error); GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (op->cancel_stream) { if (batch->cancel_stream) {
GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); GRPC_ERROR_UNREF(batch->payload->cancel_stream.cancel_error);
} }
} }

@ -159,6 +159,11 @@ struct grpc_transport_stream_op_batch_payload {
} send_trailing_metadata; } send_trailing_metadata;
struct { struct {
// The transport (or a filter that decides to return a failure before
// the op gets down to the transport) is responsible for calling
// grpc_byte_stream_destroy() on this.
// The batch's on_complete will not be called until after the byte
// stream is destroyed.
grpc_byte_stream *send_message; grpc_byte_stream *send_message;
} send_message; } send_message;
@ -174,6 +179,10 @@ struct grpc_transport_stream_op_batch_payload {
} recv_initial_metadata; } recv_initial_metadata;
struct { struct {
// Will be set by the transport to point to the byte stream
// containing a received message.
// The caller is responsible for calling grpc_byte_stream_destroy()
// on this byte stream.
grpc_byte_stream **recv_message; grpc_byte_stream **recv_message;
/** Should be enqueued when one message is ready to be processed. */ /** Should be enqueued when one message is ready to be processed. */
grpc_closure *recv_message_ready; grpc_closure *recv_message_ready;

@ -35,6 +35,18 @@ grpc_cc_test(
], ],
) )
grpc_cc_test(
name = "byte_stream_test",
srcs = ["byte_stream_test.c"],
language = "C",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
)
grpc_cc_test( grpc_cc_test(
name = "connectivity_state_test", name = "connectivity_state_test",
srcs = ["connectivity_state_test.c"], srcs = ["connectivity_state_test.c"],

@ -0,0 +1,279 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include "src/core/lib/transport/byte_stream.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/test_config.h"
//
// grpc_slice_buffer_stream tests
//
static void not_called_closure(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_ASSERT(false);
}
static void test_slice_buffer_stream_basic(void) {
gpr_log(GPR_DEBUG, "test_slice_buffer_stream_basic");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
grpc_slice_buffer_init(&buffer);
grpc_slice input[] = {
grpc_slice_from_static_string("foo"),
grpc_slice_from_static_string("bar"),
};
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
grpc_slice_buffer_stream stream;
grpc_slice_buffer_stream_init(&stream, &buffer, 0);
GPR_ASSERT(stream.base.length == 6);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
grpc_schedule_on_exec_ctx);
// Read each slice. Note that next() always returns synchronously.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
grpc_slice output;
grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(&exec_ctx, output);
}
// Clean up.
grpc_byte_stream_destroy(&exec_ctx, &stream.base);
grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_slice_buffer_stream_shutdown(void) {
gpr_log(GPR_DEBUG, "test_slice_buffer_stream_shutdown");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// Create and populate slice buffer.
grpc_slice_buffer buffer;
grpc_slice_buffer_init(&buffer);
grpc_slice input[] = {
grpc_slice_from_static_string("foo"),
grpc_slice_from_static_string("bar"),
};
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
// Create byte stream.
grpc_slice_buffer_stream stream;
grpc_slice_buffer_stream_init(&stream, &buffer, 0);
GPR_ASSERT(stream.base.length == 6);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
grpc_schedule_on_exec_ctx);
// Read the first slice.
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
grpc_slice output;
grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(&exec_ctx, output);
// Now shutdown.
grpc_error *shutdown_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
grpc_byte_stream_shutdown(&exec_ctx, &stream.base,
GRPC_ERROR_REF(shutdown_error));
// After shutdown, the next pull() should return the error.
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == shutdown_error);
GRPC_ERROR_UNREF(error);
GRPC_ERROR_UNREF(shutdown_error);
// Clean up.
grpc_byte_stream_destroy(&exec_ctx, &stream.base);
grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
grpc_exec_ctx_finish(&exec_ctx);
}
//
// grpc_caching_byte_stream tests
//
static void test_caching_byte_stream_basic(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_basic");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
grpc_slice_buffer_init(&buffer);
grpc_slice input[] = {
grpc_slice_from_static_string("foo"),
grpc_slice_from_static_string("bar"),
};
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
// Create cache and caching stream.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream;
grpc_caching_byte_stream_init(&stream, &cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
grpc_schedule_on_exec_ctx);
// Read each slice. Note that next() always returns synchronously,
// because the underlying byte stream always does.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
grpc_slice output;
grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(&exec_ctx, output);
}
// Clean up.
grpc_byte_stream_destroy(&exec_ctx, &stream.base);
grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_caching_byte_stream_reset(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_reset");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
grpc_slice_buffer_init(&buffer);
grpc_slice input[] = {
grpc_slice_from_static_string("foo"),
grpc_slice_from_static_string("bar"),
};
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
// Create cache and caching stream.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream;
grpc_caching_byte_stream_init(&stream, &cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
grpc_schedule_on_exec_ctx);
// Read one slice.
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
grpc_slice output;
grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(&exec_ctx, output);
// Reset the caching stream. The reads should start over from the
// first slice.
grpc_caching_byte_stream_reset(&stream);
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&exec_ctx, &stream.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(&exec_ctx, output);
}
// Clean up.
grpc_byte_stream_destroy(&exec_ctx, &stream.base);
grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
grpc_exec_ctx_finish(&exec_ctx);
}
static void test_caching_byte_stream_shared_cache(void) {
gpr_log(GPR_DEBUG, "test_caching_byte_stream_shared_cache");
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
// Create and populate slice buffer byte stream.
grpc_slice_buffer buffer;
grpc_slice_buffer_init(&buffer);
grpc_slice input[] = {
grpc_slice_from_static_string("foo"),
grpc_slice_from_static_string("bar"),
};
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
grpc_slice_buffer_add(&buffer, input[i]);
}
grpc_slice_buffer_stream underlying_stream;
grpc_slice_buffer_stream_init(&underlying_stream, &buffer, 0);
// Create cache and two caching streams.
grpc_byte_stream_cache cache;
grpc_byte_stream_cache_init(&cache, &underlying_stream.base);
grpc_caching_byte_stream stream1;
grpc_caching_byte_stream_init(&stream1, &cache);
grpc_caching_byte_stream stream2;
grpc_caching_byte_stream_init(&stream2, &cache);
grpc_closure closure;
GRPC_CLOSURE_INIT(&closure, not_called_closure, NULL,
grpc_schedule_on_exec_ctx);
// Read one slice from stream1.
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
grpc_slice output;
grpc_error *error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[0], output));
grpc_slice_unref_internal(&exec_ctx, output);
// Read all slices from stream2.
for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream2.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&exec_ctx, &stream2.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[i], output));
grpc_slice_unref_internal(&exec_ctx, output);
}
// Now read the second slice from stream1.
GPR_ASSERT(
grpc_byte_stream_next(&exec_ctx, &stream1.base, ~(size_t)0, &closure));
error = grpc_byte_stream_pull(&exec_ctx, &stream1.base, &output);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(grpc_slice_eq(input[1], output));
grpc_slice_unref_internal(&exec_ctx, output);
// Clean up.
grpc_byte_stream_destroy(&exec_ctx, &stream1.base);
grpc_byte_stream_destroy(&exec_ctx, &stream2.base);
grpc_byte_stream_cache_destroy(&exec_ctx, &cache);
grpc_slice_buffer_destroy_internal(&exec_ctx, &buffer);
grpc_exec_ctx_finish(&exec_ctx);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_slice_buffer_stream_basic();
test_slice_buffer_stream_shutdown();
test_caching_byte_stream_basic();
test_caching_byte_stream_reset();
test_caching_byte_stream_shared_cache();
return 0;
}

@ -164,6 +164,23 @@
"third_party": false, "third_party": false,
"type": "target" "type": "target"
}, },
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c",
"name": "byte_stream_test",
"src": [
"test/core/transport/byte_stream_test.c"
],
"third_party": false,
"type": "target"
},
{ {
"deps": [ "deps": [
"gpr", "gpr",

@ -201,6 +201,28 @@
"windows" "windows"
] ]
}, },
{
"args": [],
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": false,
"language": "c",
"name": "byte_stream_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
]
},
{ {
"args": [], "args": [],
"ci_platforms": [ "ci_platforms": [

@ -118,6 +118,17 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "bin_encoder_test", "vcxproj
{29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9} {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
EndProjectSection EndProjectSection
EndProject EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "byte_stream_test", "vcxproj\test\byte_stream_test\byte_stream_test.vcxproj", "{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}"
ProjectSection(myProperties) = preProject
lib = "False"
EndProjectSection
ProjectSection(ProjectDependencies) = postProject
{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}
{29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9}
{EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037}
{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "census_context_test", "vcxproj\test\census_context_test\census_context_test.vcxproj", "{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}" Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "census_context_test", "vcxproj\test\census_context_test\census_context_test.vcxproj", "{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}"
ProjectSection(myProperties) = preProject ProjectSection(myProperties) = preProject
lib = "False" lib = "False"
@ -1940,6 +1951,22 @@ Global
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|Win32.Build.0 = Release|Win32 {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|Win32.Build.0 = Release|Win32
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.ActiveCfg = Release|x64 {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.ActiveCfg = Release|x64
{D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.Build.0 = Release|x64 {D5C70922-D68E-0E9D-9988-995E0F9A79AE}.Release-DLL|x64.Build.0 = Release|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.ActiveCfg = Debug|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.ActiveCfg = Debug|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.ActiveCfg = Release|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.ActiveCfg = Release|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|Win32.Build.0 = Debug|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug|x64.Build.0 = Debug|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|Win32.Build.0 = Release|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release|x64.Build.0 = Release|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.ActiveCfg = Debug|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|Win32.Build.0 = Debug|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.ActiveCfg = Debug|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Debug-DLL|x64.Build.0 = Debug|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.ActiveCfg = Release|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|Win32.Build.0 = Release|Win32
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.ActiveCfg = Release|x64
{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}.Release-DLL|x64.Build.0 = Release|x64
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|Win32.ActiveCfg = Debug|Win32 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|Win32.ActiveCfg = Debug|Win32
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|x64.ActiveCfg = Debug|x64 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Debug|x64.ActiveCfg = Debug|x64
{5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Release|Win32.ActiveCfg = Release|Win32 {5C1CFC2D-AF3C-D7CB-BA74-D267E91CBC73}.Release|Win32.ActiveCfg = Release|Win32

@ -0,0 +1,199 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\1.0.204.1.props')" />
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{9AEDA345-E3E8-BFE9-11BF-64949EF41C9C}</ProjectGuid>
<IgnoreWarnIntDirInTempDetected>true</IgnoreWarnIntDirInTempDetected>
<IntDir>$(SolutionDir)IntDir\$(MSBuildProjectName)\</IntDir>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(VisualStudioVersion)' == '10.0'" Label="Configuration">
<PlatformToolset>v100</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(VisualStudioVersion)' == '11.0'" Label="Configuration">
<PlatformToolset>v110</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(VisualStudioVersion)' == '12.0'" Label="Configuration">
<PlatformToolset>v120</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(VisualStudioVersion)' == '14.0'" Label="Configuration">
<PlatformToolset>v140</PlatformToolset>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Debug'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
<Import Project="$(SolutionDir)\..\vsprojects\global.props" />
<Import Project="$(SolutionDir)\..\vsprojects\openssl.props" />
<Import Project="$(SolutionDir)\..\vsprojects\winsock.props" />
<Import Project="$(SolutionDir)\..\vsprojects\zlib.props" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)'=='Debug'">
<TargetName>byte_stream_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Debug</Configuration-grpc_dependencies_zlib>
<Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
<Configuration-grpc_dependencies_openssl>Debug</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)'=='Release'">
<TargetName>byte_stream_test</TargetName>
<Linkage-grpc_dependencies_zlib>static</Linkage-grpc_dependencies_zlib>
<Configuration-grpc_dependencies_zlib>Release</Configuration-grpc_dependencies_zlib>
<Linkage-grpc_dependencies_openssl>static</Linkage-grpc_dependencies_openssl>
<Configuration-grpc_dependencies_openssl>Release</Configuration-grpc_dependencies_openssl>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
<TreatWarningAsError>true</TreatWarningAsError>
<DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
<MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
<GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<SDLCheck>true</SDLCheck>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
<TreatWarningAsError>true</TreatWarningAsError>
<DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
<MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
<GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>MaxSpeed</Optimization>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<TreatWarningAsError>true</TreatWarningAsError>
<DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
<MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
<GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>MaxSpeed</Optimization>
<PreprocessorDefinitions>WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<TreatWarningAsError>true</TreatWarningAsError>
<DebugInformationFormat Condition="$(Jenkins)">None</DebugInformationFormat>
<MinimalRebuild Condition="$(Jenkins)">false</MinimalRebuild>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation Condition="!$(Jenkins)">true</GenerateDebugInformation>
<GenerateDebugInformation Condition="$(Jenkins)">false</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc_test_util\grpc_test_util.vcxproj">
<Project>{17BCAFC0-5FDC-4C94-AEB9-95F3E220614B}</Project>
</ProjectReference>
<ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\grpc\grpc.vcxproj">
<Project>{29D16885-7228-4C31-81ED-5F9187C7F2A9}</Project>
</ProjectReference>
<ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr_test_util\gpr_test_util.vcxproj">
<Project>{EAB0A629-17A9-44DB-B5FF-E91A721FE037}</Project>
</ProjectReference>
<ProjectReference Include="$(SolutionDir)\..\vsprojects\vcxproj\.\gpr\gpr.vcxproj">
<Project>{B23D3D1A-9438-4EDA-BEB6-9A0A03D17792}</Project>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
<Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
<Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies\grpc.dependencies.zlib.targets')" />
<Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
<Import Project="$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets" Condition="Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies\grpc.dependencies.openssl.targets')" />
</ImportGroup>
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.redist.1.2.8.10\build\native\grpc.dependencies.zlib.redist.targets')" />
<Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.zlib.1.2.8.10\build\native\grpc.dependencies.zlib.targets')" />
<Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.redist.1.0.204.1\build\native\grpc.dependencies.openssl.redist.targets')" />
<Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.props')" />
<Error Condition="!Exists('$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\..\vsprojects\packages\grpc.dependencies.openssl.1.0.204.1\build\native\grpc.dependencies.openssl.targets')" />
</Target>
</Project>

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="$(SolutionDir)\..\test\core\transport\byte_stream_test.c">
<Filter>test\core\transport</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="test">
<UniqueIdentifier>{f172d292-4ad6-342a-f27a-096c06d43a31}</UniqueIdentifier>
</Filter>
<Filter Include="test\core">
<UniqueIdentifier>{d7f690de-dfe0-56fc-ff3b-38eec3931699}</UniqueIdentifier>
</Filter>
<Filter Include="test\core\transport">
<UniqueIdentifier>{f78f56ef-47df-c99d-18f0-86277f7013f3}</UniqueIdentifier>
</Filter>
</ItemGroup>
</Project>
Loading…
Cancel
Save