Merge github.com:grpc/grpc into minimal_test

pull/10449/head
Craig Tiller 8 years ago
commit 09b02847f1
  1. 14
      src/core/ext/filters/http/client/http_client_filter.c
  2. 14
      src/core/ext/filters/http/message_compress/message_compress_filter.c
  3. 338
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  4. 365
      src/core/ext/transport/chttp2/transport/frame_data.c
  5. 24
      src/core/ext/transport/chttp2/transport/frame_data.h
  6. 51
      src/core/ext/transport/chttp2/transport/internal.h
  7. 5
      src/core/ext/transport/chttp2/transport/parsing.c
  8. 17
      src/core/ext/transport/cronet/transport/cronet_transport.c
  9. 44
      src/core/lib/surface/call.c
  10. 32
      src/core/lib/transport/byte_stream.c
  11. 21
      src/core/lib/transport/byte_stream.h
  12. 9
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  13. 23
      tools/gce/linux_performance_worker_init.sh
  14. 12
      tools/gce/linux_worker_init.sh

@ -220,11 +220,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
uint8_t *wrptr = calld->payload_bytes; uint8_t *wrptr = calld->payload_bytes;
while (grpc_byte_stream_next( while (grpc_byte_stream_next(
exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, exec_ctx, calld->send_op->payload->send_message.send_message,
&calld->got_slice)) { &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice);
memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice),
GRPC_SLICE_LENGTH(calld->incoming_slice)); GRPC_SLICE_LENGTH(calld->incoming_slice));
wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice);
@ -240,13 +237,6 @@ static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp; grpc_call_element *elem = elemp;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
calld->send_message_blocked = false; calld->send_message_blocked = false;
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice)) {
/* Should never reach here */
abort();
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
/* Pass down the original send_message op that was blocked.*/ /* Pass down the original send_message op that was blocked.*/

@ -239,13 +239,6 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) {
grpc_call_element *elem = elemp; grpc_call_element *elem = elemp;
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice)) {
/* Should never reach here */
abort();
}
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem); finish_send_message(exec_ctx, elem);
@ -258,11 +251,8 @@ static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem) { grpc_call_element *elem) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
while (grpc_byte_stream_next( while (grpc_byte_stream_next(
exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, exec_ctx, calld->send_op->payload->send_message.send_message,
&calld->got_slice)) { &calld->incoming_slice, ~(size_t)0, &calld->got_slice)) {
grpc_byte_stream_pull(exec_ctx,
calld->send_op->payload->send_message.send_message,
&calld->incoming_slice);
grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); grpc_slice_buffer_add(&calld->slices, calld->incoming_slice);
if (calld->send_length == calld->slices.length) { if (calld->send_length == calld->slices.length) {
finish_send_message(exec_ctx, elem); finish_send_message(exec_ctx, elem);

@ -44,7 +44,6 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/frame_data.h"
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/ext/transport/chttp2/transport/varint.h"
#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args.h"
@ -130,11 +129,6 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored); grpc_error *error_ignored);
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error); grpc_error *error);
@ -180,9 +174,6 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error); grpc_error *error);
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
/******************************************************************************* /*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING * CONSTRUCTION/DESTRUCTION/REFCOUNTING
*/ */
@ -664,6 +655,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is /* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively read-closed. The others are for incoming_byte_streams that are actively
reading */ reading */
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2"); GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@ -673,11 +665,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s,
grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@ -695,6 +682,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_error *error) { grpc_error *error) {
grpc_byte_stream *bs;
grpc_chttp2_stream *s = sp; grpc_chttp2_stream *s = sp;
grpc_chttp2_transport *t = s->t; grpc_chttp2_transport *t = s->t;
@ -705,9 +693,9 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL); GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == NULL);
} }
grpc_slice_buffer_destroy_internal(exec_ctx, while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames))) {
&s->unprocessed_incoming_frames_buffer); incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); }
grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s);
@ -734,7 +722,6 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error); GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) { if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@ -1188,9 +1175,8 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx,
s->fetching_send_message = NULL; s->fetching_send_message = NULL;
return; /* early out */ return; /* early out */
} else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message,
UINT32_MAX, &s->complete_fetch_locked)) { &s->fetching_slice, UINT32_MAX,
grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, &s->complete_fetch_locked)) {
&s->fetching_slice);
add_fetched_slice_locked(exec_ctx, t, s); add_fetched_slice_locked(exec_ctx, t, s);
} }
} }
@ -1201,15 +1187,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs,
grpc_chttp2_stream *s = gs; grpc_chttp2_stream *s = gs;
grpc_chttp2_transport *t = s->t; grpc_chttp2_transport *t = s->t;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
error = grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, add_fetched_slice_locked(exec_ctx, t, s);
&s->fetching_slice); continue_fetching_send_locked(exec_ctx, t, s);
if (error == GRPC_ERROR_NONE) { } else {
add_fetched_slice_locked(exec_ctx, t, s);
continue_fetching_send_locked(exec_ctx, t, s);
}
}
if (error != GRPC_ERROR_NONE) {
/* TODO(ctiller): what to do here */ /* TODO(ctiller): what to do here */
abort(); abort();
} }
@ -1444,7 +1424,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(s->recv_message_ready == NULL);
s->recv_message_ready = op_payload->recv_message.recv_message_ready; s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message; s->recv_message = op_payload->recv_message.recv_message;
if (s->id != 0 && s->frame_storage.length == 0) { if (s->id != 0 &&
(s->incoming_frames.head == NULL || s->incoming_frames.head->is_tail)) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
} }
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
@ -1633,13 +1614,13 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_byte_stream *bs;
if (s->recv_initial_metadata_ready != NULL && if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) { if (s->seen_error) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
if (!s->pending_byte_stream) { NULL) {
grpc_slice_buffer_reset_and_unref_internal( incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
exec_ctx, &s->unprocessed_incoming_frames_buffer);
} }
} }
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
@ -1652,65 +1633,39 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_error *error = GRPC_ERROR_NONE; grpc_byte_stream *bs;
if (s->recv_message_ready != NULL) { if (s->recv_message_ready != NULL) {
*s->recv_message = NULL; while (s->final_metadata_requested && s->seen_error &&
if (s->final_metadata_requested && s->seen_error) { (bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); NULL) {
if (!s->pending_byte_stream) { incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
}
} }
if (!s->pending_byte_stream) { if (s->incoming_frames.head != NULL) {
while (s->unprocessed_incoming_frames_buffer.length > 0 || *s->recv_message =
s->frame_storage.length > 0) { grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames);
if (s->unprocessed_incoming_frames_buffer.length == 0) { GPR_ASSERT(*s->recv_message != NULL);
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer,
&s->frame_storage);
}
error = deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s,
&s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
&s->frame_storage);
grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
break;
} else if (*s->recv_message != NULL) {
break;
}
}
}
if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL; *s->recv_message = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} }
GRPC_ERROR_UNREF(error);
} }
} }
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s) { grpc_chttp2_stream *s) {
grpc_byte_stream *bs;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
if (s->recv_trailing_metadata_finished != NULL && s->read_closed && if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) { s->write_closed) {
if (s->seen_error) { if (s->seen_error) {
grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &s->frame_storage); while ((bs = grpc_chttp2_incoming_frame_queue_pop(&s->incoming_frames)) !=
if (!s->pending_byte_stream) { NULL) {
grpc_slice_buffer_reset_and_unref_internal( incoming_byte_stream_destroy_locked(exec_ctx, bs, GRPC_ERROR_NONE);
exec_ctx, &s->unprocessed_incoming_frames_buffer);
} }
} }
bool pending_data = s->pending_byte_stream || if (s->all_incoming_byte_streams_finished &&
s->unprocessed_incoming_frames_buffer.length > 0;
if (s->read_closed && s->frame_storage.length == 0 &&
(!pending_data || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) { s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish( grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@ -1721,6 +1676,14 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
} }
} }
static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
if ((s->all_incoming_byte_streams_finished = gpr_unref(&s->active_streams))) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
}
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) { uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@ -1729,19 +1692,10 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL; t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
} }
if (s->pending_byte_stream) { if (s->data_parser.parsing_frame != NULL) {
if (s->on_next != NULL) { grpc_chttp2_incoming_byte_stream_finished(
grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
if (error == GRPC_ERROR_NONE) { s->data_parser.parsing_frame = NULL;
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
incoming_byte_stream_publish_error(exec_ctx, bs, error);
incoming_byte_stream_unref(exec_ctx, bs);
s->data_parser.parsing_frame = NULL;
} else {
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_REF(error);
}
} }
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@ -1927,6 +1881,7 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
} }
} }
decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} }
@ -2464,28 +2419,12 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM * BYTE STREAM
*/ */
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
s->byte_stream_error = GRPC_ERROR_NONE;
grpc_chttp2_cancel_stream(exec_ctx, s->t, s, GRPC_ERROR_REF(error));
s->byte_stream_error = error;
}
}
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) { grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) { if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs); gpr_free(bs);
} }
} }
@ -2545,90 +2484,47 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport; grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream; grpc_chttp2_stream *s = bs->stream;
size_t cur_length = s->frame_storage.length; if (bs->is_tail) {
incoming_byte_stream_update_flow_control( gpr_mu_lock(&bs->slice_mu);
exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); size_t cur_length = bs->slices.length;
gpr_mu_unlock(&bs->slice_mu);
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); incoming_byte_stream_update_flow_control(
if (s->frame_storage.length > 0) { exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
grpc_slice_buffer_swap(&s->frame_storage, }
&s->unprocessed_incoming_frames_buffer); gpr_mu_lock(&bs->slice_mu);
grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); if (bs->slices.count > 0) {
} else if (s->byte_stream_error != GRPC_ERROR_NONE) { *bs->next_action.slice = grpc_slice_buffer_take_first(&bs->slices);
grpc_closure_sched(exec_ctx, bs->next_action.on_complete, grpc_closure_run(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
GRPC_ERROR_REF(s->byte_stream_error)); } else if (bs->error != GRPC_ERROR_NONE) {
if (s->data_parser.parsing_frame != NULL) { grpc_closure_run(exec_ctx, bs->next_action.on_complete,
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); GRPC_ERROR_REF(bs->error));
s->data_parser.parsing_frame = NULL;
}
} else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
s->byte_stream_error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = NULL;
}
} else {
/* Should never reach here. */
GPR_ASSERT(false);
}
} else { } else {
s->on_next = bs->next_action.on_complete; bs->on_next = bs->next_action.on_complete;
bs->next = bs->next_action.slice;
} }
gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
} }
static bool incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) { grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs = grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream; (grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream; gpr_ref(&bs->refs);
if (s->unprocessed_incoming_frames_buffer.length > 0) { bs->next_action.slice = slice;
return true; bs->next_action.max_size_hint = max_size_hint;
} else { bs->next_action.on_complete = on_complete;
gpr_ref(&bs->refs); grpc_closure_sched(
bs->next_action.max_size_hint = max_size_hint; exec_ctx,
bs->next_action.on_complete = on_complete; grpc_closure_init(
grpc_closure_sched( &bs->next_action.closure, incoming_byte_stream_next_locked, bs,
exec_ctx, grpc_combiner_scheduler(bs->transport->combiner, false)),
grpc_closure_init( GRPC_ERROR_NONE);
&bs->next_action.closure, incoming_byte_stream_next_locked, bs, GPR_TIMER_END("incoming_byte_stream_next", 0);
grpc_combiner_scheduler(bs->transport->combiner, false)), return 0;
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return false;
}
}
static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
grpc_error *error = deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
return error;
}
} else {
grpc_error *error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
GPR_TIMER_END("incoming_byte_stream_pull", 0);
return GRPC_ERROR_NONE;
} }
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -2638,14 +2534,9 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream, void *byte_stream,
grpc_error *error_ignored) { grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream; grpc_chttp2_incoming_byte_stream *bs = byte_stream;
grpc_chttp2_stream *s = bs->stream;
grpc_chttp2_transport *t = s->t;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
s->pending_byte_stream = false;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
} }
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -2665,53 +2556,50 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error( static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) { grpc_error *error) {
grpc_chttp2_stream *s = bs->stream;
GPR_ASSERT(error != GRPC_ERROR_NONE); GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL; bs->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error); GRPC_ERROR_UNREF(bs->error);
s->byte_stream_error = GRPC_ERROR_REF(error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error)); GRPC_ERROR_REF(error));
bs->error = error;
} }
grpc_error *grpc_chttp2_incoming_byte_stream_push( void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice, grpc_slice *slice_out) { grpc_slice slice) {
grpc_chttp2_stream *s = bs->stream; gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
grpc_error *error = incoming_byte_stream_publish_error(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); exec_ctx, bs,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(exec_ctx, slice);
return error;
} else { } else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (slice_out != NULL) { if (bs->on_next != NULL) {
*slice_out = slice; *bs->next = slice;
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
} else {
grpc_slice_buffer_add(&bs->slices, slice);
} }
return GRPC_ERROR_NONE;
} }
gpr_mu_unlock(&bs->slice_mu);
} }
grpc_error *grpc_chttp2_incoming_byte_stream_finished( void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, bool reset_on_error) { grpc_error *error) {
grpc_chttp2_stream *s = bs->stream;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) { if (bs->remaining_bytes != 0) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
} }
gpr_mu_unlock(&bs->slice_mu);
} }
if (error != GRPC_ERROR_NONE && reset_on_error) { if (error != GRPC_ERROR_NONE) {
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_REF(error)); incoming_byte_stream_publish_error(exec_ctx, bs, error);
} }
incoming_byte_stream_unref(exec_ctx, bs); incoming_byte_stream_unref(exec_ctx, bs);
return error;
} }
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@ -2723,12 +2611,26 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->remaining_bytes = frame_size;
incoming_byte_stream->base.flags = flags; incoming_byte_stream->base.flags = flags;
incoming_byte_stream->base.next = incoming_byte_stream_next; incoming_byte_stream->base.next = incoming_byte_stream_next;
incoming_byte_stream->base.pull = incoming_byte_stream_pull;
incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
gpr_mu_init(&incoming_byte_stream->slice_mu);
gpr_ref_init(&incoming_byte_stream->refs, 2); gpr_ref_init(&incoming_byte_stream->refs, 2);
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t; incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s; incoming_byte_stream->stream = s;
s->byte_stream_error = GRPC_ERROR_NONE; gpr_ref(&incoming_byte_stream->stream->active_streams);
grpc_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
incoming_byte_stream->error = GRPC_ERROR_NONE;
grpc_chttp2_incoming_frame_queue *q = &s->incoming_frames;
if (q->head == NULL) {
q->head = incoming_byte_stream;
} else {
q->tail->is_tail = 0;
q->tail->next_message = incoming_byte_stream;
}
q->tail = incoming_byte_stream;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
return incoming_byte_stream; return incoming_byte_stream;
} }

@ -40,7 +40,6 @@
#include <grpc/support/string_util.h> #include <grpc/support/string_util.h>
#include <grpc/support/useful.h> #include <grpc/support/useful.h>
#include "src/core/ext/transport/chttp2/transport/internal.h" #include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h" #include "src/core/lib/support/string.h"
#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport.h"
@ -54,17 +53,16 @@ grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) { grpc_chttp2_data_parser *parser) {
if (parser->parsing_frame != NULL) { if (parser->parsing_frame != NULL) {
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, parser->parsing_frame, exec_ctx, parser->parsing_frame,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"), false)); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Parser destroyed"));
} }
GRPC_ERROR_UNREF(parser->error); GRPC_ERROR_UNREF(parser->error);
} }
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags, uint8_t flags,
uint32_t stream_id, uint32_t stream_id) {
grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg; char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@ -76,14 +74,47 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
} }
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
s->received_last_frame = true; parser->is_last_frame = 1;
} else { } else {
s->received_last_frame = false; parser->is_last_frame = 0;
} }
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_chttp2_incoming_frame_queue_merge(
grpc_chttp2_incoming_frame_queue *head_dst,
grpc_chttp2_incoming_frame_queue *tail_src) {
if (tail_src->head == NULL) {
return;
}
if (head_dst->head == NULL) {
*head_dst = *tail_src;
memset(tail_src, 0, sizeof(*tail_src));
return;
}
head_dst->tail->next_message = tail_src->head;
head_dst->tail = tail_src->tail;
memset(tail_src, 0, sizeof(*tail_src));
}
grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_incoming_frame_queue *q) {
grpc_byte_stream *out;
if (q->head == NULL) {
return NULL;
}
out = &q->head->base;
if (q->head == q->tail) {
memset(q, 0, sizeof(*q));
} else {
q->head = q->head->next_message;
}
return out;
}
void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
uint32_t write_bytes, int is_eof, uint32_t write_bytes, int is_eof,
grpc_transport_one_way_stats *stats, grpc_transport_one_way_stats *stats,
@ -112,217 +143,145 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes; stats->data_bytes += write_bytes;
} }
grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, static grpc_error *parse_inner(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p, grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_slice_buffer *slices, grpc_slice slice) {
grpc_slice *slice_out, uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
grpc_byte_stream **stream_out) { uint8_t *const end = GRPC_SLICE_END_PTR(slice);
grpc_error *error = GRPC_ERROR_NONE; uint8_t *cur = beg;
grpc_chttp2_transport *t = s->t; uint32_t message_flags;
grpc_chttp2_incoming_byte_stream *incoming_byte_stream;
while (slices->count > 0) { char *msg;
uint8_t *beg = NULL;
uint8_t *end = NULL;
uint8_t *cur = NULL;
grpc_slice slice = grpc_slice_buffer_take_first(slices);
beg = GRPC_SLICE_START_PTR(slice); if (cur == end) {
end = GRPC_SLICE_END_PTR(slice); return GRPC_ERROR_NONE;
cur = beg; }
uint32_t message_flags;
char *msg;
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
case GRPC_CHTTP2_DATA_FH_0:
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
p->is_frame_compressed = false; /* GPR_FALSE */
break;
case 1:
p->is_frame_compressed = true; /* GPR_TRUE */
break;
default:
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)s->id);
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
GPR_ASSERT(stream_out != NULL);
GPR_ASSERT(p->parsing_frame == NULL);
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
*stream_out = &p->parsing_frame->base;
if (p->parsing_frame->remaining_bytes == 0) {
GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true));
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
}
s->pending_byte_stream = true;
if (cur != end) { switch (p->state) {
grpc_slice_buffer_undo_take_first( case GRPC_CHTTP2_DATA_ERROR:
&s->unprocessed_incoming_frames_buffer, p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); return GRPC_ERROR_REF(p->error);
} fh_0:
grpc_slice_unref_internal(exec_ctx, slice); case GRPC_CHTTP2_DATA_FH_0:
s->stats.incoming.framing_bytes++;
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
p->is_frame_compressed = 0; /* GPR_FALSE */
break;
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)s->id);
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
s->stats.incoming.framing_bytes++;
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(exec_ctx, t, s, p->frame_size,
message_flags);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
return GRPC_ERROR_NONE;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
s->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_ERROR_NONE;
} else if (remaining > p->frame_size) {
s->stats.incoming.data_bytes += p->frame_size;
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {
GPR_ASSERT(remaining <= p->frame_size);
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
p->frame_size -= remaining;
s->stats.incoming.data_bytes += remaining;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)),
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
if (GRPC_ERROR_NONE !=
(grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)),
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
if (GRPC_ERROR_NONE !=
(error = grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, p->parsing_frame, GRPC_ERROR_NONE, true))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
} }
}
} }
return GRPC_ERROR_NONE; GPR_UNREACHABLE_CODE(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
} }
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_chttp2_stream *s,
grpc_slice slice, int is_last) { grpc_slice slice, int is_last) {
/* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ grpc_chttp2_data_parser *p = parser;
s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); grpc_error *error = parse_inner(exec_ctx, p, t, s, slice);
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} else if (s->on_next) {
GPR_ASSERT(s->frame_storage.length == 0);
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
}
if (is_last && s->received_last_frame) { if (is_last && p->is_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
} }
return GRPC_ERROR_NONE; return error;
} }

@ -56,16 +56,28 @@ typedef enum {
typedef struct grpc_chttp2_incoming_byte_stream typedef struct grpc_chttp2_incoming_byte_stream
grpc_chttp2_incoming_byte_stream; grpc_chttp2_incoming_byte_stream;
typedef struct grpc_chttp2_incoming_frame_queue {
grpc_chttp2_incoming_byte_stream *head;
grpc_chttp2_incoming_byte_stream *tail;
} grpc_chttp2_incoming_frame_queue;
typedef struct { typedef struct {
grpc_chttp2_stream_state state; grpc_chttp2_stream_state state;
uint8_t is_last_frame;
uint8_t frame_type; uint8_t frame_type;
uint32_t frame_size; uint32_t frame_size;
grpc_error *error; grpc_error *error;
bool is_frame_compressed; int is_frame_compressed;
grpc_chttp2_incoming_byte_stream *parsing_frame; grpc_chttp2_incoming_byte_stream *parsing_frame;
} grpc_chttp2_data_parser; } grpc_chttp2_data_parser;
void grpc_chttp2_incoming_frame_queue_merge(
grpc_chttp2_incoming_frame_queue *head_dst,
grpc_chttp2_incoming_frame_queue *tail_src);
grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_incoming_frame_queue *q);
/* initialize per-stream state for data frame parsing */ /* initialize per-stream state for data frame parsing */
grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser); grpc_error *grpc_chttp2_data_parser_init(grpc_chttp2_data_parser *parser);
@ -75,8 +87,7 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */ /* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags, uint8_t flags,
uint32_t stream_id, uint32_t stream_id);
grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a /* handle a slice of a data frame - is_last indicates the last slice of a
frame */ frame */
@ -90,11 +101,4 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
grpc_transport_one_way_stats *stats, grpc_transport_one_way_stats *stats,
grpc_slice_buffer *outbuf); grpc_slice_buffer *outbuf);
grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s,
grpc_slice_buffer *slices,
grpc_slice *slice_out,
grpc_byte_stream **stream_out);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */ #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FRAME_DATA_H */

@ -195,20 +195,22 @@ typedef struct grpc_chttp2_write_cb {
struct grpc_chttp2_incoming_byte_stream { struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base; grpc_byte_stream base;
gpr_refcount refs; gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message;
grpc_error *error;
grpc_chttp2_transport *transport; /* immutable */ grpc_chttp2_transport *transport;
grpc_chttp2_stream *stream; /* immutable */ grpc_chttp2_stream *stream;
bool is_tail;
/* Accessed only by transport thread when stream->pending_byte_stream == false gpr_mu slice_mu; // protects slices, on_next
* Accessed only by application thread when stream->pending_byte_stream == grpc_slice_buffer slices;
* true */ grpc_closure *on_next;
grpc_slice *next;
uint32_t remaining_bytes; uint32_t remaining_bytes;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
struct { struct {
grpc_closure closure; grpc_closure closure;
grpc_slice *slice;
size_t max_size_hint; size_t max_size_hint;
grpc_closure *on_complete; grpc_closure *on_complete;
} next_action; } next_action;
@ -443,8 +445,8 @@ struct grpc_chttp2_stream {
uint32_t id; uint32_t id;
/** window available for us to send to peer, over or under the initial window /** window available for us to send to peer, over or under the initial window
* size of the transport... ie: * size of the transport... ie:
* outgoing_window = outgoing_window_delta + transport.initial_window_size */ * outgoing_window = outgoing_window_delta + transport.initial_window_size */
int64_t outgoing_window_delta; int64_t outgoing_window_delta;
/** things the upper layers would like to send */ /** things the upper layers would like to send */
grpc_metadata_batch *send_initial_metadata; grpc_metadata_batch *send_initial_metadata;
@ -471,6 +473,9 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats; grpc_transport_stream_stats stats;
/** number of streams that are currently being read */
gpr_refcount active_streams;
/** Is this stream closed for writing. */ /** Is this stream closed for writing. */
bool write_closed; bool write_closed;
/** Is this stream reading half-closed. */ /** Is this stream reading half-closed. */
@ -494,17 +499,7 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
grpc_slice_buffer frame_storage; /* protected by t combiner */ grpc_chttp2_incoming_frame_queue incoming_frames;
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_slice_buffer unprocessed_incoming_frames_buffer;
grpc_closure *on_next; /* protected by t combiner */
bool pending_byte_stream; /* protected by t combiner */
grpc_closure reset_byte_stream;
grpc_error *byte_stream_error; /* protected by t combiner */
bool received_last_frame; /* protected by t combiner */
gpr_timespec deadline; gpr_timespec deadline;
@ -517,9 +512,6 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */ * incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta; int64_t incoming_window_delta;
/** parsing state for data frames */ /** parsing state for data frames */
/* Accessed only by transport thread when stream->pending_byte_stream == false
* Accessed only by application thread when stream->pending_byte_stream ==
* true */
grpc_chttp2_data_parser data_parser; grpc_chttp2_data_parser data_parser;
/** number of bytes received - reset at end of parse thread execution */ /** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes; int64_t received_bytes;
@ -798,13 +790,10 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags); uint32_t frame_size, uint32_t flags);
grpc_error *grpc_chttp2_incoming_byte_stream_push( void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice, grpc_slice *slice_out); grpc_slice slice);
grpc_error *grpc_chttp2_incoming_byte_stream_finished( void grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error, bool reset_on_error);
void grpc_chttp2_incoming_byte_stream_notify(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error); grpc_error *error);

@ -458,13 +458,12 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0); return init_skip_frame_parser(exec_ctx, t, 0);
} }
if (err == GRPC_ERROR_NONE) { if (err == GRPC_ERROR_NONE) {
err = grpc_chttp2_data_parser_begin_frame( err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
&s->data_parser, t->incoming_frame_flags, s->id, s); t->incoming_frame_flags, s->id);
} }
error_handler: error_handler:
if (err == GRPC_ERROR_NONE) { if (err == GRPC_ERROR_NONE) {
t->incoming_stream = s; t->incoming_stream = s;
/* t->parser = grpc_chttp2_data_parser_parse;*/
t->parser = grpc_chttp2_data_parser_parse; t->parser = grpc_chttp2_data_parser_parse;
t->parser_data = &s->data_parser; t->parser_data = &s->data_parser;
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;

@ -973,20 +973,9 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
grpc_slice_buffer write_slice_buffer; grpc_slice_buffer write_slice_buffer;
grpc_slice slice; grpc_slice slice;
grpc_slice_buffer_init(&write_slice_buffer); grpc_slice_buffer_init(&write_slice_buffer);
if (1 != grpc_byte_stream_next( grpc_byte_stream_next(
exec_ctx, stream_op->payload->send_message.send_message, NULL, stream_op->payload->send_message.send_message, &slice,
stream_op->payload->send_message.send_message->length, stream_op->payload->send_message.send_message->length, NULL);
NULL)) {
/* Should never reach here */
GPR_ASSERT(false);
}
if (GRPC_ERROR_NONE !=
grpc_byte_stream_pull(exec_ctx,
stream_op->payload->send_message.send_message,
&slice)) {
/* Should never reach here */
GPR_ASSERT(false);
}
grpc_slice_buffer_add(&write_slice_buffer, slice); grpc_slice_buffer_add(&write_slice_buffer, slice);
if (write_slice_buffer.count != 1) { if (write_slice_buffer.count != 1) {
/* Empty request not handled yet */ /* Empty request not handled yet */

@ -1187,7 +1187,6 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) { batch_control *bctl) {
grpc_error *error;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
for (;;) { for (;;) {
size_t remaining = call->receiving_stream->length - size_t remaining = call->receiving_stream->length -
@ -1199,22 +1198,11 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
finish_batch_step(exec_ctx, bctl); finish_batch_step(exec_ctx, bctl);
return; return;
} }
if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
&call->receiving_slice, remaining,
&call->receiving_slice_ready)) { &call->receiving_slice_ready)) {
error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
&call->receiving_slice); call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
return;
}
} else { } else {
return; return;
} }
@ -1225,24 +1213,12 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) { grpc_error *error) {
batch_control *bctl = bctlp; batch_control *bctl = bctlp;
grpc_call *call = bctl->call; grpc_call *call = bctl->call;
grpc_byte_stream *bs = call->receiving_stream;
bool release_error = false;
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
grpc_slice slice; grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
error = grpc_byte_stream_pull(exec_ctx, bs, &slice); call->receiving_slice);
if (error == GRPC_ERROR_NONE) { continue_receiving_slices(exec_ctx, bctl);
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, } else {
slice);
continue_receiving_slices(exec_ctx, bctl);
} else {
/* Error returned by grpc_byte_stream_pull needs to be released manually
*/
release_error = true;
}
}
if (error != GRPC_ERROR_NONE) {
if (grpc_trace_operation_failures) { if (grpc_trace_operation_failures) {
GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error)); GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
} }
@ -1250,11 +1226,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
call->receiving_stream = NULL; call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer); grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL; *call->receiving_buffer = NULL;
call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl); finish_batch_step(exec_ctx, bctl);
if (release_error) {
GRPC_ERROR_UNREF(error);
}
} }
} }

@ -40,15 +40,10 @@
#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_internal.h"
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, size_t max_size_hint, grpc_byte_stream *byte_stream, grpc_slice *slice,
grpc_closure *on_complete) { size_t max_size_hint, grpc_closure *on_complete) {
return byte_stream->next(exec_ctx, byte_stream, max_size_hint, on_complete); return byte_stream->next(exec_ctx, byte_stream, slice, max_size_hint,
} on_complete);
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
return byte_stream->pull(exec_ctx, byte_stream, slice);
} }
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -58,24 +53,16 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
/* slice_buffer_stream */ /* slice_buffer_stream */
static bool slice_buffer_stream_next(grpc_exec_ctx *exec_ctx, static int slice_buffer_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_slice *slice, size_t max_size_hint,
grpc_closure *on_complete) { grpc_closure *on_complete) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
return true;
}
static grpc_error *slice_buffer_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream; grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
GPR_ASSERT(stream->cursor < stream->backing_buffer->count); GPR_ASSERT(stream->cursor < stream->backing_buffer->count);
*slice = *slice =
grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]); grpc_slice_ref_internal(stream->backing_buffer->slices[stream->cursor]);
stream->cursor++; stream->cursor++;
return GRPC_ERROR_NONE; return 1;
} }
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx, static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
@ -88,7 +75,6 @@ void grpc_slice_buffer_stream_init(grpc_slice_buffer_stream *stream,
stream->base.length = (uint32_t)slice_buffer->length; stream->base.length = (uint32_t)slice_buffer->length;
stream->base.flags = flags; stream->base.flags = flags;
stream->base.next = slice_buffer_stream_next; stream->base.next = slice_buffer_stream_next;
stream->base.pull = slice_buffer_stream_pull;
stream->base.destroy = slice_buffer_stream_destroy; stream->base.destroy = slice_buffer_stream_destroy;
stream->backing_buffer = slice_buffer; stream->backing_buffer = slice_buffer;
stream->cursor = 0; stream->cursor = 0;

@ -49,10 +49,9 @@ typedef struct grpc_byte_stream grpc_byte_stream;
struct grpc_byte_stream { struct grpc_byte_stream {
uint32_t length; uint32_t length;
uint32_t flags; uint32_t flags;
bool (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, int (*next)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream,
size_t max_size_hint, grpc_closure *on_complete); grpc_slice *slice, size_t max_size_hint,
grpc_error *(*pull)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_closure *on_complete);
grpc_slice *slice);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream);
}; };
@ -62,20 +61,12 @@ struct grpc_byte_stream {
* *
* max_size_hint can be set as a hint as to the maximum number * max_size_hint can be set as a hint as to the maximum number
* of bytes that would be acceptable to read. * of bytes that would be acceptable to read.
*/
int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, size_t max_size_hint,
grpc_closure *on_complete);
/* returns the next slice in the byte stream when it is ready (indicated by
* either grpc_byte_stream_next returning 1 or on_complete passed to
* grpc_byte_stream_next is called).
* *
* once a slice is returned into *slice, it is owned by the caller. * once a slice is returned into *slice, it is owned by the caller.
*/ */
grpc_error *grpc_byte_stream_pull(grpc_exec_ctx *exec_ctx, int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream, grpc_byte_stream *byte_stream, grpc_slice *slice,
grpc_slice *slice); size_t max_size_hint, grpc_closure *on_complete);
void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx, void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream); grpc_byte_stream *byte_stream);

@ -569,17 +569,12 @@ static void BM_TransportStreamRecv(benchmark::State &state) {
grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE); grpc_closure_sched(exec_ctx, c.get(), GRPC_ERROR_NONE);
return; return;
} }
} while (grpc_byte_stream_next(exec_ctx, recv_stream, } while (grpc_byte_stream_next(exec_ctx, recv_stream, &recv_slice,
recv_stream->length - received, recv_stream->length - received,
drain_continue.get()) && drain_continue.get()));
GRPC_ERROR_NONE ==
grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice) &&
(received += GRPC_SLICE_LENGTH(recv_slice),
grpc_slice_unref_internal(exec_ctx, recv_slice), true));
}); });
drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) { drain_continue = MakeClosure([&](grpc_exec_ctx *exec_ctx, grpc_error *error) {
grpc_byte_stream_pull(exec_ctx, recv_stream, &recv_slice);
received += GRPC_SLICE_LENGTH(recv_slice); received += GRPC_SLICE_LENGTH(recv_slice);
grpc_slice_unref_internal(exec_ctx, recv_slice); grpc_slice_unref_internal(exec_ctx, recv_slice);
grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE); grpc_closure_run(exec_ctx, drain.get(), GRPC_ERROR_NONE);

@ -40,11 +40,6 @@ sudo apt-get update
sudo apt-get install -y openjdk-8-jdk sudo apt-get install -y openjdk-8-jdk
sudo apt-get install -y unzip lsof sudo apt-get install -y unzip lsof
# Add pubkey of jenkins@grpc-jenkins-master to authorized keys of jenkins@
# This needs to happen as the last step to prevent Jenkins master from connecting
# to a machine that hasn't been properly setup yet.
cat jenkins_master.pub | sudo tee --append ~jenkins/.ssh/authorized_keys
sudo apt-get install -y \ sudo apt-get install -y \
autoconf \ autoconf \
autotools-dev \ autotools-dev \
@ -169,3 +164,21 @@ git clone -v https://github.com/brendangregg/FlameGraph ~/FlameGraph
# Install scipy and numpy for benchmarking scripts # Install scipy and numpy for benchmarking scripts
sudo apt-get install python-scipy python-numpy sudo apt-get install python-scipy python-numpy
# Update Linux kernel to 4.9
wget \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-headers-4.9.20-040920_4.9.20-040920.201703310531_all.deb \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-headers-4.9.20-040920-generic_4.9.20-040920.201703310531_amd64.deb \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-image-4.9.20-040920-generic_4.9.20-040920.201703310531_amd64.deb
sudo dpkg -i linux-headers-4.9*.deb linux-image-4.9*.deb
rm linux-*
# Add pubkey of jenkins@grpc-jenkins-master to authorized keys of jenkins@
# This needs to happen as the last step to prevent Jenkins master from connecting
# to a machine that hasn't been properly setup yet.
cat jenkins_master.pub | sudo tee --append ~jenkins/.ssh/authorized_keys
# Restart for VM to pick up kernel update
echo 'Successfully initialized the linux worker, going for reboot in 10 seconds'
sleep 10
sudo reboot

@ -59,19 +59,27 @@ sudo usermod -aG docker jenkins
# Use "overlay" storage driver for docker # Use "overlay" storage driver for docker
# see https://github.com/grpc/grpc/issues/4988 # see https://github.com/grpc/grpc/issues/4988
echo 'DOCKER_OPTS="${DOCKER_OPTS} --storage-driver=overlay"' | sudo tee --append /etc/default/docker printf "{\n\t\"storage-driver\": \"overlay\"\n}" | sudo tee /etc/docker/daemon.json
# Install RVM # Install RVM
# TODO(jtattermusch): why is RVM needed? # TODO(jtattermusch): why is RVM needed?
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
curl -sSL https://get.rvm.io | bash -s stable --ruby curl -sSL https://get.rvm.io | bash -s stable --ruby
# Upgrade Linux kernel to 4.9
wget \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-headers-4.9.20-040920_4.9.20-040920.201703310531_all.deb \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-headers-4.9.20-040920-generic_4.9.20-040920.201703310531_amd64.deb \
kernel.ubuntu.com/~kernel-ppa/mainline/v4.9.20/linux-image-4.9.20-040920-generic_4.9.20-040920.201703310531_amd64.deb
sudo dpkg -i linux-headers-4.9*.deb linux-image-4.9*.deb
rm linux-*
# Add pubkey of jenkins@grpc-jenkins-master to authorized keys of jenkins@ # Add pubkey of jenkins@grpc-jenkins-master to authorized keys of jenkins@
# This needs to happen as the last step to prevent Jenkins master from connecting # This needs to happen as the last step to prevent Jenkins master from connecting
# to a machine that hasn't been properly setup yet. # to a machine that hasn't been properly setup yet.
cat jenkins_master.pub | sudo tee --append ~jenkins/.ssh/authorized_keys cat jenkins_master.pub | sudo tee --append ~jenkins/.ssh/authorized_keys
# Restart for docker to pickup the config changes. # Restart for docker to pick up the config changes.
echo 'Successfully initialized the linux worker, going for reboot in 10 seconds' echo 'Successfully initialized the linux worker, going for reboot in 10 seconds'
sleep 10 sleep 10

Loading…
Cancel
Save