reviewable/pr8008/r2
Craig Tiller 9 years ago
parent 4e5b452147
commit 0b8e0d5071
  1. 29
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 36
      src/core/ext/transport/chttp2/transport/internal.h
  3. 503
      src/core/ext/transport/chttp2/transport/writing.c

@ -865,13 +865,34 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
}
if (op->send_message != NULL) {
GPR_ASSERT(s->send_message_finished == NULL);
GPR_ASSERT(s->send_message == NULL);
s->send_message_finished = add_closure_barrier(on_complete);
if (s->write_closed) {
grpc_closure *temp_barrier = add_closure_barrier(op->send_message);
grpc_chttp2_complete_closure_step(
exec_ctx, t, s, &s->send_message_finished,
exec_ctx, t, s, &temp_barrier,
GRPC_ERROR_CREATE("Attempt to send message after stream was closed"));
} else {
uint8_t *frame_hdr =
gpr_slice_buffer_tiny_add(&s->flow_controlled_buffer, 5);
uint32_t flags = op->send_message->flags;
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op->send_message->length;
frame_hdr[1] = (uint8_t)(len >> 24);
frame_hdr[2] = (uint8_t)(len >> 16);
frame_hdr[3] = (uint8_t)(len >> 8);
frame_hdr[4] = (uint8_t)(len);
grpc_chttp2_write_cb *write_cb = t->write_cb_pool;
if (write_cb != NULL) {
t->write_cb_pool = write_cb->next;
} else {
write_cb = gpr_malloc(sizeof(*write_cb));
}
write_cb->next = &s->on_write_finished_cbs;
write_cb->call_at_byte =
add_send_completion(t, s, (ssize_t)() - backup, true);
}
s->send_message_finished = add_closure_barrier(on_complete);
if (s->write_closed) {
} else {
s->send_message = op->send_message;
if (s->id != 0) {

@ -150,6 +150,17 @@ typedef struct grpc_chttp2_outstanding_ping {
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
typedef struct grpc_chttp2_write_cb {
size_t call_at_byte;
grpc_closure *closure;
struct grpc_chttp2_write_cb *next;
} grpc_chttp2_write_cb;
typedef struct grpc_chttp2_write_cb_list {
grpc_chttp2_write_cb *head;
grpc_chttp2_write_cb *tail;
} grpc_chttp2_write_cb_list;
/* forward declared in frame_data.h */
struct grpc_chttp2_incoming_byte_stream {
grpc_byte_stream base;
@ -318,23 +329,9 @@ struct grpc_chttp2_transport {
uint32_t goaway_last_stream_index;
gpr_slice goaway_text;
/* closures to finish after writing */
grpc_closure **finish_after_writing;
size_t finish_after_writing_count;
size_t finish_after_writing_capacity;
grpc_chttp2_write_cb *write_cb_pool;
};
typedef enum {
GRPC_CHTTP2_CALL_WHEN_SCHEDULED,
GRPC_CHTTP2_CALL_WHEN_WRITTEN,
} grpc_chttp2_call_write_cb_when;
typedef struct grpc_chttp2_write_cb {
size_t call_at_byte;
grpc_closure *closure;
grpc_chttp2_call_write_cb_when when;
} grpc_chttp2_write_cb;
struct grpc_chttp2_stream {
grpc_chttp2_transport *t;
grpc_stream_refcount *refcount;
@ -422,8 +419,7 @@ struct grpc_chttp2_stream {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint8_t fetching;
bool sent_initial_metadata;
uint8_t sent_message;
uint8_t sent_trailing_metadata;
bool sent_trailing_metadata;
/** how much window should we announce? */
uint32_t announce_window;
gpr_slice_buffer flow_controlled_buffer;
@ -431,9 +427,9 @@ struct grpc_chttp2_stream {
size_t stream_fetched;
grpc_closure finished_fetch;
grpc_chttp2_write_cb *write_cbs;
size_t write_cb_count;
size_t write_cb_capacity;
grpc_chttp2_write_cb_list on_write_scheduled_cbs;
grpc_chttp2_write_cb_list on_write_finished_cbs;
grpc_chttp2_write_cb_list finish_after_write;
};
/** Transport writing call flow:

@ -40,18 +40,44 @@
#include "src/core/ext/transport/chttp2/transport/http2_errors.h"
#include "src/core/lib/profiling/timers.h"
static void queue_write_callback(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_closure **c,
grpc_error *error,
grpc_chttp2_call_write_cb_when when) {
switch (when) {
case GRPC_CHTTP2_CALL_WHEN_SCHEDULED:
grpc_chttp2_complete_closure_step(exec_ctx, t, s, c, error);
break;
case GRPC_CHTTP2_CALL_WHEN_WRITTEN:
break;
static void add_to_write_list(grpc_chttp2_write_cb_list *list,
grpc_chttp2_write_cb *cb) {
if (list->head == NULL) {
list->head = list->tail = cb;
} else {
list->tail->next = cb;
list->tail = cb;
}
cb->next = NULL;
}
static void finish_write_cb(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, grpc_chttp2_write_cb *cb,
grpc_error *error) {
grpc_chttp2_complete_closure_step(exec_ctx, t, s, &cb->closure, error);
cb->next = t->write_cb_pool;
t->write_cb_pool = cb;
}
static void update_list(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
grpc_chttp2_stream *s, uint32_t send_bytes,
grpc_chttp2_write_cb_list *list,
grpc_chttp2_write_cb_list *done_target_or_null,
grpc_error *error) {
grpc_chttp2_write_cb *cb = list->head;
list->head = list->tail = NULL;
while (cb) {
grpc_chttp2_write_cb *next = cb->next;
if (cb->call_at_byte <= send_bytes) {
if (done_target_or_null != NULL) {
add_to_write_list(done_target_or_null, cb);
} else {
finish_write_cb(exec_ctx, t, s, cb, GRPC_ERROR_REF(error));
}
} else {
cb->call_at_byte -= send_bytes;
add_to_write_list(list, cb);
}
}
}
@ -91,7 +117,6 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
(according to available window sizes) and add to the output buffer */
while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
bool sent_initial_metadata = s->sent_initial_metadata;
bool become_writable = false;
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", t, s, outgoing_window, s,
outgoing_window);
@ -101,10 +126,10 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
s->send_initial_metadata, 0, &s->stats.outgoing,
&t->outbuf);
s->send_initial_metadata = NULL;
become_writable = true;
s->sent_initial_metadata = true;
sent_initial_metadata = true;
grpc_chttp2_list_add_writing_stream(t, s);
}
/* send any window updates */
if (s->announce_window > 0 && s->send_initial_metadata == NULL) {
@ -122,24 +147,47 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(s->outgoing_window, t->outgoing_window));
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
bool is_last_data_frame =
s->fetching_send_message == NULL &&
send_bytes == s->flow_controlled_buffer.length;
bool is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing, &t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = 1;
if (max_outgoing > 0) {
uint32_t send_bytes =
(uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
bool is_last_data_frame =
s->fetching_send_message == NULL &&
send_bytes == s->flow_controlled_buffer.length;
bool is_last_frame =
is_last_data_frame && s->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(s->send_trailing_metadata);
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
is_last_frame, &s->stats.outgoing,
&t->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
send_bytes);
if (is_last_frame) {
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = 1;
}
update_list(exec_ctx, t, s, send_bytes, &s->on_write_finished_cbs,
&s->finish_after_write, GRPC_ERROR_NONE);
update_list(exec_ctx, t, s, send_bytes, &s->on_write_scheduled_cbs,
NULL, GRPC_ERROR_NONE);
grpc_chttp2_list_add_writing_stream(t, s);
} else if (transport->outgoing_window == 0) {
grpc_chttp2_list_add_writing_stalled_by_transport(t, s);
grpc_chttp2_list_add_writing_stream(t, s);
}
}
if (s->send_trailing_metadata && s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0) {
grpc_chttp2_encode_header(&t->hpack_compressor, s->id,
s->send_trailing_metadata, 0,
&s->stats.outgoing, &t->outbuf);
s->send_trailing_metadata = NULL;
s->sent_trailing_metadata = true;
become_writable = true;
sent_initial_metadata = true;
grpc_chttp2_list_add_writing_stream(t, s);
}
#if 0
if (s->send_message != NULL) {
gpr_slice hdr = gpr_slice_malloc(5);
@ -169,231 +217,226 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
}
}
#endif
if (stream_global->send_trailing_metadata) {
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
become_writable = true;
}
}
if (!stream_global->read_closed &&
stream_global->unannounced_incoming_window_for_writing > 1024) {
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
if (stream_global->send_trailing_metadata) {
stream_writing->send_trailing_metadata =
stream_global->send_trailing_metadata;
stream_global->send_trailing_metadata = NULL;
become_writable = true;
}
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->announce_incoming_window > 0) {
uint32_t announced = (uint32_t)GPR_MIN(
transport_global->announce_incoming_window, UINT32_MAX);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
announce_incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
if (!stream_global->read_closed &&
stream_global->unannounced_incoming_window_for_writing > 1024) {
GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
announce_window, stream_global,
unannounced_incoming_window_for_writing);
become_writable = true;
}
GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
if (become_writable) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
}
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
/* if the grpc_chttp2_transport is ready to send a window update, do so here
also; 3/4 is a magic number that will likely get tuned soon */
if (transport_global->announce_incoming_window > 0) {
uint32_t announced = (uint32_t)GPR_MIN(
transport_global->announce_incoming_window, UINT32_MAX);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
announce_incoming_window, announced);
grpc_transport_one_way_stats throwaway_stats;
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_window_update_create(0, announced, &throwaway_stats));
}
void grpc_chttp2_perform_writes(
grpc_exec_ctx * exec_ctx,
grpc_chttp2_transport_writing * transport_writing,
grpc_endpoint * endpoint) {
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
finalize_outbuf(exec_ctx, transport_writing);
return transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing);
}
GPR_ASSERT(endpoint);
void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint) {
GPR_ASSERT(transport_writing->outbuf.count > 0 ||
grpc_chttp2_list_have_writing_streams(transport_writing));
if (transport_writing->outbuf.count > 0) {
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb,
GRPC_ERROR_NONE, NULL);
}
finalize_outbuf(exec_ctx, transport_writing);
GPR_ASSERT(endpoint);
if (transport_writing->outbuf.count > 0) {
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE,
NULL);
}
}
static void finalize_outbuf(
grpc_exec_ctx * exec_ctx,
grpc_chttp2_transport_writing * transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
GPR_TIMER_BEGIN("finalize_outbuf", 0);
bool is_first_data_frame = true;
while (grpc_chttp2_list_pop_writing_stream(transport_writing,
&stream_writing)) {
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(stream_writing->outgoing_window,
transport_writing->outgoing_window));
/* fetch any body bytes */
while (!stream_writing->fetching && stream_writing->send_message &&
stream_writing->flow_controlled_buffer.length < max_outgoing &&
stream_writing->stream_fetched <
stream_writing->send_message->length) {
if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
&stream_writing->fetching_slice, max_outgoing,
&stream_writing->finished_fetch)) {
stream_writing->stream_fetched +=
GPR_SLICE_LENGTH(stream_writing->fetching_slice);
if (stream_writing->stream_fetched ==
stream_writing->send_message->length) {
stream_writing->send_message = NULL;
}
gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
stream_writing->fetching_slice);
} else {
stream_writing->fetching = 1;
static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_writing *stream_writing;
GPR_TIMER_BEGIN("finalize_outbuf", 0);
bool is_first_data_frame = true;
while (
grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
uint32_t max_outgoing =
(uint32_t)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
GPR_MIN(stream_writing->outgoing_window,
transport_writing->outgoing_window));
/* fetch any body bytes */
while (!stream_writing->fetching && stream_writing->send_message &&
stream_writing->flow_controlled_buffer.length < max_outgoing &&
stream_writing->stream_fetched <
stream_writing->send_message->length) {
if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
&stream_writing->fetching_slice, max_outgoing,
&stream_writing->finished_fetch)) {
stream_writing->stream_fetched +=
GPR_SLICE_LENGTH(stream_writing->fetching_slice);
if (stream_writing->stream_fetched ==
stream_writing->send_message->length) {
stream_writing->send_message = NULL;
}
gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
stream_writing->fetching_slice);
} else {
stream_writing->fetching = 1;
}
/* send any body bytes */
if (stream_writing->flow_controlled_buffer.length > 0) {
if (max_outgoing > 0) {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, stream_writing->flow_controlled_buffer.length);
int is_last_data_frame =
stream_writing->send_message == NULL &&
send_bytes == stream_writing->flow_controlled_buffer.length;
int is_last_frame = is_last_data_frame &&
stream_writing->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata);
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer,
send_bytes, is_last_frame, &stream_writing->stats,
&transport_writing->outbuf);
if (is_first_data_frame) {
/* TODO(dgq): this is a hack. It'll be fix in a future refactoring
*/
stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
is_first_data_frame = false;
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
stream_writing, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
outgoing_window, send_bytes);
if (is_last_frame) {
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
if (is_last_data_frame) {
GPR_ASSERT(stream_writing->send_message == NULL);
stream_writing->sent_message = 1;
}
} else if (transport_writing->outgoing_window == 0) {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing,
stream_writing);
}
/* send any body bytes */
if (stream_writing->flow_controlled_buffer.length > 0) {
if (max_outgoing > 0) {
uint32_t send_bytes = (uint32_t)GPR_MIN(
max_outgoing, stream_writing->flow_controlled_buffer.length);
int is_last_data_frame =
stream_writing->send_message == NULL &&
send_bytes == stream_writing->flow_controlled_buffer.length;
int is_last_frame = is_last_data_frame &&
stream_writing->send_trailing_metadata != NULL &&
grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata);
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer,
send_bytes, is_last_frame, &stream_writing->stats,
&transport_writing->outbuf);
if (is_first_data_frame) {
/* TODO(dgq): this is a hack. It'll be fix in a future refactoring
*/
stream_writing->stats.data_bytes -= 5; /* discount grpc framing */
is_first_data_frame = false;
}
}
/* send trailing metadata if it's available and we're ready for it */
if (stream_writing->send_message == NULL &&
stream_writing->flow_controlled_buffer.length == 0 &&
stream_writing->send_trailing_metadata != NULL) {
if (grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata)) {
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
&stream_writing->stats, &transport_writing->outbuf);
} else {
grpc_chttp2_encode_header(
&transport_writing->hpack_compressor, stream_writing->id,
stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
&transport_writing->outbuf);
GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
stream_writing, outgoing_window,
send_bytes);
GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
outgoing_window, send_bytes);
if (is_last_frame) {
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
if (!transport_writing->is_client && !stream_writing->read_closed) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(
stream_writing->id, GRPC_CHTTP2_NO_ERROR,
&stream_writing->stats));
if (is_last_data_frame) {
GPR_ASSERT(stream_writing->send_message == NULL);
stream_writing->sent_message = 1;
}
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
} else if (transport_writing->outgoing_window == 0) {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
/* if there's more to write, then loop, otherwise prepare to finish the
* write */
if ((stream_writing->flow_controlled_buffer.length > 0 ||
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
grpc_chttp2_list_add_writing_stream(transport_writing,
stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing,
stream_writing);
}
}
/* send trailing metadata if it's available and we're ready for it */
if (stream_writing->send_message == NULL &&
stream_writing->flow_controlled_buffer.length == 0 &&
stream_writing->send_trailing_metadata != NULL) {
if (grpc_metadata_batch_is_empty(
stream_writing->send_trailing_metadata)) {
grpc_chttp2_encode_data(
stream_writing->id, &stream_writing->flow_controlled_buffer, 0, 1,
&stream_writing->stats, &transport_writing->outbuf);
} else {
grpc_chttp2_encode_header(
&transport_writing->hpack_compressor, stream_writing->id,
stream_writing->send_trailing_metadata, 1, &stream_writing->stats,
&transport_writing->outbuf);
}
if (!transport_writing->is_client && !stream_writing->read_closed) {
gpr_slice_buffer_add(&transport_writing->outbuf,
grpc_chttp2_rst_stream_create(
stream_writing->id, GRPC_CHTTP2_NO_ERROR,
&stream_writing->stats));
}
stream_writing->send_trailing_metadata = NULL;
stream_writing->sent_trailing_metadata = 1;
}
/* if there's more to write, then loop, otherwise prepare to finish the
* write */
if ((stream_writing->flow_controlled_buffer.length > 0 ||
(stream_writing->send_message && !stream_writing->fetching)) &&
stream_writing->outgoing_window > 0) {
if (transport_writing->outgoing_window > 0) {
grpc_chttp2_list_add_writing_stream(transport_writing, stream_writing);
} else {
grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
stream_writing);
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
} else {
grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
}
}
GPR_TIMER_END("finalize_outbuf", 0);
}
GPR_TIMER_END("finalize_outbuf", 0);
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing) {
GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx,
transport_writing)) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"resume_stalled_stream");
}
void grpc_chttp2_cleanup_writing(
grpc_exec_ctx * exec_ctx, grpc_chttp2_transport_global * transport_global,
grpc_chttp2_transport_writing * transport_writing) {
GPR_TIMER_BEGIN("grpc_chttp2_cleanup_writing", 0);
grpc_chttp2_stream_writing *stream_writing;
grpc_chttp2_stream_global *stream_global;
if (grpc_chttp2_list_flush_writing_stalled_by_transport(
exec_ctx, transport_writing)) {
grpc_chttp2_initiate_write(exec_ctx, transport_global, false,
"resume_stalled_stream");
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
}
while (grpc_chttp2_list_pop_written_stream(
transport_global, transport_writing, &stream_global, &stream_writing)) {
if (stream_writing->sent_initial_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_initial_metadata_finished, GRPC_ERROR_NONE);
}
grpc_transport_move_one_way_stats(&stream_writing->stats,
&stream_global->stats.outgoing);
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_message_finished, GRPC_ERROR_NONE);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(
exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1, GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
grpc_transport_move_one_way_stats(&stream_writing->stats,
&stream_global->stats.outgoing);
if (stream_writing->sent_message) {
GPR_ASSERT(stream_writing->send_message == NULL);
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_message_finished, GRPC_ERROR_NONE);
stream_writing->sent_message = 0;
}
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_complete_closure_step(
exec_ctx, transport_global, stream_global,
&stream_global->send_trailing_metadata_finished, GRPC_ERROR_NONE);
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
if (stream_writing->sent_trailing_metadata) {
grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
!transport_global->is_client, 1,
GRPC_ERROR_NONE);
}
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
}
gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
GPR_TIMER_END("grpc_chttp2_cleanup_writing", 0);
}

Loading…
Cancel
Save