[PH2][Refactor][FunctionSplit][chttp2_transport.cc] Splitting large function (#37453)

[PH2][Refactor][FunctionSplit][chttp2_transport.cc]

Splitting very large function perform_stream_op_locked into smaller functions.

Adding new functions :

1. send_initial_metadata_locked
2. send_message_locked
3. send_trailing_metadata_locked
4. recv_initial_metadata_locked
5. recv_message_locked
6. recv_trailing_metadata_locked

All the above functions get called from perform_stream_op_locked .

Why was this done?

I ran into test failures while working on this : https://github.com/grpc/grpc/pull/37186/files .
The test failed because the function execute_stream_op was too large. So I had to split the function as well in the same PR (which is messy).
Decided to do the same type of splitting here.

Closes #37453

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37453 from tanvi-jagtap:split_function 5e06594e81
PiperOrigin-RevId: 663570972
pull/37483/head
Tanvi Jagtap 3 months ago committed by Copybara-Service
parent 3063716cef
commit 44382df0fc
  1. 445
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc

@ -1362,6 +1362,241 @@ static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
});
}
static void send_initial_metadata_locked(
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t, grpc_closure* on_complete) {
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s->call_tracer != nullptr) {
s->call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
} else if (grpc_core::IsTraceRecordCallopsEnabled()) {
auto* call_tracer = s->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
}
if (t->is_client && t->channelz_socket != nullptr) {
t->channelz_socket->RecordStreamStartedFromLocal();
}
CHECK_EQ(s->send_initial_metadata_finished, nullptr);
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata =
op_payload->send_initial_metadata.send_initial_metadata;
if (t->is_client) {
s->deadline =
std::min(s->deadline,
s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
.value_or(grpc_core::Timestamp::InfFuture()));
}
if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
if (t->is_client) {
if (t->closed_with_error.ok()) {
CHECK_EQ(s->id, 0u);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(t);
} else {
s->trailing_metadata_buffer.Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Transport closed",
&t->closed_with_error, 1),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE),
false);
}
} else {
CHECK_NE(s->id, 0u);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
(op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
}
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
t, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
"send_initial_metadata_finished");
}
}
static void send_message_locked(
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t, grpc_closure* on_complete) {
t->num_messages_in_next_write++;
grpc_core::global_stats().IncrementHttp2SendMessageSize(
op->payload->send_message.send_message->Length());
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_message_finished = add_closure_barrier(op->on_complete);
const uint32_t flags = op_payload->send_message.flags;
if (s->write_closed) {
op->payload->send_message.stream_write_closed = true;
// We should NOT return an error here, so as to avoid a cancel OP being
// started. The surface layer will notice that the stream has been closed
// for writes and fail the send message op.
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(&s->flow_controlled_buffer,
GRPC_HEADER_SIZE_IN_BYTES);
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->Length();
frame_hdr[1] = static_cast<uint8_t>(len >> 24);
frame_hdr[2] = static_cast<uint8_t>(len >> 16);
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
s->call_tracer_wrapper.RecordOutgoingBytes(
{GRPC_HEADER_SIZE_IN_BYTES, len, 0});
s->next_message_end_offset =
s->flow_controlled_bytes_written +
static_cast<int64_t>(s->flow_controlled_buffer.length) +
static_cast<int64_t>(len);
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
} else {
s->write_buffering = false;
}
grpc_slice* const slices =
op_payload->send_message.send_message->c_slice_buffer()->slices;
grpc_slice* const end =
slices + op_payload->send_message.send_message->Count();
for (grpc_slice* slice = slices; slice != end; slice++) {
grpc_slice_buffer_add(&s->flow_controlled_buffer,
grpc_core::CSliceRef(*slice));
}
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
grpc_chttp2_write_cb* cb = t->write_cb_pool;
if (cb == nullptr) {
cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
} else {
t->write_cb_pool = cb->next;
}
cb->call_at_byte = notify_offset;
cb->closure = s->send_message_finished;
s->send_message_finished = nullptr;
grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
? &s->on_write_finished_cbs
: &s->on_flow_controlled_cbs;
cb->next = *list;
*list = cb;
}
if (s->id != 0 && (!s->write_buffering || s->flow_controlled_buffer.length >
t->write_buffer_size)) {
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}
}
}
static void send_trailing_metadata_locked(
grpc_transport_stream_op_batch* op, grpc_chttp2_stream* s,
grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t, grpc_closure* on_complete) {
CHECK_EQ(s->send_trailing_metadata_finished, nullptr);
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata =
op_payload->send_trailing_metadata.send_trailing_metadata;
s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
s->write_buffering = false;
if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(
t, &s->send_trailing_metadata_finished,
op->payload->send_trailing_metadata.send_trailing_metadata->empty()
? absl::OkStatus()
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
"stream was closed"),
"send_trailing_metadata_finished");
} else if (s->id != 0) {
// TODO(ctiller): check if there's flow control for any outstanding
// bytes before going writable
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
}
static void recv_initial_metadata_locked(
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t) {
CHECK_EQ(s->recv_initial_metadata_ready, nullptr);
s->recv_initial_metadata_ready =
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
s->recv_initial_metadata =
op_payload->recv_initial_metadata.recv_initial_metadata;
s->trailing_metadata_available =
op_payload->recv_initial_metadata.trailing_metadata_available;
if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
*s->trailing_metadata_available = true;
}
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
}
static void recv_message_locked(
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t) {
CHECK_EQ(s->recv_message_ready, nullptr);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
s->recv_message->emplace();
s->recv_message_flags = op_payload->recv_message.flags;
s->call_failed_before_recv_message =
op_payload->recv_message.call_failed_before_recv_message;
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
}
static void recv_trailing_metadata_locked(
grpc_chttp2_stream* s, grpc_transport_stream_op_batch_payload* op_payload,
grpc_chttp2_transport* t) {
CHECK_EQ(s->collecting_stats, nullptr);
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
CHECK_EQ(s->recv_trailing_metadata_finished, nullptr);
s->recv_trailing_metadata_finished =
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
}
static void perform_stream_op_locked(void* stream_op,
grpc_error_handle /*error_ignored*/) {
grpc_transport_stream_op_batch* op =
@ -1405,225 +1640,27 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->send_initial_metadata) {
if (!grpc_core::IsCallTracerInTransportEnabled()) {
if (s->call_tracer != nullptr) {
s->call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
} else if (grpc_core::IsTraceRecordCallopsEnabled()) {
auto* call_tracer =
s->arena->GetContext<grpc_core::CallTracerInterface>();
if (call_tracer != nullptr && call_tracer->IsSampled()) {
call_tracer->RecordAnnotation(
grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
gpr_now(GPR_CLOCK_REALTIME))
.Add(s->t->flow_control.stats())
.Add(s->flow_control.stats()));
}
}
if (t->is_client && t->channelz_socket != nullptr) {
t->channelz_socket->RecordStreamStartedFromLocal();
}
CHECK_EQ(s->send_initial_metadata_finished, nullptr);
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata =
op_payload->send_initial_metadata.send_initial_metadata;
if (t->is_client) {
s->deadline = std::min(
s->deadline,
s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
.value_or(grpc_core::Timestamp::InfFuture()));
}
if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
if (t->is_client) {
if (t->closed_with_error.ok()) {
CHECK_EQ(s->id, 0u);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(t);
} else {
s->trailing_metadata_buffer.Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Transport closed",
&t->closed_with_error, 1),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE),
false);
}
} else {
CHECK_NE(s->id, 0u);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
(op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
}
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
t, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
"send_initial_metadata_finished");
}
send_initial_metadata_locked(op, s, op_payload, t, on_complete);
}
if (op->send_message) {
t->num_messages_in_next_write++;
grpc_core::global_stats().IncrementHttp2SendMessageSize(
op->payload->send_message.send_message->Length());
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_message_finished = add_closure_barrier(op->on_complete);
const uint32_t flags = op_payload->send_message.flags;
if (s->write_closed) {
op->payload->send_message.stream_write_closed = true;
// We should NOT return an error here, so as to avoid a cancel OP being
// started. The surface layer will notice that the stream has been closed
// for writes and fail the send message op.
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
&s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
size_t len = op_payload->send_message.send_message->Length();
frame_hdr[1] = static_cast<uint8_t>(len >> 24);
frame_hdr[2] = static_cast<uint8_t>(len >> 16);
frame_hdr[3] = static_cast<uint8_t>(len >> 8);
frame_hdr[4] = static_cast<uint8_t>(len);
s->call_tracer_wrapper.RecordOutgoingBytes(
{GRPC_HEADER_SIZE_IN_BYTES, len, 0});
s->next_message_end_offset =
s->flow_controlled_bytes_written +
static_cast<int64_t>(s->flow_controlled_buffer.length) +
static_cast<int64_t>(len);
if (flags & GRPC_WRITE_BUFFER_HINT) {
s->next_message_end_offset -= t->write_buffer_size;
s->write_buffering = true;
} else {
s->write_buffering = false;
}
grpc_slice* const slices =
op_payload->send_message.send_message->c_slice_buffer()->slices;
grpc_slice* const end =
slices + op_payload->send_message.send_message->Count();
for (grpc_slice* slice = slices; slice != end; slice++) {
grpc_slice_buffer_add(&s->flow_controlled_buffer,
grpc_core::CSliceRef(*slice));
}
int64_t notify_offset = s->next_message_end_offset;
if (notify_offset <= s->flow_controlled_bytes_written) {
grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
absl::OkStatus(),
"fetching_send_message_finished");
} else {
grpc_chttp2_write_cb* cb = t->write_cb_pool;
if (cb == nullptr) {
cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
} else {
t->write_cb_pool = cb->next;
}
cb->call_at_byte = notify_offset;
cb->closure = s->send_message_finished;
s->send_message_finished = nullptr;
grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
? &s->on_write_finished_cbs
: &s->on_flow_controlled_cbs;
cb->next = *list;
*list = cb;
}
if (s->id != 0 &&
(!s->write_buffering ||
s->flow_controlled_buffer.length > t->write_buffer_size)) {
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
}
}
send_message_locked(op, s, op_payload, t, on_complete);
}
if (op->send_trailing_metadata) {
CHECK_EQ(s->send_trailing_metadata_finished, nullptr);
on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
s->send_trailing_metadata =
op_payload->send_trailing_metadata.send_trailing_metadata;
s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
s->write_buffering = false;
if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(
t, &s->send_trailing_metadata_finished,
op->payload->send_trailing_metadata.send_trailing_metadata->empty()
? absl::OkStatus()
: GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
"stream was closed"),
"send_trailing_metadata_finished");
} else if (s->id != 0) {
// TODO(ctiller): check if there's flow control for any outstanding
// bytes before going writable
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
send_trailing_metadata_locked(op, s, op_payload, t, on_complete);
}
if (op->recv_initial_metadata) {
CHECK_EQ(s->recv_initial_metadata_ready, nullptr);
s->recv_initial_metadata_ready =
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
s->recv_initial_metadata =
op_payload->recv_initial_metadata.recv_initial_metadata;
s->trailing_metadata_available =
op_payload->recv_initial_metadata.trailing_metadata_available;
if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
*s->trailing_metadata_available = true;
}
grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
recv_initial_metadata_locked(s, op_payload, t);
}
if (op->recv_message) {
CHECK_EQ(s->recv_message_ready, nullptr);
s->recv_message_ready = op_payload->recv_message.recv_message_ready;
s->recv_message = op_payload->recv_message.recv_message;
s->recv_message->emplace();
s->recv_message_flags = op_payload->recv_message.flags;
s->call_failed_before_recv_message =
op_payload->recv_message.call_failed_before_recv_message;
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
recv_message_locked(s, op_payload, t);
}
if (op->recv_trailing_metadata) {
CHECK_EQ(s->collecting_stats, nullptr);
s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
CHECK_EQ(s->recv_trailing_metadata_finished, nullptr);
s->recv_trailing_metadata_finished =
op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
s->recv_trailing_metadata =
op_payload->recv_trailing_metadata.recv_trailing_metadata;
s->final_metadata_requested = true;
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
recv_trailing_metadata_locked(s, op_payload, t);
}
if (on_complete != nullptr) {

Loading…
Cancel
Save