|
|
|
@ -174,343 +174,451 @@ static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) { |
|
|
|
|
return initial_metadata->list.default_count == initial_metadata->list.count; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_chttp2_begin_write_result grpc_chttp2_begin_write( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { |
|
|
|
|
grpc_chttp2_stream *s; |
|
|
|
|
|
|
|
|
|
/* stats histogram counters: we increment these throughout this function,
|
|
|
|
|
and at the end publish to the central stats histograms */ |
|
|
|
|
int flow_control_writes = 0; |
|
|
|
|
int initial_metadata_writes = 0; |
|
|
|
|
int trailing_metadata_writes = 0; |
|
|
|
|
int message_writes = 0; |
|
|
|
|
namespace { |
|
|
|
|
class StreamWriteContext; |
|
|
|
|
|
|
|
|
|
class WriteContext { |
|
|
|
|
public: |
|
|
|
|
WriteContext(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) : t_(t) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_WRITES_BEGUN(exec_ctx); |
|
|
|
|
|
|
|
|
|
GPR_TIMER_BEGIN("grpc_chttp2_begin_write", 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (t->dirtied_local_settings && !t->sent_local_settings) { |
|
|
|
|
// TODO(ctiller): make this the destructor
|
|
|
|
|
void FlushStats(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE( |
|
|
|
|
exec_ctx, initial_metadata_writes_); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes_); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE( |
|
|
|
|
exec_ctx, trailing_metadata_writes_); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, flow_control_writes_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FlushSettings(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
if (t_->dirtied_local_settings && !t_->sent_local_settings) { |
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t->outbuf, |
|
|
|
|
grpc_chttp2_settings_create( |
|
|
|
|
t->settings[GRPC_SENT_SETTINGS], t->settings[GRPC_LOCAL_SETTINGS], |
|
|
|
|
t->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); |
|
|
|
|
t->force_send_settings = 0; |
|
|
|
|
t->dirtied_local_settings = 0; |
|
|
|
|
t->sent_local_settings = 1; |
|
|
|
|
&t_->outbuf, grpc_chttp2_settings_create( |
|
|
|
|
t_->settings[GRPC_SENT_SETTINGS], |
|
|
|
|
t_->settings[GRPC_LOCAL_SETTINGS], |
|
|
|
|
t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS)); |
|
|
|
|
t_->force_send_settings = false; |
|
|
|
|
t_->dirtied_local_settings = false; |
|
|
|
|
t_->sent_local_settings = true; |
|
|
|
|
GRPC_STATS_INC_HTTP2_SETTINGS_WRITES(exec_ctx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < t->ping_ack_count; i++) { |
|
|
|
|
grpc_slice_buffer_add(&t->outbuf, |
|
|
|
|
grpc_chttp2_ping_create(1, t->ping_acks[i])); |
|
|
|
|
} |
|
|
|
|
t->ping_ack_count = 0; |
|
|
|
|
|
|
|
|
|
void FlushQueuedBuffers(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
/* simple writes are queued to qbuf, and flushed here */ |
|
|
|
|
grpc_slice_buffer_move_into(&t->qbuf, &t->outbuf); |
|
|
|
|
GPR_ASSERT(t->qbuf.count == 0); |
|
|
|
|
grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf); |
|
|
|
|
GPR_ASSERT(t_->qbuf.count == 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
uint32_t transport_announce = |
|
|
|
|
grpc_chttp2_flowctl_maybe_send_transport_update(&t_->flow_control, |
|
|
|
|
t_->outbuf.count > 0); |
|
|
|
|
if (transport_announce) { |
|
|
|
|
grpc_transport_one_way_stats throwaway_stats; |
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce, |
|
|
|
|
&throwaway_stats)); |
|
|
|
|
ResetPingRecvClock(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FlushPingAcks() { |
|
|
|
|
for (size_t i = 0; i < t_->ping_ack_count; i++) { |
|
|
|
|
grpc_slice_buffer_add(&t_->outbuf, |
|
|
|
|
grpc_chttp2_ping_create(true, t_->ping_acks[i])); |
|
|
|
|
} |
|
|
|
|
t_->ping_ack_count = 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void EnactHpackSettings(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
grpc_chttp2_hpack_compressor_set_max_table_size( |
|
|
|
|
&t->hpack_compressor, |
|
|
|
|
t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); |
|
|
|
|
&t_->hpack_compressor, |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (t->flow_control.remote_window > 0) { |
|
|
|
|
while (grpc_chttp2_list_pop_stalled_by_transport(t, &s)) { |
|
|
|
|
if (!t->closed && grpc_chttp2_list_add_writable_stream(t, s)) { |
|
|
|
|
stream_ref_if_not_destroyed(&s->refcount->refs); |
|
|
|
|
void UpdateStreamsNoLongerStalled() { |
|
|
|
|
grpc_chttp2_stream *s; |
|
|
|
|
while (grpc_chttp2_list_pop_stalled_by_transport(t_, &s)) { |
|
|
|
|
if (!t_->closed && grpc_chttp2_list_add_writable_stream(t_, s)) { |
|
|
|
|
if (!stream_ref_if_not_destroyed(&s->refcount->refs)) { |
|
|
|
|
grpc_chttp2_list_remove_writable_stream(t_, s); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_chttp2_begin_write_result result = {false, false, false}; |
|
|
|
|
grpc_chttp2_stream *NextStream() { |
|
|
|
|
if (t_->outbuf.length > target_write_size(t_)) { |
|
|
|
|
result_.partial = true; |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* for each grpc_chttp2_stream that's become writable, frame it's data
|
|
|
|
|
(according to available window sizes) and add to the output buffer */ |
|
|
|
|
while (true) { |
|
|
|
|
if (t->outbuf.length > target_write_size(t)) { |
|
|
|
|
result.partial = true; |
|
|
|
|
break; |
|
|
|
|
grpc_chttp2_stream *s; |
|
|
|
|
if (!grpc_chttp2_list_pop_writable_stream(t_, &s)) { |
|
|
|
|
return nullptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void ResetPingRecvClock() { |
|
|
|
|
if (!t_->is_client) { |
|
|
|
|
t_->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; |
|
|
|
|
t_->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!grpc_chttp2_list_pop_writable_stream(t, &s)) { |
|
|
|
|
break; |
|
|
|
|
void IncInitialMetadataWrites() { ++initial_metadata_writes_; } |
|
|
|
|
void IncWindowUpdateWrites() { ++flow_control_writes_; } |
|
|
|
|
void IncMessageWrites() { ++message_writes_; } |
|
|
|
|
void IncTrailingMetadataWrites() { ++trailing_metadata_writes_; } |
|
|
|
|
|
|
|
|
|
void NoteScheduledResults() { result_.early_results_scheduled = true; } |
|
|
|
|
|
|
|
|
|
grpc_chttp2_transport *transport() const { return t_; } |
|
|
|
|
|
|
|
|
|
grpc_chttp2_begin_write_result Result() { |
|
|
|
|
result_.writing = t_->outbuf.count > 0; |
|
|
|
|
return result_; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
grpc_chttp2_transport *const t_; |
|
|
|
|
|
|
|
|
|
/* stats histogram counters: we increment these throughout this function,
|
|
|
|
|
and at the end publish to the central stats histograms */ |
|
|
|
|
int flow_control_writes_ = 0; |
|
|
|
|
int initial_metadata_writes_ = 0; |
|
|
|
|
int trailing_metadata_writes_ = 0; |
|
|
|
|
int message_writes_ = 0; |
|
|
|
|
grpc_chttp2_begin_write_result result_ = {false, false, false}; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class DataSendContext { |
|
|
|
|
public: |
|
|
|
|
DataSendContext(WriteContext *write_context, grpc_chttp2_transport *t, |
|
|
|
|
grpc_chttp2_stream *s) |
|
|
|
|
: write_context_(write_context), |
|
|
|
|
t_(t), |
|
|
|
|
s_(s), |
|
|
|
|
sending_bytes_before_(s_->sending_bytes) {} |
|
|
|
|
|
|
|
|
|
uint32_t stream_remote_window() const { |
|
|
|
|
return (uint32_t)GPR_MAX( |
|
|
|
|
0, s_->flow_control.remote_window_delta + |
|
|
|
|
(int64_t)t_->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uint32_t max_outgoing() const { |
|
|
|
|
return (uint32_t)GPR_MIN( |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], |
|
|
|
|
GPR_MIN(stream_remote_window(), t_->flow_control.remote_window)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool AnyOutgoing() const { return max_outgoing() != 0; } |
|
|
|
|
|
|
|
|
|
void FlushCompressedBytes() { |
|
|
|
|
uint32_t send_bytes = |
|
|
|
|
(uint32_t)GPR_MIN(max_outgoing(), s_->compressed_data_buffer.length); |
|
|
|
|
bool is_last_data_frame = |
|
|
|
|
(send_bytes == s_->compressed_data_buffer.length && |
|
|
|
|
s_->flow_controlled_buffer.length == 0 && |
|
|
|
|
s_->fetching_send_message == NULL); |
|
|
|
|
if (is_last_data_frame && s_->send_trailing_metadata != NULL && |
|
|
|
|
s_->stream_compression_ctx != NULL) { |
|
|
|
|
if (!grpc_stream_compress(s_->stream_compression_ctx, |
|
|
|
|
&s_->flow_controlled_buffer, |
|
|
|
|
&s_->compressed_data_buffer, NULL, MAX_SIZE_T, |
|
|
|
|
GRPC_STREAM_COMPRESSION_FLUSH_FINISH)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream compression failed."); |
|
|
|
|
} |
|
|
|
|
grpc_stream_compression_context_destroy(s_->stream_compression_ctx); |
|
|
|
|
s_->stream_compression_ctx = NULL; |
|
|
|
|
/* After finish, bytes in s->compressed_data_buffer may be
|
|
|
|
|
* more than max_outgoing. Start another round of the current |
|
|
|
|
* while loop so that send_bytes and is_last_data_frame are |
|
|
|
|
* recalculated. */ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
is_last_frame_ = is_last_data_frame && s_->send_trailing_metadata != NULL && |
|
|
|
|
grpc_metadata_batch_is_empty(s_->send_trailing_metadata); |
|
|
|
|
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes, |
|
|
|
|
is_last_frame_, &s_->stats.outgoing, &t_->outbuf); |
|
|
|
|
grpc_chttp2_flowctl_sent_data(&t_->flow_control, &s_->flow_control, |
|
|
|
|
send_bytes); |
|
|
|
|
if (s_->compressed_data_buffer.length == 0) { |
|
|
|
|
s_->sending_bytes += s_->uncompressed_data_size; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool sent_initial_metadata = s->sent_initial_metadata; |
|
|
|
|
bool now_writing = false; |
|
|
|
|
void CompressMoreBytes() { |
|
|
|
|
if (s_->stream_compression_ctx == NULL) { |
|
|
|
|
s_->stream_compression_ctx = |
|
|
|
|
grpc_stream_compression_context_create(s_->stream_compression_method); |
|
|
|
|
} |
|
|
|
|
s_->uncompressed_data_size = s_->flow_controlled_buffer.length; |
|
|
|
|
if (!grpc_stream_compress(s_->stream_compression_ctx, |
|
|
|
|
&s_->flow_controlled_buffer, |
|
|
|
|
&s_->compressed_data_buffer, NULL, MAX_SIZE_T, |
|
|
|
|
GRPC_STREAM_COMPRESSION_FLUSH_SYNC)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream compression failed."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool is_last_frame() const { return is_last_frame_; } |
|
|
|
|
|
|
|
|
|
void CallCallbacks(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
if (update_list(exec_ctx, t_, s_, |
|
|
|
|
(int64_t)(s_->sending_bytes - sending_bytes_before_), |
|
|
|
|
&s_->on_flow_controlled_cbs, |
|
|
|
|
&s_->flow_controlled_bytes_flowed, GRPC_ERROR_NONE)) { |
|
|
|
|
write_context_->NoteScheduledResults(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
WriteContext *write_context_; |
|
|
|
|
grpc_chttp2_transport *t_; |
|
|
|
|
grpc_chttp2_stream *s_; |
|
|
|
|
const size_t sending_bytes_before_; |
|
|
|
|
bool is_last_frame_ = false; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class StreamWriteContext { |
|
|
|
|
public: |
|
|
|
|
StreamWriteContext(WriteContext *write_context, grpc_chttp2_stream *s) |
|
|
|
|
: write_context_(write_context), t_(write_context->transport()), s_(s) { |
|
|
|
|
GRPC_CHTTP2_IF_TRACING( |
|
|
|
|
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t, |
|
|
|
|
t->is_client ? "CLIENT" : "SERVER", s->id, |
|
|
|
|
sent_initial_metadata, s->send_initial_metadata != NULL, |
|
|
|
|
gpr_log(GPR_DEBUG, "W:%p %s[%d] im-(sent,send)=(%d,%d) announce=%d", t_, |
|
|
|
|
t_->is_client ? "CLIENT" : "SERVER", s->id, |
|
|
|
|
s->sent_initial_metadata, s->send_initial_metadata != NULL, |
|
|
|
|
(int)(s->flow_control.local_window_delta - |
|
|
|
|
s->flow_control.announced_window_delta))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_mdelem *extra_headers_for_trailing_metadata[2]; |
|
|
|
|
size_t num_extra_headers_for_trailing_metadata = 0; |
|
|
|
|
|
|
|
|
|
void FlushInitialMetadata(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
/* send initial metadata if it's available */ |
|
|
|
|
if (!sent_initial_metadata && s->send_initial_metadata != NULL) { |
|
|
|
|
if (s_->sent_initial_metadata) return; |
|
|
|
|
if (s_->send_initial_metadata == nullptr) return; |
|
|
|
|
|
|
|
|
|
// We skip this on the server side if there is no custom initial
|
|
|
|
|
// metadata, there are no messages to send, and we are also sending
|
|
|
|
|
// trailing metadata. This results in a Trailers-Only response,
|
|
|
|
|
// which is required for retries, as per:
|
|
|
|
|
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
|
|
|
|
|
if (t->is_client || s->fetching_send_message != NULL || |
|
|
|
|
s->flow_controlled_buffer.length != 0 || |
|
|
|
|
s->send_trailing_metadata == NULL || |
|
|
|
|
!is_default_initial_metadata(s->send_initial_metadata)) { |
|
|
|
|
if (!t_->is_client && s_->fetching_send_message == nullptr && |
|
|
|
|
s_->flow_controlled_buffer.length == 0 && |
|
|
|
|
s_->compressed_data_buffer.length == 0 && |
|
|
|
|
s_->send_trailing_metadata != nullptr && |
|
|
|
|
is_default_initial_metadata(s_->send_initial_metadata)) { |
|
|
|
|
ConvertInitialMetadataToTrailingMetadata(); |
|
|
|
|
} else { |
|
|
|
|
grpc_encode_header_options hopt = { |
|
|
|
|
s->id, // stream_id
|
|
|
|
|
s_->id, // stream_id
|
|
|
|
|
false, // is_eof
|
|
|
|
|
t->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != |
|
|
|
|
0, // use_true_binary_metadata
|
|
|
|
|
t->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
|
|
|
|
|
&s->stats.outgoing // stats
|
|
|
|
|
&s_->stats.outgoing // stats
|
|
|
|
|
}; |
|
|
|
|
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0, |
|
|
|
|
s->send_initial_metadata, &hopt, &t->outbuf); |
|
|
|
|
now_writing = true; |
|
|
|
|
if (!t->is_client) { |
|
|
|
|
t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; |
|
|
|
|
t->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
initial_metadata_writes++; |
|
|
|
|
} else { |
|
|
|
|
GRPC_CHTTP2_IF_TRACING( |
|
|
|
|
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)")); |
|
|
|
|
// When sending Trailers-Only, we need to move the :status and
|
|
|
|
|
// content-type headers to the trailers.
|
|
|
|
|
if (s->send_initial_metadata->idx.named.status != NULL) { |
|
|
|
|
extra_headers_for_trailing_metadata |
|
|
|
|
[num_extra_headers_for_trailing_metadata++] = |
|
|
|
|
&s->send_initial_metadata->idx.named.status->md; |
|
|
|
|
} |
|
|
|
|
if (s->send_initial_metadata->idx.named.content_type != NULL) { |
|
|
|
|
extra_headers_for_trailing_metadata |
|
|
|
|
[num_extra_headers_for_trailing_metadata++] = |
|
|
|
|
&s->send_initial_metadata->idx.named.content_type->md; |
|
|
|
|
} |
|
|
|
|
trailing_metadata_writes++; |
|
|
|
|
} |
|
|
|
|
s->send_initial_metadata = NULL; |
|
|
|
|
s->sent_initial_metadata = true; |
|
|
|
|
sent_initial_metadata = true; |
|
|
|
|
result.early_results_scheduled = true; |
|
|
|
|
grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, NULL, 0, |
|
|
|
|
s_->send_initial_metadata, &hopt, &t_->outbuf); |
|
|
|
|
write_context_->ResetPingRecvClock(); |
|
|
|
|
write_context_->IncInitialMetadataWrites(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
s_->send_initial_metadata = NULL; |
|
|
|
|
s_->sent_initial_metadata = true; |
|
|
|
|
write_context_->NoteScheduledResults(); |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, t, s, &s->send_initial_metadata_finished, GRPC_ERROR_NONE, |
|
|
|
|
exec_ctx, t_, s_, &s_->send_initial_metadata_finished, GRPC_ERROR_NONE, |
|
|
|
|
"send_initial_metadata_finished"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void FlushWindowUpdates(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
/* send any window updates */ |
|
|
|
|
uint32_t stream_announce = grpc_chttp2_flowctl_maybe_send_stream_update( |
|
|
|
|
&t->flow_control, &s->flow_control); |
|
|
|
|
if (stream_announce > 0) { |
|
|
|
|
&t_->flow_control, &s_->flow_control); |
|
|
|
|
if (stream_announce == 0) return; |
|
|
|
|
|
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t->outbuf, grpc_chttp2_window_update_create(s->id, stream_announce, |
|
|
|
|
&s->stats.outgoing)); |
|
|
|
|
if (!t->is_client) { |
|
|
|
|
t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; |
|
|
|
|
t->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
flow_control_writes++; |
|
|
|
|
} |
|
|
|
|
if (sent_initial_metadata) { |
|
|
|
|
/* send any body bytes, if allowed by flow control */ |
|
|
|
|
if (s->flow_controlled_buffer.length > 0 || |
|
|
|
|
s->compressed_data_buffer.length > 0) { |
|
|
|
|
uint32_t stream_remote_window = (uint32_t)GPR_MAX( |
|
|
|
|
0, |
|
|
|
|
s->flow_control.remote_window_delta + |
|
|
|
|
(int64_t)t->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); |
|
|
|
|
uint32_t max_outgoing = (uint32_t)GPR_MIN( |
|
|
|
|
t->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], |
|
|
|
|
GPR_MIN(stream_remote_window, t->flow_control.remote_window)); |
|
|
|
|
if (max_outgoing > 0) { |
|
|
|
|
bool is_last_data_frame = false; |
|
|
|
|
bool is_last_frame = false; |
|
|
|
|
size_t sending_bytes_before = s->sending_bytes; |
|
|
|
|
while ((s->flow_controlled_buffer.length > 0 || |
|
|
|
|
s->compressed_data_buffer.length > 0) && |
|
|
|
|
max_outgoing > 0) { |
|
|
|
|
if (s->compressed_data_buffer.length > 0) { |
|
|
|
|
uint32_t send_bytes = (uint32_t)GPR_MIN( |
|
|
|
|
max_outgoing, s->compressed_data_buffer.length); |
|
|
|
|
is_last_data_frame = |
|
|
|
|
(send_bytes == s->compressed_data_buffer.length && |
|
|
|
|
s->flow_controlled_buffer.length == 0 && |
|
|
|
|
s->fetching_send_message == NULL); |
|
|
|
|
if (is_last_data_frame && s->send_trailing_metadata != NULL && |
|
|
|
|
s->stream_compression_ctx != NULL) { |
|
|
|
|
if (!grpc_stream_compress( |
|
|
|
|
s->stream_compression_ctx, &s->flow_controlled_buffer, |
|
|
|
|
&s->compressed_data_buffer, NULL, MAX_SIZE_T, |
|
|
|
|
GRPC_STREAM_COMPRESSION_FLUSH_FINISH)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream compression failed."); |
|
|
|
|
&t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce, |
|
|
|
|
&s_->stats.outgoing)); |
|
|
|
|
write_context_->ResetPingRecvClock(); |
|
|
|
|
write_context_->IncWindowUpdateWrites(); |
|
|
|
|
} |
|
|
|
|
grpc_stream_compression_context_destroy( |
|
|
|
|
s->stream_compression_ctx); |
|
|
|
|
s->stream_compression_ctx = NULL; |
|
|
|
|
/* After finish, bytes in s->compressed_data_buffer may be
|
|
|
|
|
* more than max_outgoing. Start another round of the current |
|
|
|
|
* while loop so that send_bytes and is_last_data_frame are |
|
|
|
|
* recalculated. */ |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
is_last_frame = |
|
|
|
|
is_last_data_frame && s->send_trailing_metadata != NULL && |
|
|
|
|
grpc_metadata_batch_is_empty(s->send_trailing_metadata); |
|
|
|
|
grpc_chttp2_encode_data(s->id, &s->compressed_data_buffer, |
|
|
|
|
send_bytes, is_last_frame, |
|
|
|
|
&s->stats.outgoing, &t->outbuf); |
|
|
|
|
grpc_chttp2_flowctl_sent_data(&t->flow_control, &s->flow_control, |
|
|
|
|
send_bytes); |
|
|
|
|
max_outgoing -= send_bytes; |
|
|
|
|
if (s->compressed_data_buffer.length == 0) { |
|
|
|
|
s->sending_bytes += s->uncompressed_data_size; |
|
|
|
|
|
|
|
|
|
void FlushData(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
if (!s_->sent_initial_metadata) return; |
|
|
|
|
|
|
|
|
|
if (s_->flow_controlled_buffer.length == 0 && |
|
|
|
|
s_->compressed_data_buffer.length == 0) { |
|
|
|
|
return; // early out: nothing to do
|
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
if (s->stream_compression_ctx == NULL) { |
|
|
|
|
s->stream_compression_ctx = |
|
|
|
|
grpc_stream_compression_context_create( |
|
|
|
|
s->stream_compression_method); |
|
|
|
|
} |
|
|
|
|
s->uncompressed_data_size = s->flow_controlled_buffer.length; |
|
|
|
|
if (!grpc_stream_compress( |
|
|
|
|
s->stream_compression_ctx, &s->flow_controlled_buffer, |
|
|
|
|
&s->compressed_data_buffer, NULL, MAX_SIZE_T, |
|
|
|
|
GRPC_STREAM_COMPRESSION_FLUSH_SYNC)) { |
|
|
|
|
gpr_log(GPR_ERROR, "Stream compression failed."); |
|
|
|
|
|
|
|
|
|
DataSendContext data_send_context(write_context_, t_, s_); |
|
|
|
|
|
|
|
|
|
if (!data_send_context.AnyOutgoing()) { |
|
|
|
|
if (t_->flow_control.remote_window == 0) { |
|
|
|
|
report_stall(t_, s_, "transport"); |
|
|
|
|
grpc_chttp2_list_add_stalled_by_transport(t_, s_); |
|
|
|
|
} else if (data_send_context.stream_remote_window() == 0) { |
|
|
|
|
report_stall(t_, s_, "stream"); |
|
|
|
|
grpc_chttp2_list_add_stalled_by_stream(t_, s_); |
|
|
|
|
} |
|
|
|
|
return; // early out: nothing to do
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
while ((s_->flow_controlled_buffer.length > 0 || |
|
|
|
|
s_->compressed_data_buffer.length > 0) && |
|
|
|
|
data_send_context.max_outgoing() > 0) { |
|
|
|
|
if (s_->compressed_data_buffer.length > 0) { |
|
|
|
|
data_send_context.FlushCompressedBytes(); |
|
|
|
|
} else { |
|
|
|
|
data_send_context.CompressMoreBytes(); |
|
|
|
|
} |
|
|
|
|
if (!t->is_client) { |
|
|
|
|
t->ping_recv_state.last_ping_recv_time = 0; |
|
|
|
|
t->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
if (is_last_frame) { |
|
|
|
|
s->send_trailing_metadata = NULL; |
|
|
|
|
s->sent_trailing_metadata = true; |
|
|
|
|
if (!t->is_client && !s->read_closed) { |
|
|
|
|
grpc_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create( |
|
|
|
|
s->id, GRPC_HTTP2_NO_ERROR, |
|
|
|
|
&s->stats.outgoing)); |
|
|
|
|
write_context_->ResetPingRecvClock(); |
|
|
|
|
if (data_send_context.is_last_frame()) { |
|
|
|
|
SentLastFrame(exec_ctx); |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
data_send_context.CallCallbacks(exec_ctx); |
|
|
|
|
stream_became_writable_ = true; |
|
|
|
|
if (s_->flow_controlled_buffer.length > 0 || |
|
|
|
|
s_->compressed_data_buffer.length > 0) { |
|
|
|
|
GRPC_CHTTP2_STREAM_REF(s_, "chttp2_writing:fork"); |
|
|
|
|
grpc_chttp2_list_add_writable_stream(t_, s_); |
|
|
|
|
} |
|
|
|
|
write_context_->IncMessageWrites(); |
|
|
|
|
} |
|
|
|
|
result.early_results_scheduled |= |
|
|
|
|
update_list(exec_ctx, t, s, |
|
|
|
|
(int64_t)(s->sending_bytes - sending_bytes_before), |
|
|
|
|
&s->on_flow_controlled_cbs, |
|
|
|
|
&s->flow_controlled_bytes_flowed, GRPC_ERROR_NONE); |
|
|
|
|
now_writing = true; |
|
|
|
|
if (s->flow_controlled_buffer.length > 0 || |
|
|
|
|
s->compressed_data_buffer.length > 0) { |
|
|
|
|
GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:fork"); |
|
|
|
|
grpc_chttp2_list_add_writable_stream(t, s); |
|
|
|
|
} |
|
|
|
|
message_writes++; |
|
|
|
|
} else if (t->flow_control.remote_window == 0) { |
|
|
|
|
report_stall(t, s, "transport"); |
|
|
|
|
grpc_chttp2_list_add_stalled_by_transport(t, s); |
|
|
|
|
now_writing = true; |
|
|
|
|
} else if (stream_remote_window == 0) { |
|
|
|
|
report_stall(t, s, "stream"); |
|
|
|
|
grpc_chttp2_list_add_stalled_by_stream(t, s); |
|
|
|
|
now_writing = true; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (s->send_trailing_metadata != NULL && |
|
|
|
|
s->fetching_send_message == NULL && |
|
|
|
|
s->flow_controlled_buffer.length == 0 && |
|
|
|
|
s->compressed_data_buffer.length == 0) { |
|
|
|
|
|
|
|
|
|
void FlushTrailingMetadata(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
if (!s_->sent_initial_metadata) return; |
|
|
|
|
|
|
|
|
|
if (s_->send_trailing_metadata == NULL) return; |
|
|
|
|
if (s_->fetching_send_message != NULL) return; |
|
|
|
|
if (s_->flow_controlled_buffer.length != 0) return; |
|
|
|
|
if (s_->compressed_data_buffer.length != 0) return; |
|
|
|
|
|
|
|
|
|
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata")); |
|
|
|
|
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) { |
|
|
|
|
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true, |
|
|
|
|
&s->stats.outgoing, &t->outbuf); |
|
|
|
|
if (grpc_metadata_batch_is_empty(s_->send_trailing_metadata)) { |
|
|
|
|
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true, |
|
|
|
|
&s_->stats.outgoing, &t_->outbuf); |
|
|
|
|
} else { |
|
|
|
|
grpc_encode_header_options hopt = { |
|
|
|
|
s->id, true, |
|
|
|
|
|
|
|
|
|
t->settings |
|
|
|
|
[GRPC_PEER_SETTINGS] |
|
|
|
|
s_->id, true, |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != |
|
|
|
|
0, |
|
|
|
|
|
|
|
|
|
t->settings[GRPC_PEER_SETTINGS] |
|
|
|
|
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], |
|
|
|
|
&s->stats.outgoing}; |
|
|
|
|
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, |
|
|
|
|
extra_headers_for_trailing_metadata, |
|
|
|
|
num_extra_headers_for_trailing_metadata, |
|
|
|
|
s->send_trailing_metadata, &hopt, |
|
|
|
|
&t->outbuf); |
|
|
|
|
trailing_metadata_writes++; |
|
|
|
|
} |
|
|
|
|
s->send_trailing_metadata = NULL; |
|
|
|
|
s->sent_trailing_metadata = true; |
|
|
|
|
if (!t->is_client) { |
|
|
|
|
t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; |
|
|
|
|
t->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
if (!t->is_client && !s->read_closed) { |
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t->outbuf, grpc_chttp2_rst_stream_create( |
|
|
|
|
s->id, GRPC_HTTP2_NO_ERROR, &s->stats.outgoing)); |
|
|
|
|
t_->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], |
|
|
|
|
&s_->stats.outgoing}; |
|
|
|
|
grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, |
|
|
|
|
extra_headers_for_trailing_metadata_, |
|
|
|
|
num_extra_headers_for_trailing_metadata_, |
|
|
|
|
s_->send_trailing_metadata, &hopt, &t_->outbuf); |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, t, s, !t->is_client, 1, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
now_writing = true; |
|
|
|
|
result.early_results_scheduled = true; |
|
|
|
|
write_context_->IncTrailingMetadataWrites(); |
|
|
|
|
write_context_->ResetPingRecvClock(); |
|
|
|
|
SentLastFrame(exec_ctx); |
|
|
|
|
|
|
|
|
|
write_context_->NoteScheduledResults(); |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
exec_ctx, t, s, &s->send_trailing_metadata_finished, |
|
|
|
|
GRPC_ERROR_NONE, "send_trailing_metadata_finished"); |
|
|
|
|
exec_ctx, t_, s_, &s_->send_trailing_metadata_finished, GRPC_ERROR_NONE, |
|
|
|
|
"send_trailing_metadata_finished"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
bool stream_became_writable() { return stream_became_writable_; } |
|
|
|
|
|
|
|
|
|
private: |
|
|
|
|
void ConvertInitialMetadataToTrailingMetadata() { |
|
|
|
|
GRPC_CHTTP2_IF_TRACING( |
|
|
|
|
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)")); |
|
|
|
|
// When sending Trailers-Only, we need to move the :status and
|
|
|
|
|
// content-type headers to the trailers.
|
|
|
|
|
if (s_->send_initial_metadata->idx.named.status != NULL) { |
|
|
|
|
extra_headers_for_trailing_metadata_ |
|
|
|
|
[num_extra_headers_for_trailing_metadata_++] = |
|
|
|
|
&s_->send_initial_metadata->idx.named.status->md; |
|
|
|
|
} |
|
|
|
|
if (s_->send_initial_metadata->idx.named.content_type != NULL) { |
|
|
|
|
extra_headers_for_trailing_metadata_ |
|
|
|
|
[num_extra_headers_for_trailing_metadata_++] = |
|
|
|
|
&s_->send_initial_metadata->idx.named.content_type->md; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (now_writing) { |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_INITIAL_METADATA_PER_WRITE( |
|
|
|
|
exec_ctx, initial_metadata_writes); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_MESSAGE_PER_WRITE(exec_ctx, message_writes); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_TRAILING_METADATA_PER_WRITE( |
|
|
|
|
exec_ctx, trailing_metadata_writes); |
|
|
|
|
GRPC_STATS_INC_HTTP2_SEND_FLOWCTL_PER_WRITE(exec_ctx, |
|
|
|
|
flow_control_writes); |
|
|
|
|
void SentLastFrame(grpc_exec_ctx *exec_ctx) { |
|
|
|
|
s_->send_trailing_metadata = NULL; |
|
|
|
|
s_->sent_trailing_metadata = true; |
|
|
|
|
|
|
|
|
|
if (!t_->is_client && !s_->read_closed) { |
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t_->outbuf, grpc_chttp2_rst_stream_create( |
|
|
|
|
s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing)); |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_mark_stream_closed(exec_ctx, t_, s_, !t_->is_client, true, |
|
|
|
|
GRPC_ERROR_NONE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
WriteContext *const write_context_; |
|
|
|
|
grpc_chttp2_transport *const t_; |
|
|
|
|
grpc_chttp2_stream *const s_; |
|
|
|
|
bool stream_became_writable_ = false; |
|
|
|
|
grpc_mdelem *extra_headers_for_trailing_metadata_[2]; |
|
|
|
|
size_t num_extra_headers_for_trailing_metadata_ = 0; |
|
|
|
|
}; |
|
|
|
|
} // namespace
|
|
|
|
|
|
|
|
|
|
grpc_chttp2_begin_write_result grpc_chttp2_begin_write( |
|
|
|
|
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { |
|
|
|
|
WriteContext ctx(exec_ctx, t); |
|
|
|
|
ctx.FlushSettings(exec_ctx); |
|
|
|
|
ctx.FlushPingAcks(); |
|
|
|
|
ctx.FlushQueuedBuffers(exec_ctx); |
|
|
|
|
ctx.EnactHpackSettings(exec_ctx); |
|
|
|
|
|
|
|
|
|
if (t->flow_control.remote_window > 0) { |
|
|
|
|
ctx.UpdateStreamsNoLongerStalled(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* for each grpc_chttp2_stream that's become writable, frame it's data
|
|
|
|
|
(according to available window sizes) and add to the output buffer */ |
|
|
|
|
while (grpc_chttp2_stream *s = ctx.NextStream()) { |
|
|
|
|
StreamWriteContext stream_ctx(&ctx, s); |
|
|
|
|
stream_ctx.FlushInitialMetadata(exec_ctx); |
|
|
|
|
stream_ctx.FlushWindowUpdates(exec_ctx); |
|
|
|
|
stream_ctx.FlushData(exec_ctx); |
|
|
|
|
stream_ctx.FlushTrailingMetadata(exec_ctx); |
|
|
|
|
|
|
|
|
|
if (stream_ctx.stream_became_writable()) { |
|
|
|
|
if (!grpc_chttp2_list_add_writing_stream(t, s)) { |
|
|
|
|
/* already in writing list: drop ref */ |
|
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:already_writing"); |
|
|
|
|
} else { |
|
|
|
|
/* ref will be dropped at end of write */ |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "chttp2_writing:no_write"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
maybe_initiate_ping(exec_ctx, t); |
|
|
|
|
ctx.FlushWindowUpdates(exec_ctx); |
|
|
|
|
|
|
|
|
|
uint32_t transport_announce = grpc_chttp2_flowctl_maybe_send_transport_update( |
|
|
|
|
&t->flow_control, t->outbuf.count > 0); |
|
|
|
|
if (transport_announce) { |
|
|
|
|
grpc_transport_one_way_stats throwaway_stats; |
|
|
|
|
grpc_slice_buffer_add( |
|
|
|
|
&t->outbuf, grpc_chttp2_window_update_create(0, transport_announce, |
|
|
|
|
&throwaway_stats)); |
|
|
|
|
if (!t->is_client) { |
|
|
|
|
t->ping_recv_state.last_ping_recv_time = GRPC_MILLIS_INF_PAST; |
|
|
|
|
t->ping_recv_state.ping_strikes = 0; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
maybe_initiate_ping(exec_ctx, t); |
|
|
|
|
|
|
|
|
|
GPR_TIMER_END("grpc_chttp2_begin_write", 0); |
|
|
|
|
|
|
|
|
|
result.writing = t->outbuf.count > 0; |
|
|
|
|
return result; |
|
|
|
|
return ctx.Result(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void grpc_chttp2_end_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, |
|
|
|
|