|
|
|
@ -1362,6 +1362,241 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void send_initial_metadata_locked( |
|
|
|
|
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s, |
|
|
|
|
grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t, grpc_closure* on_complete) { |
|
|
|
|
if (!grpc_core::IsCallTracerInTransportEnabled()) { |
|
|
|
|
if (s->call_tracer != nullptr) { |
|
|
|
|
s->call_tracer->RecordAnnotation( |
|
|
|
|
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, |
|
|
|
|
gpr_now(GPR_CLOCK_REALTIME)) |
|
|
|
|
.Add(s->t->flow_control.stats()) |
|
|
|
|
.Add(s->flow_control.stats())); |
|
|
|
|
} |
|
|
|
|
} else if (grpc_core::IsTraceRecordCallopsEnabled()) { |
|
|
|
|
auto* call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>(); |
|
|
|
|
if (call_tracer != nullptr && call_tracer->IsSampled()) { |
|
|
|
|
call_tracer->RecordAnnotation( |
|
|
|
|
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, |
|
|
|
|
gpr_now(GPR_CLOCK_REALTIME)) |
|
|
|
|
.Add(s->t->flow_control.stats()) |
|
|
|
|
.Add(s->flow_control.stats())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (t->is_client && t->channelz_socket != nullptr) { |
|
|
|
|
t->channelz_socket->RecordStreamStartedFromLocal(); |
|
|
|
|
} |
|
|
|
|
CHECK_EQ(s->send_initial_metadata_finished, nullptr); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
|
|
|
|
|
s->send_initial_metadata_finished = add_closure_barrier(on_complete); |
|
|
|
|
s->send_initial_metadata = |
|
|
|
|
op_payload->send_initial_metadata.send_initial_metadata; |
|
|
|
|
if (t->is_client) { |
|
|
|
|
s->deadline = |
|
|
|
|
std::min(s->deadline, |
|
|
|
|
s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata()) |
|
|
|
|
.value_or(grpc_core::Timestamp::InfFuture())); |
|
|
|
|
} |
|
|
|
|
if (contains_non_ok_status(s->send_initial_metadata)) { |
|
|
|
|
s->seen_error = true; |
|
|
|
|
} |
|
|
|
|
if (!s->write_closed) { |
|
|
|
|
if (t->is_client) { |
|
|
|
|
if (t->closed_with_error.ok()) { |
|
|
|
|
CHECK_EQ(s->id, 0u); |
|
|
|
|
grpc_chttp2_list_add_waiting_for_concurrency(t, s); |
|
|
|
|
maybe_start_some_streams(t); |
|
|
|
|
} else { |
|
|
|
|
s->trailing_metadata_buffer.Set( |
|
|
|
|
grpc_core::GrpcStreamNetworkState(), |
|
|
|
|
grpc_core::GrpcStreamNetworkState::kNotSentOnWire); |
|
|
|
|
grpc_chttp2_cancel_stream( |
|
|
|
|
t, s, |
|
|
|
|
grpc_error_set_int( |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("Transport closed", |
|
|
|
|
&t->closed_with_error, 1), |
|
|
|
|
grpc_core::StatusIntProperty::kRpcStatus, |
|
|
|
|
GRPC_STATUS_UNAVAILABLE), |
|
|
|
|
false); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
CHECK_NE(s->id, 0u); |
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
if (!(op->send_message && |
|
|
|
|
(op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) { |
|
|
|
|
grpc_chttp2_initiate_write( |
|
|
|
|
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
s->send_initial_metadata = nullptr; |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
t, &s->send_initial_metadata_finished, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING( |
|
|
|
|
"Attempt to send initial metadata after stream was closed", |
|
|
|
|
&s->write_closed_error, 1), |
|
|
|
|
"send_initial_metadata_finished"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void send_message_locked( |
|
|
|
|
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s, |
|
|
|
|
grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t, grpc_closure* on_complete) { |
|
|
|
|
t->num_messages_in_next_write++; |
|
|
|
|
grpc_core::global_stats().IncrementHttp2SendMessageSize( |
|
|
|
|
op->payload->send_message.send_message->Length()); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
s->send_message_finished = add_closure_barrier(op->on_complete); |
|
|
|
|
const uint32_t flags = op_payload->send_message.flags; |
|
|
|
|
if (s->write_closed) { |
|
|
|
|
op->payload->send_message.stream_write_closed = true; |
|
|
|
|
// We should NOT return an error here, so as to avoid a cancel OP being
|
|
|
|
|
// started. The surface layer will notice that the stream has been closed
|
|
|
|
|
// for writes and fail the send message op.
|
|
|
|
|
grpc_chttp2_complete_closure_step(t, &s->send_message_finished, |
|
|
|
|
absl::OkStatus(), |
|
|
|
|
"fetching_send_message_finished"); |
|
|
|
|
} else { |
|
|
|
|
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer, |
|
|
|
|
GRPC_HEADER_SIZE_IN_BYTES); |
|
|
|
|
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; |
|
|
|
|
size_t len = op_payload->send_message.send_message->Length(); |
|
|
|
|
frame_hdr[1] = static_cast<uint8_t>(len >> 24); |
|
|
|
|
frame_hdr[2] = static_cast<uint8_t>(len >> 16); |
|
|
|
|
frame_hdr[3] = static_cast<uint8_t>(len >> 8); |
|
|
|
|
frame_hdr[4] = static_cast<uint8_t>(len); |
|
|
|
|
|
|
|
|
|
s->call_tracer_wrapper.RecordOutgoingBytes( |
|
|
|
|
{GRPC_HEADER_SIZE_IN_BYTES, len, 0}); |
|
|
|
|
s->next_message_end_offset = |
|
|
|
|
s->flow_controlled_bytes_written + |
|
|
|
|
static_cast<int64_t>(s->flow_controlled_buffer.length) + |
|
|
|
|
static_cast<int64_t>(len); |
|
|
|
|
if (flags & GRPC_WRITE_BUFFER_HINT) { |
|
|
|
|
s->next_message_end_offset -= t->write_buffer_size; |
|
|
|
|
s->write_buffering = true; |
|
|
|
|
} else { |
|
|
|
|
s->write_buffering = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_slice* const slices = |
|
|
|
|
op_payload->send_message.send_message->c_slice_buffer()->slices; |
|
|
|
|
grpc_slice* const end = |
|
|
|
|
slices + op_payload->send_message.send_message->Count(); |
|
|
|
|
for (grpc_slice* slice = slices; slice != end; slice++) { |
|
|
|
|
grpc_slice_buffer_add(&s->flow_controlled_buffer, |
|
|
|
|
grpc_core::CSliceRef(*slice)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int64_t notify_offset = s->next_message_end_offset; |
|
|
|
|
if (notify_offset <= s->flow_controlled_bytes_written) { |
|
|
|
|
grpc_chttp2_complete_closure_step(t, &s->send_message_finished, |
|
|
|
|
absl::OkStatus(), |
|
|
|
|
"fetching_send_message_finished"); |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_write_cb* cb = t->write_cb_pool; |
|
|
|
|
if (cb == nullptr) { |
|
|
|
|
cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb))); |
|
|
|
|
} else { |
|
|
|
|
t->write_cb_pool = cb->next; |
|
|
|
|
} |
|
|
|
|
cb->call_at_byte = notify_offset; |
|
|
|
|
cb->closure = s->send_message_finished; |
|
|
|
|
s->send_message_finished = nullptr; |
|
|
|
|
grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH |
|
|
|
|
? &s->on_write_finished_cbs |
|
|
|
|
: &s->on_flow_controlled_cbs; |
|
|
|
|
cb->next = *list; |
|
|
|
|
*list = cb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length > |
|
|
|
|
t->write_buffer_size)) { |
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void send_trailing_metadata_locked( |
|
|
|
|
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s, |
|
|
|
|
grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t, grpc_closure* on_complete) { |
|
|
|
|
CHECK_EQ(s->send_trailing_metadata_finished, nullptr); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
s->send_trailing_metadata_finished = add_closure_barrier(on_complete); |
|
|
|
|
s->send_trailing_metadata = |
|
|
|
|
op_payload->send_trailing_metadata.send_trailing_metadata; |
|
|
|
|
s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent; |
|
|
|
|
s->write_buffering = false; |
|
|
|
|
if (contains_non_ok_status(s->send_trailing_metadata)) { |
|
|
|
|
s->seen_error = true; |
|
|
|
|
} |
|
|
|
|
if (s->write_closed) { |
|
|
|
|
s->send_trailing_metadata = nullptr; |
|
|
|
|
s->sent_trailing_metadata_op = nullptr; |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
t, &s->send_trailing_metadata_finished, |
|
|
|
|
op->payload->send_trailing_metadata.send_trailing_metadata->empty() |
|
|
|
|
? absl::OkStatus() |
|
|
|
|
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after " |
|
|
|
|
"stream was closed"), |
|
|
|
|
"send_trailing_metadata_finished"); |
|
|
|
|
} else if (s->id != 0) { |
|
|
|
|
// TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
|
// bytes before going writable
|
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
grpc_chttp2_initiate_write( |
|
|
|
|
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void recv_initial_metadata_locked( |
|
|
|
|
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t) { |
|
|
|
|
CHECK_EQ(s->recv_initial_metadata_ready, nullptr); |
|
|
|
|
s->recv_initial_metadata_ready = |
|
|
|
|
op_payload->recv_initial_metadata.recv_initial_metadata_ready; |
|
|
|
|
s->recv_initial_metadata = |
|
|
|
|
op_payload->recv_initial_metadata.recv_initial_metadata; |
|
|
|
|
s->trailing_metadata_available = |
|
|
|
|
op_payload->recv_initial_metadata.trailing_metadata_available; |
|
|
|
|
if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) { |
|
|
|
|
*s->trailing_metadata_available = true; |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void recv_message_locked( |
|
|
|
|
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t) { |
|
|
|
|
CHECK_EQ(s->recv_message_ready, nullptr); |
|
|
|
|
s->recv_message_ready = op_payload->recv_message.recv_message_ready; |
|
|
|
|
s->recv_message = op_payload->recv_message.recv_message; |
|
|
|
|
s->recv_message->emplace(); |
|
|
|
|
s->recv_message_flags = op_payload->recv_message.flags; |
|
|
|
|
s->call_failed_before_recv_message = |
|
|
|
|
op_payload->recv_message.call_failed_before_recv_message; |
|
|
|
|
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void recv_trailing_metadata_locked( |
|
|
|
|
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload, |
|
|
|
|
grpc_chttp2_transport* t) { |
|
|
|
|
CHECK_EQ(s->collecting_stats, nullptr); |
|
|
|
|
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats; |
|
|
|
|
CHECK_EQ(s->recv_trailing_metadata_finished, nullptr); |
|
|
|
|
s->recv_trailing_metadata_finished = |
|
|
|
|
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
|
|
|
|
s->recv_trailing_metadata = |
|
|
|
|
op_payload->recv_trailing_metadata.recv_trailing_metadata; |
|
|
|
|
s->final_metadata_requested = true; |
|
|
|
|
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void perform_stream_op_locked(void* stream_op, |
|
|
|
|
grpc_error_handle /*error_ignored*/) { |
|
|
|
|
grpc_transport_stream_op_batch* op = |
|
|
|
@ -1405,225 +1640,27 @@ static void perform_stream_op_locked(void* stream_op, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->send_initial_metadata) { |
|
|
|
|
if (!grpc_core::IsCallTracerInTransportEnabled()) { |
|
|
|
|
if (s->call_tracer != nullptr) { |
|
|
|
|
s->call_tracer->RecordAnnotation( |
|
|
|
|
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, |
|
|
|
|
gpr_now(GPR_CLOCK_REALTIME)) |
|
|
|
|
.Add(s->t->flow_control.stats()) |
|
|
|
|
.Add(s->flow_control.stats())); |
|
|
|
|
} |
|
|
|
|
} else if (grpc_core::IsTraceRecordCallopsEnabled()) { |
|
|
|
|
auto* call_tracer = |
|
|
|
|
s->arena->GetContext<grpc_core::CallTracerInterface>(); |
|
|
|
|
if (call_tracer != nullptr && call_tracer->IsSampled()) { |
|
|
|
|
call_tracer->RecordAnnotation( |
|
|
|
|
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart, |
|
|
|
|
gpr_now(GPR_CLOCK_REALTIME)) |
|
|
|
|
.Add(s->t->flow_control.stats()) |
|
|
|
|
.Add(s->flow_control.stats())); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (t->is_client && t->channelz_socket != nullptr) { |
|
|
|
|
t->channelz_socket->RecordStreamStartedFromLocal(); |
|
|
|
|
} |
|
|
|
|
CHECK_EQ(s->send_initial_metadata_finished, nullptr); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
|
|
|
|
|
s->send_initial_metadata_finished = add_closure_barrier(on_complete); |
|
|
|
|
s->send_initial_metadata = |
|
|
|
|
op_payload->send_initial_metadata.send_initial_metadata; |
|
|
|
|
if (t->is_client) { |
|
|
|
|
s->deadline = std::min( |
|
|
|
|
s->deadline, |
|
|
|
|
s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata()) |
|
|
|
|
.value_or(grpc_core::Timestamp::InfFuture())); |
|
|
|
|
} |
|
|
|
|
if (contains_non_ok_status(s->send_initial_metadata)) { |
|
|
|
|
s->seen_error = true; |
|
|
|
|
} |
|
|
|
|
if (!s->write_closed) { |
|
|
|
|
if (t->is_client) { |
|
|
|
|
if (t->closed_with_error.ok()) { |
|
|
|
|
CHECK_EQ(s->id, 0u); |
|
|
|
|
grpc_chttp2_list_add_waiting_for_concurrency(t, s); |
|
|
|
|
maybe_start_some_streams(t); |
|
|
|
|
} else { |
|
|
|
|
s->trailing_metadata_buffer.Set( |
|
|
|
|
grpc_core::GrpcStreamNetworkState(), |
|
|
|
|
grpc_core::GrpcStreamNetworkState::kNotSentOnWire); |
|
|
|
|
grpc_chttp2_cancel_stream( |
|
|
|
|
t, s, |
|
|
|
|
grpc_error_set_int( |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING("Transport closed", |
|
|
|
|
&t->closed_with_error, 1), |
|
|
|
|
grpc_core::StatusIntProperty::kRpcStatus, |
|
|
|
|
GRPC_STATUS_UNAVAILABLE), |
|
|
|
|
false); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
CHECK_NE(s->id, 0u); |
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
if (!(op->send_message && |
|
|
|
|
(op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) { |
|
|
|
|
grpc_chttp2_initiate_write( |
|
|
|
|
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
s->send_initial_metadata = nullptr; |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
t, &s->send_initial_metadata_finished, |
|
|
|
|
GRPC_ERROR_CREATE_REFERENCING( |
|
|
|
|
"Attempt to send initial metadata after stream was closed", |
|
|
|
|
&s->write_closed_error, 1), |
|
|
|
|
"send_initial_metadata_finished"); |
|
|
|
|
} |
|
|
|
|
send_initial_metadata_locked(op, s, op_payload, t, on_complete); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->send_message) { |
|
|
|
|
t->num_messages_in_next_write++; |
|
|
|
|
grpc_core::global_stats().IncrementHttp2SendMessageSize( |
|
|
|
|
op->payload->send_message.send_message->Length()); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
s->send_message_finished = add_closure_barrier(op->on_complete); |
|
|
|
|
const uint32_t flags = op_payload->send_message.flags; |
|
|
|
|
if (s->write_closed) { |
|
|
|
|
op->payload->send_message.stream_write_closed = true; |
|
|
|
|
// We should NOT return an error here, so as to avoid a cancel OP being
|
|
|
|
|
// started. The surface layer will notice that the stream has been closed
|
|
|
|
|
// for writes and fail the send message op.
|
|
|
|
|
grpc_chttp2_complete_closure_step(t, &s->send_message_finished, |
|
|
|
|
absl::OkStatus(), |
|
|
|
|
"fetching_send_message_finished"); |
|
|
|
|
} else { |
|
|
|
|
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add( |
|
|
|
|
&s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES); |
|
|
|
|
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0; |
|
|
|
|
size_t len = op_payload->send_message.send_message->Length(); |
|
|
|
|
frame_hdr[1] = static_cast<uint8_t>(len >> 24); |
|
|
|
|
frame_hdr[2] = static_cast<uint8_t>(len >> 16); |
|
|
|
|
frame_hdr[3] = static_cast<uint8_t>(len >> 8); |
|
|
|
|
frame_hdr[4] = static_cast<uint8_t>(len); |
|
|
|
|
|
|
|
|
|
s->call_tracer_wrapper.RecordOutgoingBytes( |
|
|
|
|
{GRPC_HEADER_SIZE_IN_BYTES, len, 0}); |
|
|
|
|
s->next_message_end_offset = |
|
|
|
|
s->flow_controlled_bytes_written + |
|
|
|
|
static_cast<int64_t>(s->flow_controlled_buffer.length) + |
|
|
|
|
static_cast<int64_t>(len); |
|
|
|
|
if (flags & GRPC_WRITE_BUFFER_HINT) { |
|
|
|
|
s->next_message_end_offset -= t->write_buffer_size; |
|
|
|
|
s->write_buffering = true; |
|
|
|
|
} else { |
|
|
|
|
s->write_buffering = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
grpc_slice* const slices = |
|
|
|
|
op_payload->send_message.send_message->c_slice_buffer()->slices; |
|
|
|
|
grpc_slice* const end = |
|
|
|
|
slices + op_payload->send_message.send_message->Count(); |
|
|
|
|
for (grpc_slice* slice = slices; slice != end; slice++) { |
|
|
|
|
grpc_slice_buffer_add(&s->flow_controlled_buffer, |
|
|
|
|
grpc_core::CSliceRef(*slice)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int64_t notify_offset = s->next_message_end_offset; |
|
|
|
|
if (notify_offset <= s->flow_controlled_bytes_written) { |
|
|
|
|
grpc_chttp2_complete_closure_step(t, &s->send_message_finished, |
|
|
|
|
absl::OkStatus(), |
|
|
|
|
"fetching_send_message_finished"); |
|
|
|
|
} else { |
|
|
|
|
grpc_chttp2_write_cb* cb = t->write_cb_pool; |
|
|
|
|
if (cb == nullptr) { |
|
|
|
|
cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb))); |
|
|
|
|
} else { |
|
|
|
|
t->write_cb_pool = cb->next; |
|
|
|
|
} |
|
|
|
|
cb->call_at_byte = notify_offset; |
|
|
|
|
cb->closure = s->send_message_finished; |
|
|
|
|
s->send_message_finished = nullptr; |
|
|
|
|
grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH |
|
|
|
|
? &s->on_write_finished_cbs |
|
|
|
|
: &s->on_flow_controlled_cbs; |
|
|
|
|
cb->next = *list; |
|
|
|
|
*list = cb; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (s->id != 0 && |
|
|
|
|
(!s->write_buffering || |
|
|
|
|
s->flow_controlled_buffer.length > t->write_buffer_size)) { |
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
send_message_locked(op, s, op_payload, t, on_complete); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->send_trailing_metadata) { |
|
|
|
|
CHECK_EQ(s->send_trailing_metadata_finished, nullptr); |
|
|
|
|
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write; |
|
|
|
|
s->send_trailing_metadata_finished = add_closure_barrier(on_complete); |
|
|
|
|
s->send_trailing_metadata = |
|
|
|
|
op_payload->send_trailing_metadata.send_trailing_metadata; |
|
|
|
|
s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent; |
|
|
|
|
s->write_buffering = false; |
|
|
|
|
if (contains_non_ok_status(s->send_trailing_metadata)) { |
|
|
|
|
s->seen_error = true; |
|
|
|
|
} |
|
|
|
|
if (s->write_closed) { |
|
|
|
|
s->send_trailing_metadata = nullptr; |
|
|
|
|
s->sent_trailing_metadata_op = nullptr; |
|
|
|
|
grpc_chttp2_complete_closure_step( |
|
|
|
|
t, &s->send_trailing_metadata_finished, |
|
|
|
|
op->payload->send_trailing_metadata.send_trailing_metadata->empty() |
|
|
|
|
? absl::OkStatus() |
|
|
|
|
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after " |
|
|
|
|
"stream was closed"), |
|
|
|
|
"send_trailing_metadata_finished"); |
|
|
|
|
} else if (s->id != 0) { |
|
|
|
|
// TODO(ctiller): check if there's flow control for any outstanding
|
|
|
|
|
// bytes before going writable
|
|
|
|
|
grpc_chttp2_mark_stream_writable(t, s); |
|
|
|
|
grpc_chttp2_initiate_write( |
|
|
|
|
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA); |
|
|
|
|
} |
|
|
|
|
send_trailing_metadata_locked(op, s, op_payload, t, on_complete); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->recv_initial_metadata) { |
|
|
|
|
CHECK_EQ(s->recv_initial_metadata_ready, nullptr); |
|
|
|
|
s->recv_initial_metadata_ready = |
|
|
|
|
op_payload->recv_initial_metadata.recv_initial_metadata_ready; |
|
|
|
|
s->recv_initial_metadata = |
|
|
|
|
op_payload->recv_initial_metadata.recv_initial_metadata; |
|
|
|
|
s->trailing_metadata_available = |
|
|
|
|
op_payload->recv_initial_metadata.trailing_metadata_available; |
|
|
|
|
if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) { |
|
|
|
|
*s->trailing_metadata_available = true; |
|
|
|
|
} |
|
|
|
|
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s); |
|
|
|
|
recv_initial_metadata_locked(s, op_payload, t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->recv_message) { |
|
|
|
|
CHECK_EQ(s->recv_message_ready, nullptr); |
|
|
|
|
s->recv_message_ready = op_payload->recv_message.recv_message_ready; |
|
|
|
|
s->recv_message = op_payload->recv_message.recv_message; |
|
|
|
|
s->recv_message->emplace(); |
|
|
|
|
s->recv_message_flags = op_payload->recv_message.flags; |
|
|
|
|
s->call_failed_before_recv_message = |
|
|
|
|
op_payload->recv_message.call_failed_before_recv_message; |
|
|
|
|
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
|
|
|
|
recv_message_locked(s, op_payload, t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (op->recv_trailing_metadata) { |
|
|
|
|
CHECK_EQ(s->collecting_stats, nullptr); |
|
|
|
|
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats; |
|
|
|
|
CHECK_EQ(s->recv_trailing_metadata_finished, nullptr); |
|
|
|
|
s->recv_trailing_metadata_finished = |
|
|
|
|
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready; |
|
|
|
|
s->recv_trailing_metadata = |
|
|
|
|
op_payload->recv_trailing_metadata.recv_trailing_metadata; |
|
|
|
|
s->final_metadata_requested = true; |
|
|
|
|
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s); |
|
|
|
|
recv_trailing_metadata_locked(s, op_payload, t); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (on_complete != nullptr) { |
|
|
|
|