Revert "Revert "Implement lazy deframe""

pull/10626/head
Muxi Yan 8 years ago committed by GitHub
parent 1838a8f22f
commit 29723ee1ba
  1. 338
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 365
      src/core/ext/transport/chttp2/transport/frame_data.c
  3. 24
      src/core/ext/transport/chttp2/transport/frame_data.h
  4. 51
      src/core/ext/transport/chttp2/transport/internal.h
  5. 5
      src/core/ext/transport/chttp2/transport/parsing.c
  6. 17
      src/core/ext/transport/cronet/transport/cronet_transport.c
  7. 14
      src/core/lib/channel/compress_filter.c
  8. 14
      src/core/lib/channel/http_client_filter.c
  9. 44
      src/core/lib/surface/call.c
  10. 32
      src/core/lib/transport/byte_stream.c
  11. 21
      src/core/lib/transport/byte_stream.h
  12. 9
      test/cpp/microbenchmarks/bm_chttp2_transport.cc

@ -44,6 +44,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -129,6 +130,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored); grpc_error *error_ignored);
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error); grpc_error *error);
@ -174,6 +180,9 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error); grpc_error *error);
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
/******************************************************************************* /*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -655,7 +664,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is /* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively read-closed. The others are for incoming_byte_streams that are actively
reading */ reading */
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2"); GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@ -665,6 +673,11 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@ -682,7 +695,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) { grpc_error *error) {
grpc_byte_stream *bs;
grpc_chttp2_stream *s = sp; grpc_chttp2_stream *s = sp;
grpc_chttp2_transport *t = s->t; grpc_chttp2_transport *t = s->t;
@ -693,9 +705,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
} }
while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) { grpc_slice_buffer_destroy_internal(exec_ctx,
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); &s->unprocessed_incoming_frames_buffer);
} grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s);
@ -722,6 +734,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) { if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@ -1175,8 +1188,9 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL; s->fetching_send_message = NULL;
return; /* early out */ return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
&s->fetching_slice, UINT32_MAX, UINT32_MAX, &s->complete_fetch_locked)) {
&s->complete_fetch_locked)) { grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
&s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s); add_fetched_slice_locked(exec_ctx, t, s);
} }
} }
@ -1187,9 +1201,15 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs; grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t; grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(exec_ctx, t, s); error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message,
continue_fetching_send_locked(exec_ctx, t, s); &s->fetching_slice);
} else { if (error == GRPC_ERROR_NONE) {
add_fetched_slice_locked(exec_ctx, t, s);
continue_fetching_send_locked(exec_ctx, t, s);
}
}
if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */ /* TODO(ctiller): what to do here */
abort(); abort();
} }
@ -1424,8 +1444,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(s->recv_message_ready == NULL);
s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message; s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0 && if (s->id != 0 && s->frame_storage.length == 0) {
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
} }
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@ -1614,13 +1633,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL && if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) { if (s->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
NULL) { if (!s->pending_byte_stream) {
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
} }
} }
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
@ -1633,39 +1652,65 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_byte_stream *bs; grpc_error *error = GRPC_ERROR_NONE;
if (s->recv_message_ready != NULL) { if (s->recv_message_ready != NULL) {
while (s->final_metadata_requested && s->seen_error && *s->recv_message = NULL;
(bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != if (s->final_metadata_requested && s->seen_error) {
NULL) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
} }
if (s->incoming_frames.head != NULL) { if (!s->pending_byte_stream) {
*s->recv_message = while (s->unprocessed_incoming_frames_buffer.length > 0 ||
grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames); s->frame_storage.length > 0) {
GPR_ASSERT(*s->recv_message != NULL); if (s->unprocessed_incoming_frames_buffer.length == 0) {
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->frame_storage);
}
error = deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s,
&s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
&s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
break;
} else if (*s->recv_message != NULL) {
break;
}
}
}
if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL; *s->recv_message = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} }
GRPC_ERROR_UNREF(error);
} }
} }
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_byte_stream *bs;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != NULL && s->read_closed && if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) { s->write_closed) {
if (s->seen_error) { if (s->seen_error) {
while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) != grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage);
NULL) { if (!s->pending_byte_stream) {
incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE); grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
} }
} }
if (s->all_incoming_byte_streams_finished && bool pending_data = s->pending_byte_stream ||
s->unprocessed_incoming_frames_buffer.length > 0;
if (s->read_closed && s->frame_storage.length == 0 &&
(!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) { s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@ -1676,14 +1721,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
} }
} }
static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
}
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) { uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@ -1692,10 +1729,19 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL; t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
} }
if (s->data_parser.parsing_frame != NULL) { if (s->pending_byte_stream) {
grpc_chttp2_incoming_byte_stream_finished( if (s->on_next != NULL) {
exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error)); grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
s->data_parser.parsing_frame = NULL; if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
incoming_byte_stream_publish_error(exec_ctx, bs, error);
incoming_byte_stream_unref(exec_ctx, bs);
s->data_parser.parsing_frame = NULL;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
}
} }
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@ -1881,7 +1927,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
} }
} }
decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} }
@ -2419,12 +2464,28 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM * BYTE STREAM
*/ */
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
s->byte_stream_error = error;
}
}
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) { grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) { if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs); gpr_free(bs);
} }
} }
@ -2484,47 +2545,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport; grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream; grpc_chttp2_stream *s = bs->stream;
if (bs->is_tail) { size_t cur_length = s->frame_storage.length;
gpr_mu_lock(&bs->slice_mu); incoming_byte_stream_update_flow_control(
size_t cur_length = bs->slices.length; exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_update_flow_control( GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); if (s->frame_storage.length > 0) {
} grpc_slice_buffer_swap(&s->frame_storage,
gpr_mu_lock(&bs->slice_mu); &s->unprocessed_incoming_frames_buffer);
if (bs->slices.count > 0) { grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
*bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices); } else if (s->byte_stream_error != GRPC_ERROR_NONE) {
grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
} else if (bs->error != GRPC_ERROR_NONE) { GRPC_ERROR_REF(s->byte_stream_error));
grpc_closure_run(exec_ctx, bs->next_action.on_complete, if (s->data_parser.parsing_frame != NULL) {
GRPC_ERROR_REF(bs->error)); incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = NULL;
}
} else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = NULL;
}
} else {
/* Should never reach here. */
GPR_ASSERT(false);
}
} else { } else {
bs->on_next = bs->next_action.on_complete; s->on_next = bs->next_action.on_complete;
bs->next = bs->next_action.slice;
} }
gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
} }
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
grpc_slice *slice, size_t max_size_hint, size_t max_size_hint,
grpc_closure *on_complete) { grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs = grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream; (grpc_chttp2_incoming_byte_stream *)byte_stream;
gpr_ref(&bs->refs); grpc_chttp2_stream *s = bs->stream;
bs->next_action.slice = slice; if (s->unprocessed_incoming_frames_buffer.length > 0) {
bs->next_action.max_size_hint = max_size_hint; return true;
bs->next_action.on_complete = on_complete; } else {
grpc_closure_sched( gpr_ref(&bs->refs);
exec_ctx, bs->next_action.max_size_hint = max_size_hint;
grpc_closure_init( bs->next_action.on_complete = on_complete;
&bs->next_action.closure, incoming_byte_stream_next_locked, bs, grpc_closure_sched(
grpc_combiner_scheduler(bs->transport->combiner, false)), exec_ctx,
GRPC_ERROR_NONE); grpc_closure_init(
GPR_TIMER_END("incoming_byte_stream_next", 0); &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
return 0; grpc_combiner_scheduler(bs->transport->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return false;
}
}
static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
grpc_error *error = deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
grpc_error *error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
GPR_TIMER_END("incoming_byte_stream_pull", 0);
return GRPC_ERROR_NONE;
} }
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -2534,9 +2638,14 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored) { grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream; grpc_chttp2_incoming_byte_stream *bs = byte_stream;
grpc_chttp2_stream *s = bs->stream;
grpc_chttp2_transport *t = s->t;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
} }
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -2556,50 +2665,53 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error( static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) { grpc_error *error) {
grpc_chttp2_stream *s = bs->stream;
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL; s->on_next = NULL;
GRPC_ERROR_UNREF(bs->error); GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
bs->error = error;
} }
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_incoming_byte_stream_push(
grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice) { grpc_slice slice, grpc_slice *slice_out) {
gpr_mu_lock(&bs->slice_mu); grpc_chttp2_stream *s = bs->stream;
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
incoming_byte_stream_publish_error( grpc_error *error =
exec_ctx, bs, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(exec_ctx, slice);
return error;
} else { } else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (bs->on_next != NULL) { if (slice_out != NULL) {
*bs->next = slice; *slice_out = slice;
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
} else {
grpc_slice_buffer_add(&bs->slices, slice);
} }
return GRPC_ERROR_NONE;
} }
gpr_mu_unlock(&bs->slice_mu);
} }
void grpc_chttp2_incoming_byte_stream_finished( grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) { grpc_error *error, bool reset_on_error) {
grpc_chttp2_stream *s = bs->stream;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) { if (bs->remaining_bytes != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
} }
gpr_mu_unlock(&bs->slice_mu);
} }
if (error != GRPC_ERROR_NONE) { if (error != GRPC_ERROR_NONE && reset_on_error) {
incoming_byte_stream_publish_error(exec_ctx, bs, error); grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
} }
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
return error;
} }
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@ -2611,26 +2723,12 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next; incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2); gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t; incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s; incoming_byte_stream->stream = s;
gpr_ref(&incoming_byte_stream->stream->active_streams); s->byte_stream_error = GRPC_ERROR_NONE;
grpc_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
incoming_byte_stream->error = GRPC_ERROR_NONE;
grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
if (q->head == NULL) {
q->head = incoming_byte_stream;
} else {
q->tail->is_tail = 0;
q->tail->next_message = incoming_byte_stream;
}
q->tail = incoming_byte_stream;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
return incoming_byte_stream; return incoming_byte_stream;
} }

@ -40,6 +40,7 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
@ -53,16 +54,17 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) { grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) { if (parser->parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream_finished( GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame, exec_ctx, parser->parsing_frame,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false));
} }
GRPC_ERROR_UNREF(parser->error); GRPC_ERROR_UNREF(parser->error);
} }
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags, uint8_t flags,
uint32_t stream_id) { uint32_t stream_id,
grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg; char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@ -74,47 +76,14 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
} }
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
parser->is_last_frame = 1; s->received_last_frame = true;
} else { } else {
parser->is_last_frame = 0; s->received_last_frame = false;
} }
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_chttp2_incoming_frame_queue_merge(
grpc_chttp2_incoming_frame_queue *head_dst,
grpc_chttp2_incoming_frame_queue *tail_src) {
if (tail_src->head == NULL) {
return;
}
if (head_dst->head == NULL) {
*head_dst = *tail_src;
memset(tail_src, 0, sizeof(*tail_src));
return;
}
head_dst->tail->next_message = tail_src->head;
head_dst->tail = tail_src->tail;
memset(tail_src, 0, sizeof(*tail_src));
}
grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_incoming_frame_queue *q) {
grpc_byte_stream *out;
if (q->head == NULL) {
return NULL;
}
out = &q->head->base;
if (q->head == q->tail) {
memset(q, 0, sizeof(*q));
} else {
q->head = q->head->next_message;
}
return out;
}
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof, uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats *stats, grpc_transport_one_way_stats *stats,
@ -143,145 +112,217 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes; stats->data_bytes += write_bytes;
} }
static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx, grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p, grpc_chttp2_data_parser *p,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_chttp2_stream *s,
grpc_slice slice) { grpc_slice_buffer *slices,
uint8_t *const beg = GRPC_SLICE_START_PTR(slice); grpc_slice *slice_out,
uint8_t *const end = GRPC_SLICE_END_PTR(slice); grpc_byte_stream **stream_out) {
uint8_t *cur = beg; grpc_error *error = GRPC_ERROR_NONE;
uint32_t message_flags; grpc_chttp2_transport *t = s->t;
grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
char *msg;
if (cur == end) { while (slices->count > 0) {
return GRPC_ERROR_NONE; uint8_t *beg = NULL;
} uint8_t *end = NULL;
uint8_t *cur = NULL;
switch (p->state) { grpc_slice slice = grpc_slice_buffer_take_first(slices);
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR; beg = GRPC_SLICE_START_PTR(slice);
return GRPC_ERROR_REF(p->error); end = GRPC_SLICE_END_PTR(slice);
fh_0: cur = beg;
case GRPC_CHTTP2_DATA_FH_0: uint32_t message_flags;
s->stats.incoming.framing_bytes++; char *msg;
p->frame_type = *cur;
switch (p->frame_type) { if (cur == end) {
case 0: grpc_slice_unref_internal(exec_ctx, slice);
p->is_frame_compressed = 0; /* GPR_FALSE */ continue;
break; }
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */ switch (p->state) {
break; case GRPC_CHTTP2_DATA_ERROR:
default: p->state = GRPC_CHTTP2_DATA_ERROR;
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); grpc_slice_unref_internal(exec_ctx, slice);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); return GRPC_ERROR_REF(p->error);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, case GRPC_CHTTP2_DATA_FH_0:
(intptr_t)s->id); p->frame_type = *cur;
gpr_free(msg); switch (p->frame_type) {
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); case 0:
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, p->is_frame_compressed = false; /* GPR_FALSE */
grpc_slice_from_copied_string(msg)); break;
gpr_free(msg); case 1:
p->error = p->is_frame_compressed = true; /* GPR_TRUE */
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); break;
p->state = GRPC_CHTTP2_DATA_ERROR; default:
return GRPC_ERROR_REF(p->error); gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
} p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
if (++cur == end) { p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
p->state = GRPC_CHTTP2_DATA_FH_1; (intptr_t)s->id);
return GRPC_ERROR_NONE; gpr_free(msg);
} msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
/* fallthrough */ p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
case GRPC_CHTTP2_DATA_FH_1: grpc_slice_from_copied_string(msg));
s->stats.incoming.framing_bytes++; gpr_free(msg);
p->frame_size = ((uint32_t)*cur) << 24; p->error =
if (++cur == end) { grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_FH_2; p->state = GRPC_CHTTP2_DATA_ERROR;
return GRPC_ERROR_NONE; grpc_slice_unref_internal(exec_ctx, slice);
} return GRPC_ERROR_REF(p->error);
/* fallthrough */ }
case GRPC_CHTTP2_DATA_FH_2: if (++cur == end) {
s->stats.incoming.framing_bytes++; p->state = GRPC_CHTTP2_DATA_FH_1;
p->frame_size |= ((uint32_t)*cur) << 16; grpc_slice_unref_internal(exec_ctx, slice);
if (++cur == end) { continue;
p->state = GRPC_CHTTP2_DATA_FH_3; }
return GRPC_ERROR_NONE; /* fallthrough */
} case GRPC_CHTTP2_DATA_FH_1:
/* fallthrough */ p->frame_size = ((uint32_t)*cur) << 24;
case GRPC_CHTTP2_DATA_FH_3: if (++cur == end) {
s->stats.incoming.framing_bytes++; p->state = GRPC_CHTTP2_DATA_FH_2;
p->frame_size |= ((uint32_t)*cur) << 8; grpc_slice_unref_internal(exec_ctx, slice);
if (++cur == end) { continue;
p->state = GRPC_CHTTP2_DATA_FH_4; }
return GRPC_ERROR_NONE; /* fallthrough */
} case GRPC_CHTTP2_DATA_FH_2:
/* fallthrough */ p->frame_size |= ((uint32_t)*cur) << 16;
case GRPC_CHTTP2_DATA_FH_4: if (++cur == end) {
s->stats.incoming.framing_bytes++; p->state = GRPC_CHTTP2_DATA_FH_3;
p->frame_size |= ((uint32_t)*cur); grpc_slice_unref_internal(exec_ctx, slice);
p->state = GRPC_CHTTP2_DATA_FRAME; continue;
++cur; }
message_flags = 0; /* fallthrough */
if (p->is_frame_compressed) { case GRPC_CHTTP2_DATA_FH_3:
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; p->frame_size |= ((uint32_t)*cur) << 8;
} if (++cur == end) {
p->parsing_frame = incoming_byte_stream = p->state = GRPC_CHTTP2_DATA_FH_4;
grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size, grpc_slice_unref_internal(exec_ctx, slice);
message_flags); continue;
/* fallthrough */ }
case GRPC_CHTTP2_DATA_FRAME: /* fallthrough */
if (cur == end) { case GRPC_CHTTP2_DATA_FH_4:
return GRPC_ERROR_NONE; GPR_ASSERT(stream_out != NULL);
} GPR_ASSERT(p->parsing_frame == NULL);
uint32_t remaining = (uint32_t)(end - cur); p->frame_size |= ((uint32_t)*cur);
if (remaining == p->frame_size) { p->state = GRPC_CHTTP2_DATA_FRAME;
s->stats.incoming.data_bytes += p->frame_size; ++cur;
grpc_chttp2_incoming_byte_stream_push( message_flags = 0;
exec_ctx, p->parsing_frame, if (p->is_frame_compressed) {
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, }
GRPC_ERROR_NONE); p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
p->parsing_frame = NULL; exec_ctx, t, s, p->frame_size, message_flags);
p->state = GRPC_CHTTP2_DATA_FH_0; *stream_out = &p->parsing_frame->base;
return GRPC_ERROR_NONE; if (p->parsing_frame->remaining_bytes == 0) {
} else if (remaining > p->frame_size) { GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
s->stats.incoming.data_bytes += p->frame_size; exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
grpc_chttp2_incoming_byte_stream_push( p->parsing_frame = NULL;
exec_ctx, p->parsing_frame, p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_sub(slice, (size_t)(cur - beg), }
(size_t)(cur + p->frame_size - beg))); s->pending_byte_stream = true;
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE); if (cur != end) {
p->parsing_frame = NULL; grpc_slice_buffer_undo_take_first(
cur += p->frame_size; &s->unprocessed_incoming_frames_buffer,
goto fh_0; /* loop */ grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
} else { }
GPR_ASSERT(remaining <= p->frame_size); grpc_slice_unref_internal(exec_ctx, slice);
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
p->frame_size -= remaining;
s->stats.incoming.data_bytes += remaining;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
if (GRPC_ERROR_NONE !=
(grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
} }
}
} }
GPR_UNREACHABLE_CODE( return GRPC_ERROR_NONE;
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
} }
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_chttp2_stream *s,
grpc_slice slice, int is_last) { grpc_slice slice, int is_last) {
grpc_chttp2_data_parser *p = parser; /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
grpc_error *error = parse_inner(exec_ctx, p, t, s, slice); s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} else if (s->on_next) {
GPR_ASSERT(s->frame_storage.length == 0);
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
}
if (is_last && p->is_last_frame) { if (is_last && s->received_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
return error; return GRPC_ERROR_NONE;
} }

@ -56,28 +56,16 @@ typedef enum {
typedef struct grpc_chttp2_incoming_byte_stream typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream; grpc_chttp2_incoming_byte_stream;
typedef struct grpc_chttp2_incoming_frame_queue {
grpc_chttp2_incoming_byte_stream *head;
grpc_chttp2_incoming_byte_stream *tail;
} grpc_chttp2_incoming_frame_queue;
typedef struct { typedef struct {
grpc_chttp2_stream_state state; grpc_chttp2_stream_state state;
uint8_t is_last_frame;
uint8_t frame_type; uint8_t frame_type;
uint32_t frame_size; uint32_t frame_size;
grpc_error *error; grpc_error *error;
int is_frame_compressed; bool is_frame_compressed;
grpc_chttp2_incoming_byte_stream *parsing_frame; grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser; } grpc_chttp2_data_parser;
void grpc_chttp2_incoming_frame_queue_merge(
grpc_chttp2_incoming_frame_queue *head_dst,
grpc_chttp2_incoming_frame_queue *tail_src);
grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_incoming_frame_queue *q);
/* initialize per-stream state for data frame parsing */ /* initialize per-stream state for data frame parsing */
grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser); grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
@ -87,7 +75,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */ /* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags, uint8_t flags,
uint32_t stream_id); uint32_t stream_id,
grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a /* handle a slice of a data frame - is_last indicates the last slice of a
frame */ frame */
@ -101,4 +90,11 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats, grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf); grpc_slice_buffer *outbuf);
grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s,
grpc_slice_buffer *slices,
grpc_slice *slice_out,
grpc_byte_stream **stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */

@ -195,22 +195,20 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_incoming_byte_stream { struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base; grpc_byte_stream base;
gpr_refcount refs; gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message;
grpc_error *error;
grpc_chttp2_transport *transport; grpc_chttp2_transport *transport; /* immutable */
grpc_chttp2_stream *stream; grpc_chttp2_stream *stream; /* immutable */
bool is_tail;
gpr_mu slice_mu; // protects slices, on_next /* Accessed only by transport thread when stream->pending_byte_stream == false
grpc_slice_buffer slices; * Accessed only by application thread when stream->pending_byte_stream ==
grpc_closure *on_next; * true */
grpc_slice *next;
uint32_t remaining_bytes; uint32_t remaining_bytes;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
struct { struct {
grpc_closure closure; grpc_closure closure;
grpc_slice *slice;
size_t max_size_hint; size_t max_size_hint;
grpc_closure *on_complete; grpc_closure *on_complete;
} next_action; } next_action;
@ -445,8 +443,8 @@ struct grpc_chttp2_stream {
uint32_t id; uint32_t id;
/** window available for us to send to peer, over or under the initial window /** window available for us to send to peer, over or under the initial window
* size of the transport... ie: * size of the transport... ie:
* outgoing_window = outgoing_window_delta + transport.initial_window_size */ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta; int64_t outgoing_window_delta;
/** things the upper layers would like to send */ /** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata; grpc_metadata_batch *send_initial_metadata;
@ -473,9 +471,6 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats; grpc_transport_stream_stats stats;
/** number of streams that are currently being read */
gpr_refcount active_streams;
/** Is this stream closed for writing. */ /** Is this stream closed for writing. */
bool write_closed; bool write_closed;
/** Is this stream reading half-closed. */ /** Is this stream reading half-closed. */
@ -499,7 +494,17 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
grpc_chttp2_incoming_frame_queue incoming_frames; grpc_slice_buffer frame_storage; /* protected by t combiner */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure *on_next; /* protected by t combiner */
bool pending_byte_stream; /* protected by t combiner */
grpc_closure reset_byte_stream;
grpc_error *byte_stream_error; /* protected by t combiner */
bool received_last_frame; /* protected by t combiner */
gpr_timespec deadline; gpr_timespec deadline;
@ -512,6 +517,9 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */ * incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta; int64_t incoming_window_delta;
/** parsing state for data frames */ /** parsing state for data frames */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_chttp2_data_parser data_parser; grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */ /** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes; int64_t received_bytes;
@ -790,10 +798,13 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags); uint32_t frame_size, uint32_t flags);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_incoming_byte_stream_push(
grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice); grpc_slice slice, grpc_slice *slice_out);
void grpc_chttp2_incoming_byte_stream_finished( grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, bool reset_on_error);
void grpc_chttp2_incoming_byte_stream_notify(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error); grpc_error *error);

@ -458,12 +458,13 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0); return init_skip_frame_parser(exec_ctx, t, 0);
} }
if (err == GRPC_ERROR_NONE) { if (err == GRPC_ERROR_NONE) {
err = grpc_chttp2_data_parser_begin_frame(&s->data_parser, err = grpc_chttp2_data_parser_begin_frame(
t->incoming_frame_flags, s->id); &s->data_parser, t->incoming_frame_flags, s->id, s);
} }
error_handler: error_handler:
if (err == GRPC_ERROR_NONE) { if (err == GRPC_ERROR_NONE) {
t->incoming_stream = s; t->incoming_stream = s;
/* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse; t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser; t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;

@ -973,9 +973,20 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer; grpc_slice_buffer write_slice_buffer;
grpc_slice slice; grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer); grpc_slice_buffer_init(&write_slice_buffer);
grpc_byte_stream_next( if (1 != grpc_byte_stream_next(
NULL, stream_op->payload->send_message.send_message, &slice, exec_ctx, stream_op->payload->send_message.send_message,
stream_op->payload->send_message.send_message->length, NULL); stream_op->payload->send_message.send_message->length,
NULL)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
stream_op->payload->send_message.send_message,
&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
grpc_slice_buffer_add(&write_slice_buffer, slice); grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) { if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */ /* Empty request not handled yet */

@ -221,6 +221,13 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp; grpc_call_element *elem = elemp;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice)) {
/* Should never reach here */
abort();
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem); finish_send_message(exec_ctx, elem);
@ -233,8 +240,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) { grpc_call_element *elem) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
while (grpc_byte_stream_next( while (grpc_byte_stream_next(
exec_ctx, calld->send_op->payload->send_message.send_message, exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
&calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { &calld->got_slice)) {
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem); finish_send_message(exec_ctx, elem);

@ -220,8 +220,11 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes; uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next( while (grpc_byte_stream_next(
exec_ctx, calld->send_op->payload->send_message.send_message, exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0,
&calld->incoming_slice, ~(size_t)0, &calld->got_slice)) { &calld->got_slice)) {
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice);
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice)); GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@ -237,6 +240,13 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp; grpc_call_element *elem = elemp;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
calld->send_message_blocked = false; calld->send_message_blocked = false;
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice)) {
/* Should never reach here */
abort();
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/ /* Pass down the original send_message op that was blocked.*/

@ -1187,6 +1187,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) { batch_control *bctl) {
grpc_error *error;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
for (;;) { for (;;) {
size_t remaining = call->receiving_stream->length - size_t remaining = call->receiving_stream->length -
@ -1198,11 +1199,22 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl); finish_batch_step(exec_ctx, bctl);
return; return;
} }
if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice, remaining,
&call->receiving_slice_ready)) { &call->receiving_slice_ready)) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
call->receiving_slice); &call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
return;
}
} else { } else {
return; return;
} }
@ -1213,12 +1225,24 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) { grpc_error *error) {
batch_control *bctl = bctlp; batch_control *bctl = bctlp;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
grpc_byte_stream *bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, grpc_slice slice;
call->receiving_slice); error = grpc_byte_stream_pull(exec_ctx, bs, &slice);
continue_receiving_slices(exec_ctx, bctl); if (error == GRPC_ERROR_NONE) {
} else { grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
slice);
continue_receiving_slices(exec_ctx, bctl);
} else {
/* Error returned by grpc_byte_stream_pull needs to be released manually
*/
release_error = true;
}
}
if (error != GRPC_ERROR_NONE) {
if (grpc_trace_operation_failures) { if (grpc_trace_operation_failures) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
} }
@ -1226,7 +1250,11 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_stream = NULL; call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer); grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL; *call->receiving_buffer = NULL;
call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl); finish_batch_step(exec_ctx, bctl);
if (release_error) {
GRPC_ERROR_UNREF(error);
}
} }
} }

@ -40,10 +40,15 @@
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_slice *slice, grpc_byte_stream *byte_stream, size_t max_size_hint,
size_t max_size_hint, grpc_closure *on_complete) { grpc_closure *on_complete) {
return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint, return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete);
on_complete); }
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
return byte_stream->pull(exec_ctx, byte_stream, slice);
} }
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -53,16 +58,24 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
/* slice_buffer_stream */ /* slice_buffer_stream */
static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
grpc_slice *slice, size_t max_size_hint, size_t max_size_hint,
grpc_closure *on_complete) { grpc_closure *on_complete) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
return true;
}
static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count); GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice = *slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++; stream->cursor++;
return 1; return GRPC_ERROR_NONE;
} }
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -75,6 +88,7 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
stream->base.length = (uint32_t)slice_buffer->length; stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags; stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next; stream->base.next = slice_buffer_stream_next;
stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy; stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer; stream->backing_buffer = slice_buffer;
stream->cursor = 0; stream->cursor = 0;

@ -49,9 +49,10 @@ typedef struct grpc_byte_stream grpc_byte_stream;
struct grpc_byte_stream { struct grpc_byte_stream {
uint32_t length; uint32_t length;
uint32_t flags; uint32_t flags;
int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice, size_t max_size_hint, size_t max_size_hint, grpc_closure *on_complete);
grpc_closure *on_complete); grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
grpc_slice *slice);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
}; };
@ -61,12 +62,20 @@ struct grpc_byte_stream {
* *
* max_size_hint can be set as a hint as to the maximum number * max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read. * of bytes that would be acceptable to read.
*/
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, size_t max_size_hint,
grpc_closure *on_complete);
/* returns the next slice in the byte stream when it is ready (indicated by
* either grpc_byte_stream_next returning 1 or on_complete passed to
* grpc_byte_stream_next is called).
* *
* once a slice is returned into *slice, it is owned by the caller. * once a slice is returned into *slice, it is owned by the caller.
*/ */
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_slice *slice, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete); grpc_slice *slice);
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream); grpc_byte_stream *byte_stream);

@ -569,12 +569,17 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
return; return;
} }
} while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice, } while (grpc_byte_stream_next(exec_ctx, recv_stream,
recv_stream->length - received, recv_stream->length - received,
drain_continue.get())); drain_continue.get()) &&
GRPC_ERROR_NONE ==
grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
(received += GRPC_SLICE_LENGTH(recv_slice),
grpc_slice_unref_internal(exec_ctx, recv_slice), true));
}); });
drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice); received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice); grpc_slice_unref_internal(exec_ctx, recv_slice);
grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);

Loading…
Cancel
Save