diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8c197f2b85f..c86fefe2cfb 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -115,6 +115,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, grpc_error *error_ignored); +static void incoming_byte_stream_publish_error(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + grpc_error *error); +static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs); static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); @@ -156,13 +161,14 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static grpc_error *deframe_unprocessed_incoming_frames( - grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices, - grpc_slice *slice_out, bool partial_deframe); -static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s); +static grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx, + grpc_chttp2_data_parser *p, + grpc_chttp2_stream *s, + grpc_slice_buffer *slices, + grpc_slice *slice_out, + grpc_byte_stream **stream_out); +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING @@ -596,7 +602,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, /* We reserve one 'active stream' that's dropped when the stream is read-closed. The others are for incoming_byte_streams that are actively reading */ - gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); @@ -606,9 +611,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s, grpc_schedule_on_exec_ctx); - s->incoming_frames = NULL; grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer); - gpr_mu_init(&s->buffer_mu); + grpc_slice_buffer_init(&s->frame_storage); + s->pending_byte_stream = false; + grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner, false)); GRPC_CHTTP2_REF_TRANSPORT(t, "stream"); @@ -638,11 +644,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->unprocessed_incoming_frames_buffer); - if (s->incoming_frames != NULL) { - grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames; - s->incoming_frames = NULL; - incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, GRPC_ERROR_NONE); - } + grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage); grpc_chttp2_list_remove_stalled_by_transport(t, s); grpc_chttp2_list_remove_stalled_by_stream(t, s); @@ -661,9 +663,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_ASSERT(s->recv_initial_metadata_ready == NULL); GPR_ASSERT(s->recv_message_ready == NULL); GPR_ASSERT(s->recv_trailing_metadata_finished == NULL); - gpr_mu_lock(&s->buffer_mu); grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser); - gpr_mu_unlock(&s->buffer_mu); grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, &s->metadata_buffer[0]); grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx, @@ -671,6 +671,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer); GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); + GRPC_ERROR_UNREF(s->byte_stream_error); if (s->incoming_window_delta > 0) { GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( @@ -1345,22 +1346,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_ASSERT(s->recv_message_ready == NULL); s->recv_message_ready = op->recv_message_ready; s->recv_message = op->recv_message; - gpr_mu_lock(&s->buffer_mu); - if (s->id != 0 && (s->incoming_frames == NULL || - s->unprocessed_incoming_frames_buffer.count == 0)) { - gpr_mu_unlock(&s->buffer_mu); + if (s->id != 0 && s->frame_storage.length == 0) { incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0); - } else { - gpr_mu_unlock(&s->buffer_mu); - } - gpr_mu_lock(&s->buffer_mu); - if (s->incoming_frames == NULL && - s->unprocessed_incoming_frames_buffer.count > 0) { - deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, t, s, - &s->unprocessed_incoming_frames_buffer, NULL, true); } - gpr_mu_unlock(&s->buffer_mu); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -1527,18 +1515,9 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, if (s->recv_initial_metadata_ready != NULL && s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) { if (s->seen_error) { - if (s->incoming_frames != NULL) { - grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames; - s->incoming_frames = NULL; - incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, - GRPC_ERROR_NONE); - } - size_t length; - gpr_mu_lock(&s->buffer_mu); - length = s->unprocessed_incoming_frames_buffer.length; - gpr_mu_unlock(&s->buffer_mu); - if (length > 0) { - clean_unprocessed_frames_buffer(exec_ctx, t, s); + grpc_slice_buffer_reset_and_unref(&s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer); } } grpc_chttp2_incoming_metadata_buffer_publish( @@ -1551,32 +1530,38 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { + grpc_error *error = GRPC_ERROR_NONE; if (s->recv_message_ready != NULL) { + *s->recv_message = NULL; if (s->final_metadata_requested && s->seen_error) { - if (s->incoming_frames != NULL) { - grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames; - s->incoming_frames = NULL; - incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, - GRPC_ERROR_NONE); + grpc_slice_buffer_reset_and_unref(&s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer); } - size_t length; - gpr_mu_lock(&s->buffer_mu); - length = s->unprocessed_incoming_frames_buffer.length; - gpr_mu_unlock(&s->buffer_mu); - if (length > 0) { - clean_unprocessed_frames_buffer(exec_ctx, t, s); + } + if (!s->pending_byte_stream) { + while (s->unprocessed_incoming_frames_buffer.length > 0 || + s->frame_storage.length > 0) { + if (s->unprocessed_incoming_frames_buffer.length == 0) { + grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage); + } + /* error handling ok? */ + error = deframe_unprocessed_incoming_frames(exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message); + if (error != GRPC_ERROR_NONE) { + s->seen_error = true; + grpc_slice_buffer_reset_and_unref(&s->frame_storage); + grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer); + break; + } else if (*s->recv_message != NULL) { + break; + } } } - if (s->incoming_frames != NULL) { - *s->recv_message = &s->incoming_frames->base; - s->incoming_frames = NULL; - GPR_ASSERT(*s->recv_message != NULL); - grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); - s->recv_message_ready = NULL; + if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) { + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) { *s->recv_message = NULL; - grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE); - s->recv_message_ready = NULL; + null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE); } } } @@ -1588,21 +1573,13 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, if (s->recv_trailing_metadata_finished != NULL && s->read_closed && s->write_closed) { if (s->seen_error) { - if (s->incoming_frames != NULL) { - grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames; - s->incoming_frames = NULL; - incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, - GRPC_ERROR_NONE); - } - size_t length; - gpr_mu_lock(&s->buffer_mu); - length = s->unprocessed_incoming_frames_buffer.length; - gpr_mu_unlock(&s->buffer_mu); - if (length > 0) { - clean_unprocessed_frames_buffer(exec_ctx, t, s); + grpc_slice_buffer_reset_and_unref(&s->frame_storage); + if (!s->pending_byte_stream) { + grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer); } } - if (s->all_incoming_byte_streams_finished && + if (s->read_closed && s->frame_storage.length == 0 && + (!s->pending_byte_stream || s->seen_error) && s->recv_trailing_metadata_finished != NULL) { grpc_chttp2_incoming_metadata_buffer_publish( exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata); @@ -1613,34 +1590,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } } -static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - size_t length; - gpr_mu_lock(&s->buffer_mu); - length = s->unprocessed_incoming_frames_buffer.length; - gpr_mu_unlock(&s->buffer_mu); - if ((s->all_incoming_byte_streams_finished = - (gpr_unref(&s->active_streams) && length == 0))) { - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); - } -} - -static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - gpr_mu_lock(&s->buffer_mu); - grpc_slice_buffer_reset_and_unref_internal( - exec_ctx, &s->unprocessed_incoming_frames_buffer); - gpr_mu_unlock(&s->buffer_mu); - // TODO (mxyan): add get ref count in sync.c? - gpr_atm active_streams = - gpr_atm_no_barrier_fetch_add(&s->active_streams.count, 0); - if ((s->all_incoming_byte_streams_finished = (active_streams == 0))) { - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); - } -} - static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, uint32_t id, grpc_error *error) { grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id); @@ -1649,24 +1598,18 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->incoming_stream = NULL; grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); } - gpr_mu_lock(&s->buffer_mu); - if (s->data_parser.parsing_frame != NULL) { - grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; - gpr_mu_lock(&bs->slice_mu); - bs->push_closed = true; - if (bs->on_next != NULL) { - gpr_mu_unlock(&bs->slice_mu); - gpr_mu_unlock(&s->buffer_mu); - grpc_chttp2_incoming_byte_stream_finished( - exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error)); + if (s->pending_byte_stream) { + if (s->on_next != NULL) { + grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame; + if (error == GRPC_ERROR_NONE) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + } + incoming_byte_stream_publish_error(exec_ctx, bs, error); + incoming_byte_stream_unref(exec_ctx, bs); s->data_parser.parsing_frame = NULL; } else { - bs->error = GRPC_ERROR_REF(error); - gpr_mu_unlock(&bs->slice_mu); - gpr_mu_unlock(&s->buffer_mu); + s->byte_stream_error = GRPC_ERROR_REF(error); } - } else { - gpr_mu_unlock(&s->buffer_mu); } if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) { @@ -1852,7 +1795,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx, s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE; } } - decrement_active_streams_locked(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s); grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); } @@ -2304,11 +2246,32 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, * BYTE STREAM */ +static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg; + + s->pending_byte_stream = false; + if (error == GRPC_ERROR_NONE) { + grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s); + } else { + GPR_ASSERT(error != GRPC_ERROR_NONE); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + s->on_next = NULL; + GRPC_ERROR_UNREF(s->byte_stream_error); + grpc_chttp2_cancel_stream(exec_ctx, s->t, s, + GRPC_ERROR_REF(error)); + s->byte_stream_error = error; + } +} + static grpc_error *deframe_unprocessed_incoming_frames( grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices, - grpc_slice *slice_out, bool partial_deframe) { - bool slice_set = false; + grpc_chttp2_stream *s, grpc_slice_buffer *slices, + grpc_slice *slice_out, grpc_byte_stream **stream_out) { + grpc_error *error = GRPC_ERROR_NONE; + grpc_chttp2_transport *t = s->t; + while (slices->count > 0) { uint8_t *beg = NULL; uint8_t *end = NULL; @@ -2332,15 +2295,7 @@ static grpc_error *deframe_unprocessed_incoming_frames( p->state = GRPC_CHTTP2_DATA_ERROR; grpc_slice_unref_internal(exec_ctx, slice); return GRPC_ERROR_REF(p->error); - fh_0: case GRPC_CHTTP2_DATA_FH_0: - if (s->incoming_frames != NULL) { - grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } p->frame_type = *cur; switch (p->frame_type) { case 0: @@ -2396,6 +2351,8 @@ static grpc_error *deframe_unprocessed_incoming_frames( } /* fallthrough */ case GRPC_CHTTP2_DATA_FH_4: + GPR_ASSERT(stream_out != NULL); + GPR_ASSERT(p->parsing_frame == NULL); p->frame_size |= ((uint32_t)*cur); p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; @@ -2403,71 +2360,69 @@ static grpc_error *deframe_unprocessed_incoming_frames( if (p->is_frame_compressed) { message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } - GPR_ASSERT(s->incoming_frames == NULL); p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( exec_ctx, t, s, p->frame_size, message_flags); - /* fallthrough */ + *stream_out = &p->parsing_frame->base; + if (p->parsing_frame->remaining_bytes == 0) { + grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE); + p->parsing_frame = NULL; + p->state = GRPC_CHTTP2_DATA_FH_0; + } else { + s->pending_byte_stream = true; + } + + if (cur != end) { + grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + } + grpc_slice_unref(slice); + return GRPC_ERROR_NONE; case GRPC_CHTTP2_DATA_FRAME: { GPR_ASSERT(p->parsing_frame != NULL); - if (partial_deframe && p->frame_size > 0) { - if (cur != end) { - grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(end - beg))); - } - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } + GPR_ASSERT(slice_out != NULL); if (cur == end) { grpc_slice_unref_internal(exec_ctx, slice); continue; } - if (slice_set) { - grpc_slice_buffer_undo_take_first( - &s->unprocessed_incoming_frames_buffer, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - grpc_slice_unref_internal(exec_ctx, slice); - return GRPC_ERROR_NONE; - } uint32_t remaining = (uint32_t)(end - cur); if (remaining == p->frame_size) { - grpc_chttp2_incoming_byte_stream_push( + if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)), - slice_out); - slice_set = true; + slice_out))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; + } grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE); p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; + grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE); grpc_slice_unref_internal(exec_ctx, slice); - continue; + return GRPC_ERROR_NONE; } else if (remaining < p->frame_size) { - grpc_chttp2_incoming_byte_stream_push( + if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push( exec_ctx, p->parsing_frame, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)), - slice_out); - slice_set = true; + slice_out))) { + return error; + } p->frame_size -= remaining; grpc_slice_unref_internal(exec_ctx, slice); - continue; + return GRPC_ERROR_NONE; } else { GPR_ASSERT(remaining > p->frame_size); - if (p->frame_size > 0) { - grpc_chttp2_incoming_byte_stream_push( - exec_ctx, p->parsing_frame, - grpc_slice_sub(slice, (size_t)(cur - beg), - (size_t)(cur + p->frame_size - beg)), - slice_out); + if (GRPC_ERROR_NONE != (grpc_chttp2_incoming_byte_stream_push(exec_ctx, p->parsing_frame, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg)), slice_out))) { + grpc_slice_unref_internal(exec_ctx, slice); + return error; } - slice_set = true; grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE); p->parsing_frame = NULL; p->state = GRPC_CHTTP2_DATA_FH_0; cur += p->frame_size; - goto fh_0; + grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); + grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE); + grpc_slice_unref(slice); return GRPC_ERROR_NONE; } } @@ -2480,7 +2435,6 @@ static grpc_error *deframe_unprocessed_incoming_frames( static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) { if (gpr_unref(&bs->refs)) { - GRPC_ERROR_UNREF(bs->error); grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices); gpr_mu_destroy(&bs->slice_mu); gpr_free(bs); @@ -2542,90 +2496,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t = bs->transport; grpc_chttp2_stream *s = bs->stream; - if (bs->is_tail) { - gpr_mu_lock(&bs->slice_mu); - size_t cur_length = bs->slices.length; - gpr_mu_unlock(&bs->slice_mu); - incoming_byte_stream_update_flow_control( - exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); - } - gpr_mu_lock(&s->buffer_mu); - gpr_mu_lock(&bs->slice_mu); - if (s->unprocessed_incoming_frames_buffer.length > 0) { + size_t cur_length = s->frame_storage.length; + incoming_byte_stream_update_flow_control( + exec_ctx, t, s, bs->next_action.max_size_hint, cur_length); + + GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0); + if (s->frame_storage.length > 0) { + grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer); grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE); - } else if (bs->error != GRPC_ERROR_NONE) { + } else if (s->byte_stream_error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(bs->error)); - } else if (bs->push_closed) { + GRPC_ERROR_REF(s->byte_stream_error)); + if (s->data_parser.parsing_frame != NULL) { + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + s->data_parser.parsing_frame = NULL; + } + } else if (s->read_closed) { if (bs->remaining_bytes != 0) { - bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); grpc_closure_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_REF(bs->error)); + GRPC_ERROR_REF(s->byte_stream_error)); + if (s->data_parser.parsing_frame != NULL) { + incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame); + s->data_parser.parsing_frame = NULL; + } } else { /* Should never reach here. */ GPR_ASSERT(false); - grpc_closure_sched(exec_ctx, bs->next_action.on_complete, - GRPC_ERROR_NONE); } } else { - bs->on_next = bs->next_action.on_complete; + s->on_next = bs->next_action.on_complete; } - gpr_mu_unlock(&bs->slice_mu); - gpr_mu_unlock(&s->buffer_mu); incoming_byte_stream_unref(exec_ctx, bs); } +static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + size_t max_size_hint, + grpc_closure *on_complete) { + GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + grpc_chttp2_stream *s = bs->stream; + if (s->unprocessed_incoming_frames_buffer.length > 0) { + return 1; + } else { + gpr_ref(&bs->refs); + bs->next_action.max_size_hint = max_size_hint; + bs->next_action.on_complete = on_complete; + grpc_closure_sched( + exec_ctx, + grpc_closure_init( + &bs->next_action.closure, incoming_byte_stream_next_locked, bs, + grpc_combiner_scheduler(bs->transport->combiner, false)), + GRPC_ERROR_NONE); + GPR_TIMER_END("incoming_byte_stream_next", 0); + return 0; + } +} + static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream, grpc_slice *slice) { GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0); grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; + (grpc_chttp2_incoming_byte_stream *)byte_stream; grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = bs->transport; - if (bs->error) { - return bs->error; - } - gpr_mu_lock(&s->buffer_mu); if (s->unprocessed_incoming_frames_buffer.length > 0) { grpc_error *error = deframe_unprocessed_incoming_frames( - exec_ctx, &s->data_parser, t, s, &s->unprocessed_incoming_frames_buffer, - slice, false); + exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, + slice, NULL); if (error != GRPC_ERROR_NONE) { - gpr_mu_unlock(&s->buffer_mu); return error; } } else { - bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); - gpr_mu_unlock(&s->buffer_mu); - return bs->error; + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message"); + grpc_closure_sched(exec_ctx, + &s->reset_byte_stream, GRPC_ERROR_REF(error)); + return error; } - gpr_mu_unlock(&s->buffer_mu); GPR_TIMER_END("incoming_byte_stream_pull", 0); return GRPC_ERROR_NONE; } -static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream, - size_t max_size_hint, - grpc_closure *on_complete) { - GPR_TIMER_BEGIN("incoming_byte_stream_next", 0); - grpc_chttp2_incoming_byte_stream *bs = - (grpc_chttp2_incoming_byte_stream *)byte_stream; - gpr_ref(&bs->refs); - bs->next_action.max_size_hint = max_size_hint; - bs->next_action.on_complete = on_complete; - grpc_closure_sched( - exec_ctx, - grpc_closure_init( - &bs->next_action.closure, incoming_byte_stream_next_locked, bs, - grpc_combiner_scheduler(bs->transport->combiner, false)), - GRPC_ERROR_NONE); - GPR_TIMER_END("incoming_byte_stream_next", 0); - return 0; -} - static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream); @@ -2634,7 +2588,6 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_error *error_ignored) { grpc_chttp2_incoming_byte_stream *bs = byte_stream; GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream); incoming_byte_stream_unref(exec_ctx, bs); } @@ -2655,34 +2608,44 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, static void incoming_byte_stream_publish_error( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { + grpc_chttp2_stream *s = bs->stream; + GPR_ASSERT(error != GRPC_ERROR_NONE); - grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error)); - bs->on_next = NULL; - GRPC_ERROR_UNREF(bs->error); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error)); + s->on_next = NULL; + GRPC_ERROR_UNREF(s->byte_stream_error); grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream, GRPC_ERROR_REF(error)); - bs->error = error; + s->byte_stream_error = error; } -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, +grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_slice slice, grpc_slice *slice_out) { + grpc_chttp2_stream *s = bs->stream; + if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) { - incoming_byte_stream_publish_error( - exec_ctx, bs, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream")); + grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"); + + grpc_closure_sched(exec_ctx, + &s->reset_byte_stream, GRPC_ERROR_REF(error)); + grpc_slice_unref_internal(exec_ctx, slice); + return error; } else { bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice); if (slice_out != NULL) { *slice_out = slice; } + return GRPC_ERROR_NONE; } } -void grpc_chttp2_incoming_byte_stream_finished( +grpc_error *grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error) { + grpc_chttp2_stream *s = bs->stream; + if (error == GRPC_ERROR_NONE) { gpr_mu_lock(&bs->slice_mu); if (bs->remaining_bytes != 0) { @@ -2691,9 +2654,11 @@ void grpc_chttp2_incoming_byte_stream_finished( gpr_mu_unlock(&bs->slice_mu); } if (error != GRPC_ERROR_NONE) { - incoming_byte_stream_publish_error(exec_ctx, bs, error); + grpc_closure_sched(exec_ctx, + &s->reset_byte_stream, GRPC_ERROR_REF(error)); } incoming_byte_stream_unref(exec_ctx, bs); + return error; } grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( @@ -2712,14 +2677,10 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->next_message = NULL; incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; - gpr_ref(&incoming_byte_stream->stream->active_streams); grpc_slice_buffer_init(&incoming_byte_stream->slices); - incoming_byte_stream->on_next = NULL; incoming_byte_stream->is_tail = 1; - incoming_byte_stream->error = GRPC_ERROR_NONE; + s->byte_stream_error = GRPC_ERROR_NONE; incoming_byte_stream->push_closed = false; - s->incoming_frames = incoming_byte_stream; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); return incoming_byte_stream; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index ecd53e2ce96..ecb941e366c 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -63,7 +63,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id) { + uint32_t stream_id, + grpc_chttp2_stream *s) { if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) { char *msg; gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags); @@ -75,9 +76,9 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, } if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { - parser->is_last_frame = 1; + s->received_last_frame = true; } else { - parser->is_last_frame = 0; + s->received_last_frame = false; } return GRPC_ERROR_NONE; @@ -144,172 +145,31 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, stats->data_bytes += write_bytes; } -static void grpc_chttp2_unprocessed_frames_buffer_push( - grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s, - grpc_slice slice) { - grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); - if (p->parsing_frame) { - grpc_chttp2_incoming_byte_stream *bs = p->parsing_frame; - // Necessary? - gpr_mu_lock(&bs->slice_mu); - if (bs->on_next != NULL) { - grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE); - bs->on_next = NULL; - } - gpr_mu_unlock(&bs->slice_mu); - } -} - -grpc_error *parse_inner_buffer(grpc_exec_ctx *exec_ctx, - grpc_chttp2_data_parser *p, - grpc_chttp2_transport *t, grpc_chttp2_stream *s, - grpc_slice slice) { - uint8_t *const beg = GRPC_SLICE_START_PTR(slice); - uint8_t *const end = GRPC_SLICE_END_PTR(slice); - uint8_t *cur = beg; - uint32_t message_flags; - char *msg; - - if (cur == end) { - return GRPC_ERROR_NONE; - } - - /* If there is already pending data, or if there is a pending - * incoming_byte_stream that is finished, append the data to unprocessed frame - * buffer. */ - gpr_mu_lock(&s->buffer_mu); - if (s->unprocessed_incoming_frames_buffer.count > 0) { - s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); - grpc_slice_ref_internal(slice); - grpc_chttp2_unprocessed_frames_buffer_push(exec_ctx, p, s, slice); - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - - switch (p->state) { - case GRPC_CHTTP2_DATA_ERROR: - p->state = GRPC_CHTTP2_DATA_ERROR; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_REF(p->error); - fh_0: - case GRPC_CHTTP2_DATA_FH_0: - if (s->incoming_frames != NULL) { - s->stats.incoming.framing_bytes += (size_t)(end - cur); - grpc_chttp2_unprocessed_frames_buffer_push( - exec_ctx, p, s, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - s->stats.incoming.framing_bytes++; - p->frame_type = *cur; - switch (p->frame_type) { - case 0: - p->is_frame_compressed = 0; /* GPR_FALSE */ - break; - case 1: - p->is_frame_compressed = 1; /* GPR_TRUE */ - break; - default: - gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type); - p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); - p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID, - (intptr_t)s->id); - gpr_free(msg); - msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES, - grpc_slice_from_copied_string(msg)); - gpr_free(msg); - p->error = - grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg); - p->state = GRPC_CHTTP2_DATA_ERROR; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_REF(p->error); - } - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_1; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_1: - s->stats.incoming.framing_bytes++; - p->frame_size = ((uint32_t)*cur) << 24; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_2; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_2: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur) << 16; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_3; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_3: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur) << 8; - if (++cur == end) { - p->state = GRPC_CHTTP2_DATA_FH_4; - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - /* fallthrough */ - case GRPC_CHTTP2_DATA_FH_4: - s->stats.incoming.framing_bytes++; - p->frame_size |= ((uint32_t)*cur); - p->state = GRPC_CHTTP2_DATA_FRAME; - ++cur; - message_flags = 0; - if (p->is_frame_compressed) { - message_flags |= GRPC_WRITE_INTERNAL_COMPRESS; - } - GPR_ASSERT(s->incoming_frames == NULL); - p->parsing_frame = grpc_chttp2_incoming_byte_stream_create( - exec_ctx, t, s, p->frame_size, message_flags); - /* fallthrough */ - case GRPC_CHTTP2_DATA_FRAME: - if (p->parsing_frame->remaining_bytes == 0) { - grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, - GRPC_ERROR_NONE); - p->parsing_frame = NULL; - p->state = GRPC_CHTTP2_DATA_FH_0; - if (cur != end) { - goto fh_0; - } - } - if (cur == end) { - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - uint32_t remaining = (uint32_t)(end - cur); - s->stats.incoming.data_bytes += remaining; - grpc_chttp2_unprocessed_frames_buffer_push( - exec_ctx, p, s, - grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg))); - gpr_mu_unlock(&s->buffer_mu); - return GRPC_ERROR_NONE; - } - - GPR_UNREACHABLE_CODE( - return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here")); -} - grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice slice, int is_last) { - grpc_chttp2_data_parser *p = parser; - grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); + /* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */ + s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice); + if (!s->pending_byte_stream) { + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->frame_storage, slice); + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + } else if (s->on_next) { + GPR_ASSERT(s->frame_storage.length == 0); + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice); + grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE); + s->on_next = NULL; + } else { + grpc_slice_ref_internal(slice); + grpc_slice_buffer_add(&s->frame_storage, slice); + } - if (is_last && p->is_last_frame) { + if (is_last && s->received_last_frame) { grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); } - return error; + return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 264ad146085..e7e459c79fa 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -63,7 +63,6 @@ typedef struct grpc_chttp2_incoming_frame_queue { typedef struct { grpc_chttp2_stream_state state; - uint8_t is_last_frame; uint8_t frame_type; uint32_t frame_size; grpc_error *error; @@ -87,7 +86,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx, /* start processing a new data frame */ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser, uint8_t flags, - uint32_t stream_id); + uint32_t stream_id, + grpc_chttp2_stream *s); /* handle a slice of a data frame - is_last indicates the last slice of a frame */ diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5604ea3e31c..917fc1b71eb 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -187,7 +187,6 @@ struct grpc_chttp2_incoming_byte_stream { grpc_byte_stream base; gpr_refcount refs; struct grpc_chttp2_incoming_byte_stream *next_message; /* unused; should be removed */ - grpc_error *error; /* protected by slice_mu */ bool push_closed; /* protected by slice_mu */ grpc_chttp2_transport *transport; /* immutable */ @@ -196,7 +195,6 @@ struct grpc_chttp2_incoming_byte_stream { gpr_mu slice_mu; grpc_slice_buffer slices; /* unused; should be removed */ - grpc_closure *on_next; /* protected by slice_mu */ uint32_t remaining_bytes; /* guaranteed one thread access */ struct { @@ -462,9 +460,6 @@ struct grpc_chttp2_stream { grpc_transport_stream_stats *collecting_stats; grpc_transport_stream_stats stats; - /** number of streams that are currently being read */ - gpr_refcount active_streams; - /** Is this stream closed for writing. */ bool write_closed; /** Is this stream reading half-closed. */ @@ -488,10 +483,13 @@ struct grpc_chttp2_stream { grpc_chttp2_incoming_metadata_buffer metadata_buffer[2]; - grpc_chttp2_incoming_byte_stream *incoming_frames; /* protected by buffer_mu */ - gpr_mu buffer_mu; /* protects unprocessed_incoming_frames_buffer and - data_parser */ - grpc_slice_buffer unprocessed_incoming_frames_buffer; /* protected by buffer_mu */ + grpc_slice_buffer frame_storage; /* protected by t combiner */ + grpc_slice_buffer unprocessed_incoming_frames_buffer; /* guaranteed one thread access */ + grpc_closure *on_next; /* protected by t combiner */ + bool pending_byte_stream; /* protected by t combiner */ + grpc_closure reset_byte_stream; + grpc_error *byte_stream_error; /* protected by t combiner */ + bool received_last_frame; /* proected by t combiner */ gpr_timespec deadline; @@ -504,7 +502,7 @@ struct grpc_chttp2_stream { * incoming_window = incoming_window_delta + transport.initial_window_size */ int64_t incoming_window_delta; /** parsing state for data frames */ - grpc_chttp2_data_parser data_parser; /* protected by buffer_mu */ + grpc_chttp2_data_parser data_parser; /* guaranteed one thread access */ /** number of bytes received - reset at end of parse thread execution */ int64_t received_bytes; @@ -782,11 +780,11 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t); grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags); -void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, - grpc_chttp2_incoming_byte_stream *bs, - grpc_slice slice, - grpc_slice *slice_out); -void grpc_chttp2_incoming_byte_stream_finished( +grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx, + grpc_chttp2_incoming_byte_stream *bs, + grpc_slice slice, + grpc_slice *slice_out); +grpc_error *grpc_chttp2_incoming_byte_stream_finished( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs, grpc_error *error); void grpc_chttp2_incoming_byte_stream_notify( diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index f1c6f96db58..2c662b67219 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -458,10 +458,8 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx, return init_skip_frame_parser(exec_ctx, t, 0); } if (err == GRPC_ERROR_NONE) { - gpr_mu_lock(&s->buffer_mu); err = grpc_chttp2_data_parser_begin_frame(&s->data_parser, - t->incoming_frame_flags, s->id); - gpr_mu_unlock(&s->buffer_mu); + t->incoming_frame_flags, s->id, s); } error_handler: if (err == GRPC_ERROR_NONE) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 963e16046a3..6ea199e84f8 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1143,6 +1143,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) { static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, batch_control *bctl) { + grpc_error *error; grpc_call *call = bctl->call; for (;;) { size_t remaining = call->receiving_stream->length - @@ -1156,10 +1157,20 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx, } if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining, &call->receiving_slice_ready)) { - grpc_byte_stream_pull(exec_ctx, call->receiving_stream, + error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream, &call->receiving_slice); - grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, - call->receiving_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer, + call->receiving_slice); + } else { + grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); + call->receiving_stream = NULL; + grpc_byte_buffer_destroy(*call->receiving_buffer); + *call->receiving_buffer = NULL; + call->receiving_message = 0; + finish_batch_step(exec_ctx, bctl); + return; + } } else { return; }