Merge branch 'new_op' of github.com:ctiller/grpc into new_op

pull/4136/head
Craig Tiller 9 years ago
commit 4ab2d82c1a
  1. 2
      src/core/surface/call.c
  2. 13
      src/core/transport/chttp2/frame_data.c
  3. 3
      src/core/transport/chttp2/frame_data.h
  4. 2
      src/core/transport/chttp2/internal.h
  5. 85
      src/core/transport/chttp2_transport.c

@ -944,12 +944,12 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(gpr_uint8)(bctl->call->used_batches &
~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
gpr_mu_unlock(&call->mu);
grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
} else {
grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,

@ -45,11 +45,16 @@
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser) {
parser->state = GRPC_CHTTP2_DATA_FH_0;
parser->parsing_frame = NULL;
return GRPC_CHTTP2_PARSE_OK;
}
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser) {
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser) {
grpc_byte_stream *bs;
if (parser->parsing_frame) {
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, parser->parsing_frame);
}
while (
(bs = grpc_chttp2_incoming_frame_queue_pop(&parser->incoming_frames))) {
grpc_byte_stream_destroy(bs);
@ -198,8 +203,8 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
}
p->parsing_frame = incoming_byte_stream =
grpc_chttp2_incoming_byte_stream_create(
transport_parsing, stream_parsing, p->frame_size, message_flags,
&p->incoming_frames);
exec_ctx, transport_parsing, stream_parsing, p->frame_size,
message_flags, &p->incoming_frames);
/* fallthrough */
case GRPC_CHTTP2_DATA_FRAME:
if (cur == end) {
@ -214,6 +219,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
exec_ctx, p->parsing_frame,
gpr_slice_sub(slice, (size_t)(cur - beg), (size_t)(end - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->parsing_frame = NULL;
p->state = GRPC_CHTTP2_DATA_FH_0;
return GRPC_CHTTP2_PARSE_OK;
} else if ((gpr_uint32)(end - cur) > p->frame_size) {
@ -222,6 +228,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse(
gpr_slice_sub(slice, (size_t)(cur - beg),
(size_t)(cur + p->frame_size - beg)));
grpc_chttp2_incoming_byte_stream_finished(exec_ctx, p->parsing_frame);
p->parsing_frame = NULL;
cur += p->frame_size;
goto fh_0; /* loop */
} else {

@ -80,7 +80,8 @@ grpc_byte_stream *grpc_chttp2_incoming_frame_queue_pop(
grpc_chttp2_parse_error grpc_chttp2_data_parser_init(
grpc_chttp2_data_parser *parser);
void grpc_chttp2_data_parser_destroy(grpc_chttp2_data_parser *parser);
void grpc_chttp2_data_parser_destroy(grpc_exec_ctx *exec_ctx,
grpc_chttp2_data_parser *parser);
/* start processing a new data frame */
grpc_chttp2_parse_error grpc_chttp2_data_parser_begin_frame(

@ -738,7 +738,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
#endif
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue);
void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,

@ -512,7 +512,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
grpc_chttp2_incoming_metadata_buffer_destroy(
@ -1364,6 +1364,46 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
* BYTE STREAM
*/
static void incoming_byte_stream_update_flow_control(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
size_t have_already) {
gpr_uint32 max_recv_bytes;
/* clamp max recv hint to an allowable size */
if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
} else {
max_recv_bytes = (gpr_uint32)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= have_already) {
max_recv_bytes -= (gpr_uint32)have_already;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
if (stream_global->max_recv_bytes < max_recv_bytes) {
gpr_uint32 add_max_recv_bytes =
max_recv_bytes - stream_global->max_recv_bytes;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_parse,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_writing,
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
}
static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream,
gpr_slice *slice, size_t max_size_hint,
@ -1372,41 +1412,11 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
(grpc_chttp2_incoming_byte_stream *)byte_stream;
grpc_chttp2_transport_global *transport_global = &bs->transport->global;
grpc_chttp2_stream_global *stream_global = &bs->stream->global;
gpr_uint32 max_recv_bytes;
lock(bs->transport);
if (bs->is_tail) {
/* clamp max recv hint to an allowable size */
if (max_size_hint >= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD) {
max_recv_bytes = GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD;
} else {
max_recv_bytes = (gpr_uint32)max_size_hint;
}
/* account for bytes already received but unknown to higher layers */
if (max_recv_bytes >= bs->slices.length) {
max_recv_bytes -= (gpr_uint32)bs->slices.length;
} else {
max_recv_bytes = 0;
}
/* add some small lookahead to keep pipelines flowing */
GPR_ASSERT(max_recv_bytes <= GPR_UINT32_MAX - GRPC_CHTTP2_STREAM_LOOKAHEAD);
max_recv_bytes += GRPC_CHTTP2_STREAM_LOOKAHEAD;
if (stream_global->max_recv_bytes < max_recv_bytes) {
gpr_uint32 add_max_recv_bytes =
max_recv_bytes - stream_global->max_recv_bytes;
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
max_recv_bytes, add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_parse,
add_max_recv_bytes);
GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
unannounced_incoming_window_for_writing,
add_max_recv_bytes);
grpc_chttp2_list_add_unannounced_incoming_window_available(
transport_global, stream_global);
grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
}
incoming_byte_stream_update_flow_control(transport_global, stream_global,
max_size_hint, bs->slices.length);
}
if (bs->slices.count > 0) {
*slice = gpr_slice_buffer_take_first(&bs->slices);
@ -1451,7 +1461,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
}
grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
grpc_chttp2_transport_parsing *transport_parsing,
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
@ -1474,6 +1484,13 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
add_to_queue->tail->next_message = incoming_byte_stream;
}
add_to_queue->tail = incoming_byte_stream;
if (frame_size == 0) {
lock(TRANSPORT_FROM_PARSING(transport_parsing));
incoming_byte_stream_update_flow_control(
&TRANSPORT_FROM_PARSING(transport_parsing)->global,
&STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
}
return incoming_byte_stream;
}

Loading…
Cancel
Save