Polished with better thread safety

pull/9626/head
Muxi Yan 8 years ago
parent ef033aa6a8
commit ac39957928
  1. 431
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 184
      src/core/ext/transport/chttp2/transport/frame_data.c
  3. 4
      src/core/ext/transport/chttp2/transport/frame_data.h
  4. 28
      src/core/ext/transport/chttp2/transport/internal.h
  5. 4
      src/core/ext/transport/chttp2/transport/parsing.c
  6. 17
      src/core/lib/surface/call.c

@ -115,6 +115,11 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
void *byte_stream,
grpc_error *error_ignored);
static void incoming_byte_stream_publish_error(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs);
static void benign_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *t,
grpc_error *error);
@ -156,13 +161,14 @@ static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg,
static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
static grpc_error *deframe_unprocessed_incoming_frames(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices,
grpc_slice *slice_out, bool partial_deframe);
static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s);
static grpc_error *deframe_unprocessed_incoming_frames(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_stream *s,
grpc_slice_buffer *slices,
grpc_slice *slice_out,
grpc_byte_stream **stream_out);
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
/*******************************************************************************
* CONSTRUCTION/DESTRUCTION/REFCOUNTING
@ -596,7 +602,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
/* We reserve one 'active stream' that's dropped when the stream is
read-closed. The others are for incoming_byte_streams that are actively
reading */
gpr_ref_init(&s->active_streams, 1);
GRPC_CHTTP2_STREAM_REF(s, "chttp2");
grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena);
@ -606,9 +611,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
grpc_closure_init(&s->complete_fetch_locked, complete_fetch_locked, s,
grpc_schedule_on_exec_ctx);
s->incoming_frames = NULL;
grpc_slice_buffer_init(&s->unprocessed_incoming_frames_buffer);
gpr_mu_init(&s->buffer_mu);
grpc_slice_buffer_init(&s->frame_storage);
s->pending_byte_stream = false;
grpc_closure_init(&s->reset_byte_stream, reset_byte_stream, s, grpc_combiner_scheduler(t->combiner, false));
GRPC_CHTTP2_REF_TRANSPORT(t, "stream");
@ -638,11 +644,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx,
&s->unprocessed_incoming_frames_buffer);
if (s->incoming_frames != NULL) {
grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
s->incoming_frames = NULL;
incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base, GRPC_ERROR_NONE);
}
grpc_slice_buffer_destroy_internal(exec_ctx, &s->frame_storage);
grpc_chttp2_list_remove_stalled_by_transport(t, s);
grpc_chttp2_list_remove_stalled_by_stream(t, s);
@ -661,9 +663,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
GPR_ASSERT(s->recv_initial_metadata_ready == NULL);
GPR_ASSERT(s->recv_message_ready == NULL);
GPR_ASSERT(s->recv_trailing_metadata_finished == NULL);
gpr_mu_lock(&s->buffer_mu);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->data_parser);
gpr_mu_unlock(&s->buffer_mu);
grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
&s->metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(exec_ctx,
@ -671,6 +671,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
grpc_slice_buffer_destroy_internal(exec_ctx, &s->flow_controlled_buffer);
GRPC_ERROR_UNREF(s->read_closed_error);
GRPC_ERROR_UNREF(s->write_closed_error);
GRPC_ERROR_UNREF(s->byte_stream_error);
if (s->incoming_window_delta > 0) {
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA(
@ -1345,22 +1346,9 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
GPR_ASSERT(s->recv_message_ready == NULL);
s->recv_message_ready = op->recv_message_ready;
s->recv_message = op->recv_message;
gpr_mu_lock(&s->buffer_mu);
if (s->id != 0 && (s->incoming_frames == NULL ||
s->unprocessed_incoming_frames_buffer.count == 0)) {
gpr_mu_unlock(&s->buffer_mu);
if (s->id != 0 && s->frame_storage.length == 0) {
incoming_byte_stream_update_flow_control(exec_ctx, t, s, 5, 0);
} else {
gpr_mu_unlock(&s->buffer_mu);
}
gpr_mu_lock(&s->buffer_mu);
if (s->incoming_frames == NULL &&
s->unprocessed_incoming_frames_buffer.count > 0) {
deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, t, s,
&s->unprocessed_incoming_frames_buffer, NULL, true);
}
gpr_mu_unlock(&s->buffer_mu);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@ -1527,18 +1515,9 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
if (s->recv_initial_metadata_ready != NULL &&
s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
if (s->seen_error) {
if (s->incoming_frames != NULL) {
grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
s->incoming_frames = NULL;
incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
GRPC_ERROR_NONE);
}
size_t length;
gpr_mu_lock(&s->buffer_mu);
length = s->unprocessed_incoming_frames_buffer.length;
gpr_mu_unlock(&s->buffer_mu);
if (length > 0) {
clean_unprocessed_frames_buffer(exec_ctx, t, s);
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@ -1551,32 +1530,38 @@ void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_exec_ctx *exec_ctx,
void grpc_chttp2_maybe_complete_recv_message(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
grpc_error *error = GRPC_ERROR_NONE;
if (s->recv_message_ready != NULL) {
*s->recv_message = NULL;
if (s->final_metadata_requested && s->seen_error) {
if (s->incoming_frames != NULL) {
grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
s->incoming_frames = NULL;
incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
GRPC_ERROR_NONE);
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
size_t length;
gpr_mu_lock(&s->buffer_mu);
length = s->unprocessed_incoming_frames_buffer.length;
gpr_mu_unlock(&s->buffer_mu);
if (length > 0) {
clean_unprocessed_frames_buffer(exec_ctx, t, s);
}
if (!s->pending_byte_stream) {
while (s->unprocessed_incoming_frames_buffer.length > 0 ||
s->frame_storage.length > 0) {
if (s->unprocessed_incoming_frames_buffer.length == 0) {
grpc_slice_buffer_swap(&s->unprocessed_incoming_frames_buffer, &s->frame_storage);
}
/* error handling ok? */
error = deframe_unprocessed_incoming_frames(exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, NULL, s->recv_message);
if (error != GRPC_ERROR_NONE) {
s->seen_error = true;
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
break;
} else if (*s->recv_message != NULL) {
break;
}
}
}
if (s->incoming_frames != NULL) {
*s->recv_message = &s->incoming_frames->base;
s->incoming_frames = NULL;
GPR_ASSERT(*s->recv_message != NULL);
grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
s->recv_message_ready = NULL;
if (error == GRPC_ERROR_NONE && *s->recv_message != NULL) {
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
} else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
*s->recv_message = NULL;
grpc_closure_sched(exec_ctx, s->recv_message_ready, GRPC_ERROR_NONE);
s->recv_message_ready = NULL;
null_then_run_closure(exec_ctx, &s->recv_message_ready, GRPC_ERROR_NONE);
}
}
}
@ -1588,21 +1573,13 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
if (s->recv_trailing_metadata_finished != NULL && s->read_closed &&
s->write_closed) {
if (s->seen_error) {
if (s->incoming_frames != NULL) {
grpc_chttp2_incoming_byte_stream *ibs = s->incoming_frames;
s->incoming_frames = NULL;
incoming_byte_stream_destroy_locked(exec_ctx, &ibs->base,
GRPC_ERROR_NONE);
}
size_t length;
gpr_mu_lock(&s->buffer_mu);
length = s->unprocessed_incoming_frames_buffer.length;
gpr_mu_unlock(&s->buffer_mu);
if (length > 0) {
clean_unprocessed_frames_buffer(exec_ctx, t, s);
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
if (!s->pending_byte_stream) {
grpc_slice_buffer_reset_and_unref(&s->unprocessed_incoming_frames_buffer);
}
}
if (s->all_incoming_byte_streams_finished &&
if (s->read_closed && s->frame_storage.length == 0 &&
(!s->pending_byte_stream || s->seen_error) &&
s->recv_trailing_metadata_finished != NULL) {
grpc_chttp2_incoming_metadata_buffer_publish(
exec_ctx, &s->metadata_buffer[1], s->recv_trailing_metadata);
@ -1613,34 +1590,6 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_exec_ctx *exec_ctx,
}
}
static void decrement_active_streams_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
size_t length;
gpr_mu_lock(&s->buffer_mu);
length = s->unprocessed_incoming_frames_buffer.length;
gpr_mu_unlock(&s->buffer_mu);
if ((s->all_incoming_byte_streams_finished =
(gpr_unref(&s->active_streams) && length == 0))) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
}
static void clean_unprocessed_frames_buffer(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s) {
gpr_mu_lock(&s->buffer_mu);
grpc_slice_buffer_reset_and_unref_internal(
exec_ctx, &s->unprocessed_incoming_frames_buffer);
gpr_mu_unlock(&s->buffer_mu);
// TODO (mxyan): add get ref count in sync.c?
gpr_atm active_streams =
gpr_atm_no_barrier_fetch_add(&s->active_streams.count, 0);
if ((s->all_incoming_byte_streams_finished = (active_streams == 0))) {
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s);
}
}
static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
uint32_t id, grpc_error *error) {
grpc_chttp2_stream *s = grpc_chttp2_stream_map_delete(&t->stream_map, id);
@ -1649,24 +1598,18 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, t);
}
gpr_mu_lock(&s->buffer_mu);
if (s->data_parser.parsing_frame != NULL) {
grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
gpr_mu_lock(&bs->slice_mu);
bs->push_closed = true;
if (bs->on_next != NULL) {
gpr_mu_unlock(&bs->slice_mu);
gpr_mu_unlock(&s->buffer_mu);
grpc_chttp2_incoming_byte_stream_finished(
exec_ctx, s->data_parser.parsing_frame, GRPC_ERROR_REF(error));
if (s->pending_byte_stream) {
if (s->on_next != NULL) {
grpc_chttp2_incoming_byte_stream *bs = s->data_parser.parsing_frame;
if (error == GRPC_ERROR_NONE) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
}
incoming_byte_stream_publish_error(exec_ctx, bs, error);
incoming_byte_stream_unref(exec_ctx, bs);
s->data_parser.parsing_frame = NULL;
} else {
bs->error = GRPC_ERROR_REF(error);
gpr_mu_unlock(&bs->slice_mu);
gpr_mu_unlock(&s->buffer_mu);
s->byte_stream_error = GRPC_ERROR_REF(error);
}
} else {
gpr_mu_unlock(&s->buffer_mu);
}
if (grpc_chttp2_stream_map_size(&t->stream_map) == 0) {
@ -1852,7 +1795,6 @@ void grpc_chttp2_mark_stream_closed(grpc_exec_ctx *exec_ctx,
s->published_metadata[i] = GPRC_METADATA_PUBLISHED_AT_CLOSE;
}
}
decrement_active_streams_locked(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
}
@ -2304,11 +2246,32 @@ static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
static void reset_byte_stream(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_stream *s = (grpc_chttp2_stream *)arg;
s->pending_byte_stream = false;
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_maybe_complete_recv_message(exec_ctx, s->t, s);
grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, s->t, s);
} else {
GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_cancel_stream(exec_ctx, s->t, s,
GRPC_ERROR_REF(error));
s->byte_stream_error = error;
}
}
static grpc_error *deframe_unprocessed_incoming_frames(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p,
grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_slice_buffer *slices,
grpc_slice *slice_out, bool partial_deframe) {
bool slice_set = false;
grpc_chttp2_stream *s, grpc_slice_buffer *slices,
grpc_slice *slice_out, grpc_byte_stream **stream_out) {
grpc_error *error = GRPC_ERROR_NONE;
grpc_chttp2_transport *t = s->t;
while (slices->count > 0) {
uint8_t *beg = NULL;
uint8_t *end = NULL;
@ -2332,15 +2295,7 @@ static grpc_error *deframe_unprocessed_incoming_frames(
p->state = GRPC_CHTTP2_DATA_ERROR;
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_REF(p->error);
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
if (s->incoming_frames != NULL) {
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
@ -2396,6 +2351,8 @@ static grpc_error *deframe_unprocessed_incoming_frames(
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
GPR_ASSERT(stream_out != NULL);
GPR_ASSERT(p->parsing_frame == NULL);
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
@ -2403,71 +2360,69 @@ static grpc_error *deframe_unprocessed_incoming_frames(
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
GPR_ASSERT(s->incoming_frames == NULL);
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
/* fallthrough */
*stream_out = &p->parsing_frame->base;
if (p->parsing_frame->remaining_bytes == 0) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame, GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
} else {
s->pending_byte_stream = true;
}
if (cur != end) {
grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
}
grpc_slice_unref(slice);
return GRPC_ERROR_NONE;
case GRPC_CHTTP2_DATA_FRAME: {
GPR_ASSERT(p->parsing_frame != NULL);
if (partial_deframe && p->frame_size > 0) {
if (cur != end) {
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(end - beg)));
}
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
GPR_ASSERT(slice_out != NULL);
if (cur == end) {
grpc_slice_unref_internal(exec_ctx, slice);
continue;
}
if (slice_set) {
grpc_slice_buffer_undo_take_first(
&s->unprocessed_incoming_frames_buffer,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_slice_unref_internal(exec_ctx, slice);
return GRPC_ERROR_NONE;
}
uint32_t remaining = (uint32_t)(end - cur);
if (remaining == p->frame_size) {
grpc_chttp2_incoming_byte_stream_push(
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)),
slice_out);
slice_set = true;
slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE);
grpc_slice_unref_internal(exec_ctx, slice);
continue;
return GRPC_ERROR_NONE;
} else if (remaining < p->frame_size) {
grpc_chttp2_incoming_byte_stream_push(
if (GRPC_ERROR_NONE != (error = grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)),
slice_out);
slice_set = true;
slice_out))) {
return error;
}
p->frame_size -= remaining;
grpc_slice_unref_internal(exec_ctx, slice);
continue;
return GRPC_ERROR_NONE;
} else {
GPR_ASSERT(remaining > p->frame_size);
if (p->frame_size > 0) {
grpc_chttp2_incoming_byte_stream_push(
exec_ctx, p->parsing_frame,
grpc_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)),
slice_out);
if (GRPC_ERROR_NONE != (grpc_chttp2_incoming_byte_stream_push(exec_ctx, p->parsing_frame, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(cur + p->frame_size - beg)), slice_out))) {
grpc_slice_unref_internal(exec_ctx, slice);
return error;
}
slice_set = true;
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
cur += p->frame_size;
goto fh_0;
grpc_slice_buffer_undo_take_first(&s->unprocessed_incoming_frames_buffer, grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_closure_sched(exec_ctx, &s->reset_byte_stream, GRPC_ERROR_NONE);
grpc_slice_unref(slice);
return GRPC_ERROR_NONE;
}
}
@ -2480,7 +2435,6 @@ static grpc_error *deframe_unprocessed_incoming_frames(
static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs) {
if (gpr_unref(&bs->refs)) {
GRPC_ERROR_UNREF(bs->error);
grpc_slice_buffer_destroy_internal(exec_ctx, &bs->slices);
gpr_mu_destroy(&bs->slice_mu);
gpr_free(bs);
@ -2542,90 +2496,90 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t = bs->transport;
grpc_chttp2_stream *s = bs->stream;
if (bs->is_tail) {
gpr_mu_lock(&bs->slice_mu);
size_t cur_length = bs->slices.length;
gpr_mu_unlock(&bs->slice_mu);
incoming_byte_stream_update_flow_control(
exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
}
gpr_mu_lock(&s->buffer_mu);
gpr_mu_lock(&bs->slice_mu);
if (s->unprocessed_incoming_frames_buffer.length > 0) {
size_t cur_length = s->frame_storage.length;
incoming_byte_stream_update_flow_control(
exec_ctx, t, s, bs->next_action.max_size_hint, cur_length);
GPR_ASSERT(s->unprocessed_incoming_frames_buffer.length == 0);
if (s->frame_storage.length > 0) {
grpc_slice_buffer_swap(&s->frame_storage, &s->unprocessed_incoming_frames_buffer);
grpc_closure_sched(exec_ctx, bs->next_action.on_complete, GRPC_ERROR_NONE);
} else if (bs->error != GRPC_ERROR_NONE) {
} else if (s->byte_stream_error != GRPC_ERROR_NONE) {
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(bs->error));
} else if (bs->push_closed) {
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = NULL;
}
} else if (s->read_closed) {
if (bs->remaining_bytes != 0) {
bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
s->byte_stream_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_REF(bs->error));
GRPC_ERROR_REF(s->byte_stream_error));
if (s->data_parser.parsing_frame != NULL) {
incoming_byte_stream_unref(exec_ctx, s->data_parser.parsing_frame);
s->data_parser.parsing_frame = NULL;
}
} else {
/* Should never reach here. */
GPR_ASSERT(false);
grpc_closure_sched(exec_ctx, bs->next_action.on_complete,
GRPC_ERROR_NONE);
}
} else {
bs->on_next = bs->next_action.on_complete;
s->on_next = bs->next_action.on_complete;
}
gpr_mu_unlock(&bs->slice_mu);
gpr_mu_unlock(&s->buffer_mu);
incoming_byte_stream_unref(exec_ctx, bs);
}
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
size_t max_size_hint,
grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
if (s->unprocessed_incoming_frames_buffer.length > 0) {
return 1;
} else {
gpr_ref(&bs->refs);
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(
&bs->next_action.closure, incoming_byte_stream_next_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
}
static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
grpc_slice *slice) {
GPR_TIMER_BEGIN("incoming_byte_stream_pull", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_stream *s = bs->stream;
grpc_chttp2_transport *t = bs->transport;
if (bs->error) {
return bs->error;
}
gpr_mu_lock(&s->buffer_mu);
if (s->unprocessed_incoming_frames_buffer.length > 0) {
grpc_error *error = deframe_unprocessed_incoming_frames(
exec_ctx, &s->data_parser, t, s, &s->unprocessed_incoming_frames_buffer,
slice, false);
exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer,
slice, NULL);
if (error != GRPC_ERROR_NONE) {
gpr_mu_unlock(&s->buffer_mu);
return error;
}
} else {
bs->error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
gpr_mu_unlock(&s->buffer_mu);
return bs->error;
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Truncated message");
grpc_closure_sched(exec_ctx,
&s->reset_byte_stream, GRPC_ERROR_REF(error));
return error;
}
gpr_mu_unlock(&s->buffer_mu);
GPR_TIMER_END("incoming_byte_stream_pull", 0);
return GRPC_ERROR_NONE;
}
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
size_t max_size_hint,
grpc_closure *on_complete) {
GPR_TIMER_BEGIN("incoming_byte_stream_next", 0);
grpc_chttp2_incoming_byte_stream *bs =
(grpc_chttp2_incoming_byte_stream *)byte_stream;
gpr_ref(&bs->refs);
bs->next_action.max_size_hint = max_size_hint;
bs->next_action.on_complete = on_complete;
grpc_closure_sched(
exec_ctx,
grpc_closure_init(
&bs->next_action.closure, incoming_byte_stream_next_locked, bs,
grpc_combiner_scheduler(bs->transport->combiner, false)),
GRPC_ERROR_NONE);
GPR_TIMER_END("incoming_byte_stream_next", 0);
return 0;
}
static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream);
@ -2634,7 +2588,6 @@ static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx,
grpc_error *error_ignored) {
grpc_chttp2_incoming_byte_stream *bs = byte_stream;
GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy);
decrement_active_streams_locked(exec_ctx, bs->transport, bs->stream);
incoming_byte_stream_unref(exec_ctx, bs);
}
@ -2655,34 +2608,44 @@ static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
static void incoming_byte_stream_publish_error(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
grpc_chttp2_stream *s = bs->stream;
GPR_ASSERT(error != GRPC_ERROR_NONE);
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error));
bs->on_next = NULL;
GRPC_ERROR_UNREF(bs->error);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_REF(error));
s->on_next = NULL;
GRPC_ERROR_UNREF(s->byte_stream_error);
grpc_chttp2_cancel_stream(exec_ctx, bs->transport, bs->stream,
GRPC_ERROR_REF(error));
bs->error = error;
s->byte_stream_error = error;
}
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice,
grpc_slice *slice_out) {
grpc_chttp2_stream *s = bs->stream;
if (bs->remaining_bytes < GRPC_SLICE_LENGTH(slice)) {
incoming_byte_stream_publish_error(
exec_ctx, bs,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream"));
grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Too many bytes in stream");
grpc_closure_sched(exec_ctx,
&s->reset_byte_stream, GRPC_ERROR_REF(error));
grpc_slice_unref_internal(exec_ctx, slice);
return error;
} else {
bs->remaining_bytes -= (uint32_t)GRPC_SLICE_LENGTH(slice);
if (slice_out != NULL) {
*slice_out = slice;
}
return GRPC_ERROR_NONE;
}
}
void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error) {
grpc_chttp2_stream *s = bs->stream;
if (error == GRPC_ERROR_NONE) {
gpr_mu_lock(&bs->slice_mu);
if (bs->remaining_bytes != 0) {
@ -2691,9 +2654,11 @@ void grpc_chttp2_incoming_byte_stream_finished(
gpr_mu_unlock(&bs->slice_mu);
}
if (error != GRPC_ERROR_NONE) {
incoming_byte_stream_publish_error(exec_ctx, bs, error);
grpc_closure_sched(exec_ctx,
&s->reset_byte_stream, GRPC_ERROR_REF(error));
}
incoming_byte_stream_unref(exec_ctx, bs);
return error;
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
@ -2712,14 +2677,10 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
incoming_byte_stream->next_message = NULL;
incoming_byte_stream->transport = t;
incoming_byte_stream->stream = s;
gpr_ref(&incoming_byte_stream->stream->active_streams);
grpc_slice_buffer_init(&incoming_byte_stream->slices);
incoming_byte_stream->on_next = NULL;
incoming_byte_stream->is_tail = 1;
incoming_byte_stream->error = GRPC_ERROR_NONE;
s->byte_stream_error = GRPC_ERROR_NONE;
incoming_byte_stream->push_closed = false;
s->incoming_frames = incoming_byte_stream;
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
return incoming_byte_stream;
}

@ -63,7 +63,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
uint32_t stream_id) {
uint32_t stream_id,
grpc_chttp2_stream *s) {
if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
char *msg;
gpr_asprintf(&msg, "unsupported data flags: 0x%02x", flags);
@ -75,9 +76,9 @@ grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
}
if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
parser->is_last_frame = 1;
s->received_last_frame = true;
} else {
parser->is_last_frame = 0;
s->received_last_frame = false;
}
return GRPC_ERROR_NONE;
@ -144,172 +145,31 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf,
stats->data_bytes += write_bytes;
}
static void grpc_chttp2_unprocessed_frames_buffer_push(
grpc_exec_ctx *exec_ctx, grpc_chttp2_data_parser *p, grpc_chttp2_stream *s,
grpc_slice slice) {
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
if (p->parsing_frame) {
grpc_chttp2_incoming_byte_stream *bs = p->parsing_frame;
// Necessary?
gpr_mu_lock(&bs->slice_mu);
if (bs->on_next != NULL) {
grpc_closure_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE);
bs->on_next = NULL;
}
gpr_mu_unlock(&bs->slice_mu);
}
}
grpc_error *parse_inner_buffer(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *p,
grpc_chttp2_transport *t, grpc_chttp2_stream *s,
grpc_slice slice) {
uint8_t *const beg = GRPC_SLICE_START_PTR(slice);
uint8_t *const end = GRPC_SLICE_END_PTR(slice);
uint8_t *cur = beg;
uint32_t message_flags;
char *msg;
if (cur == end) {
return GRPC_ERROR_NONE;
}
/* If there is already pending data, or if there is a pending
* incoming_byte_stream that is finished, append the data to unprocessed frame
* buffer. */
gpr_mu_lock(&s->buffer_mu);
if (s->unprocessed_incoming_frames_buffer.count > 0) {
s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
grpc_slice_ref_internal(slice);
grpc_chttp2_unprocessed_frames_buffer_push(exec_ctx, p, s, slice);
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
switch (p->state) {
case GRPC_CHTTP2_DATA_ERROR:
p->state = GRPC_CHTTP2_DATA_ERROR;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_REF(p->error);
fh_0:
case GRPC_CHTTP2_DATA_FH_0:
if (s->incoming_frames != NULL) {
s->stats.incoming.framing_bytes += (size_t)(end - cur);
grpc_chttp2_unprocessed_frames_buffer_push(
exec_ctx, p, s,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
s->stats.incoming.framing_bytes++;
p->frame_type = *cur;
switch (p->frame_type) {
case 0:
p->is_frame_compressed = 0; /* GPR_FALSE */
break;
case 1:
p->is_frame_compressed = 1; /* GPR_TRUE */
break;
default:
gpr_asprintf(&msg, "Bad GRPC frame type 0x%02x", p->frame_type);
p->error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
p->error = grpc_error_set_int(p->error, GRPC_ERROR_INT_STREAM_ID,
(intptr_t)s->id);
gpr_free(msg);
msg = grpc_dump_slice(slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
p->error = grpc_error_set_str(p->error, GRPC_ERROR_STR_RAW_BYTES,
grpc_slice_from_copied_string(msg));
gpr_free(msg);
p->error =
grpc_error_set_int(p->error, GRPC_ERROR_INT_OFFSET, cur - beg);
p->state = GRPC_CHTTP2_DATA_ERROR;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_REF(p->error);
}
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_1;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_1:
s->stats.incoming.framing_bytes++;
p->frame_size = ((uint32_t)*cur) << 24;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_2;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_2:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 16;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_3;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_3:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur) << 8;
if (++cur == end) {
p->state = GRPC_CHTTP2_DATA_FH_4;
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
/* fallthrough */
case GRPC_CHTTP2_DATA_FH_4:
s->stats.incoming.framing_bytes++;
p->frame_size |= ((uint32_t)*cur);
p->state = GRPC_CHTTP2_DATA_FRAME;
++cur;
message_flags = 0;
if (p->is_frame_compressed) {
message_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
}
GPR_ASSERT(s->incoming_frames == NULL);
p->parsing_frame = grpc_chttp2_incoming_byte_stream_create(
exec_ctx, t, s, p->frame_size, message_flags);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (p->parsing_frame->remaining_bytes == 0) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame,
GRPC_ERROR_NONE);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
if (cur != end) {
goto fh_0;
}
}
if (cur == end) {
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
uint32_t remaining = (uint32_t)(end - cur);
s->stats.incoming.data_bytes += remaining;
grpc_chttp2_unprocessed_frames_buffer_push(
exec_ctx, p, s,
grpc_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
gpr_mu_unlock(&s->buffer_mu);
return GRPC_ERROR_NONE;
}
GPR_UNREACHABLE_CODE(
return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Should never reach here"));
}
grpc_error *grpc_chttp2_data_parser_parse(grpc_exec_ctx *exec_ctx, void *parser,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s,
grpc_slice slice, int is_last) {
grpc_chttp2_data_parser *p = parser;
grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice);
/* grpc_error *error = parse_inner_buffer(exec_ctx, p, t, s, slice); */
s->stats.incoming.framing_bytes += GRPC_SLICE_LENGTH(slice);
if (!s->pending_byte_stream) {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s);
} else if (s->on_next) {
GPR_ASSERT(s->frame_storage.length == 0);
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->unprocessed_incoming_frames_buffer, slice);
grpc_closure_sched(exec_ctx, s->on_next, GRPC_ERROR_NONE);
s->on_next = NULL;
} else {
grpc_slice_ref_internal(slice);
grpc_slice_buffer_add(&s->frame_storage, slice);
}
if (is_last && p->is_last_frame) {
if (is_last && s->received_last_frame) {
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false,
GRPC_ERROR_NONE);
}
return error;
return GRPC_ERROR_NONE;
}

@ -63,7 +63,6 @@ typedef struct grpc_chttp2_incoming_frame_queue {
typedef struct {
grpc_chttp2_stream_state state;
uint8_t is_last_frame;
uint8_t frame_type;
uint32_t frame_size;
grpc_error *error;
@ -87,7 +86,8 @@ void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
/* start processing a new data frame */
grpc_error *grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser *parser,
uint8_t flags,
uint32_t stream_id);
uint32_t stream_id,
grpc_chttp2_stream *s);
/* handle a slice of a data frame - is_last indicates the last slice of a
frame */

@ -187,7 +187,6 @@ struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
gpr_refcount refs;
struct grpc_chttp2_incoming_byte_stream *next_message; /* unused; should be removed */
grpc_error *error; /* protected by slice_mu */
bool push_closed; /* protected by slice_mu */
grpc_chttp2_transport *transport; /* immutable */
@ -196,7 +195,6 @@ struct grpc_chttp2_incoming_byte_stream {
gpr_mu slice_mu;
grpc_slice_buffer slices; /* unused; should be removed */
grpc_closure *on_next; /* protected by slice_mu */
uint32_t remaining_bytes; /* guaranteed one thread access */
struct {
@ -462,9 +460,6 @@ struct grpc_chttp2_stream {
grpc_transport_stream_stats *collecting_stats;
grpc_transport_stream_stats stats;
/** number of streams that are currently being read */
gpr_refcount active_streams;
/** Is this stream closed for writing. */
bool write_closed;
/** Is this stream reading half-closed. */
@ -488,10 +483,13 @@ struct grpc_chttp2_stream {
grpc_chttp2_incoming_metadata_buffer metadata_buffer[2];
grpc_chttp2_incoming_byte_stream *incoming_frames; /* protected by buffer_mu */
gpr_mu buffer_mu; /* protects unprocessed_incoming_frames_buffer and
data_parser */
grpc_slice_buffer unprocessed_incoming_frames_buffer; /* protected by buffer_mu */
grpc_slice_buffer frame_storage; /* protected by t combiner */
grpc_slice_buffer unprocessed_incoming_frames_buffer; /* guaranteed one thread access */
grpc_closure *on_next; /* protected by t combiner */
bool pending_byte_stream; /* protected by t combiner */
grpc_closure reset_byte_stream;
grpc_error *byte_stream_error; /* protected by t combiner */
bool received_last_frame; /* proected by t combiner */
gpr_timespec deadline;
@ -504,7 +502,7 @@ struct grpc_chttp2_stream {
* incoming_window = incoming_window_delta + transport.initial_window_size */
int64_t incoming_window_delta;
/** parsing state for data frames */
grpc_chttp2_data_parser data_parser; /* protected by buffer_mu */
grpc_chttp2_data_parser data_parser; /* guaranteed one thread access */
/** number of bytes received - reset at end of parse thread execution */
int64_t received_bytes;
@ -782,11 +780,11 @@ void grpc_chttp2_ref_transport(grpc_chttp2_transport *t);
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s,
uint32_t frame_size, uint32_t flags);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice,
grpc_slice *slice_out);
void grpc_chttp2_incoming_byte_stream_finished(
grpc_error *grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
grpc_chttp2_incoming_byte_stream *bs,
grpc_slice slice,
grpc_slice *slice_out);
grpc_error *grpc_chttp2_incoming_byte_stream_finished(
grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs,
grpc_error *error);
void grpc_chttp2_incoming_byte_stream_notify(

@ -458,10 +458,8 @@ static grpc_error *init_data_frame_parser(grpc_exec_ctx *exec_ctx,
return init_skip_frame_parser(exec_ctx, t, 0);
}
if (err == GRPC_ERROR_NONE) {
gpr_mu_lock(&s->buffer_mu);
err = grpc_chttp2_data_parser_begin_frame(&s->data_parser,
t->incoming_frame_flags, s->id);
gpr_mu_unlock(&s->buffer_mu);
t->incoming_frame_flags, s->id, s);
}
error_handler:
if (err == GRPC_ERROR_NONE) {

@ -1143,6 +1143,7 @@ static void finish_batch_step(grpc_exec_ctx *exec_ctx, batch_control *bctl) {
static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_error *error;
grpc_call *call = bctl->call;
for (;;) {
size_t remaining = call->receiving_stream->length -
@ -1156,10 +1157,20 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
if (grpc_byte_stream_next(exec_ctx, call->receiving_stream, remaining,
&call->receiving_slice_ready)) {
grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
error = grpc_byte_stream_pull(exec_ctx, call->receiving_stream,
&call->receiving_slice);
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
call->receiving_slice);
} else {
grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
call->receiving_stream = NULL;
grpc_byte_buffer_destroy(*call->receiving_buffer);
*call->receiving_buffer = NULL;
call->receiving_message = 0;
finish_batch_step(exec_ctx, bctl);
return;
}
} else {
return;
}

Loading…
Cancel
Save