[chttp2] Make outbuf a SliceBuffer (#34495)

Really minimal change to make the output buffer for chttp2 be a
`grpc_core::SliceBuffer` so that we can start mixing in the new framer
code.

---------

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/34536/head
Craig Tiller 2 years ago committed by GitHub
parent 232611bfc2
commit b517a3d792
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 2
      src/core/ext/transport/chttp2/transport/internal.h
  3. 58
      src/core/ext/transport/chttp2/transport/writing.cc

@ -327,8 +327,6 @@ grpc_chttp2_transport::~grpc_chttp2_transport() {
grpc_slice_buffer_destroy(&qbuf);
grpc_slice_buffer_destroy(&outbuf);
grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
// ContextList::Execute follows semantics of a callback function and does not
// take a ref on error
@ -590,10 +588,10 @@ grpc_chttp2_transport::grpc_chttp2_transport(
base.vtable = get_vtable();
grpc_slice_buffer_init(&read_buffer);
grpc_slice_buffer_init(&outbuf);
if (is_client) {
grpc_slice_buffer_add(&outbuf, grpc_slice_from_copied_string(
GRPC_CHTTP2_CLIENT_CONNECT_STRING));
grpc_slice_buffer_add(
outbuf.c_slice_buffer(),
grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
grpc_slice_buffer_init(&qbuf);
// copy in initial settings to all setting sets
@ -1016,7 +1014,7 @@ static void write_action(grpc_chttp2_transport* t) {
if (max_frame_size == 0) {
max_frame_size = INT_MAX;
}
grpc_endpoint_write(t->ep, &t->outbuf,
grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(),
grpc_core::InitTransportClosure<write_action_end>(
t->Ref(), &t->write_action_end_locked),
cl, max_frame_size);

@ -311,7 +311,7 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_core::ConnectivityStateTracker state_tracker;
/// data to write now
grpc_slice_buffer outbuf;
grpc_core::SliceBuffer outbuf;
/// hpack encoding
grpc_core::HPackCompressor hpack_compressor;

@ -64,6 +64,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/bdp_estimator.h"
#include "src/core/lib/transport/http2_errors.h"
#include "src/core/lib/transport/metadata_batch.h"
@ -134,7 +135,7 @@ static void maybe_initiate_ping(grpc_chttp2_transport* t) {
&pq->lists[GRPC_CHTTP2_PCL_INITIATE]);
grpc_closure_list_move(&pq->lists[GRPC_CHTTP2_PCL_NEXT],
&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT]);
grpc_slice_buffer_add(&t->outbuf,
grpc_slice_buffer_add(t->outbuf.c_slice_buffer(),
grpc_chttp2_ping_create(false, pq->inflight_id));
grpc_core::global_stats().IncrementHttp2PingsSent();
if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
@ -270,10 +271,11 @@ class WriteContext {
void FlushSettings() {
if (t_->dirtied_local_settings && !t_->sent_local_settings) {
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_settings_create(
t_->settings[GRPC_SENT_SETTINGS],
t_->settings[GRPC_LOCAL_SETTINGS],
t_->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
t_->outbuf.c_slice_buffer(),
grpc_chttp2_settings_create(t_->settings[GRPC_SENT_SETTINGS],
t_->settings[GRPC_LOCAL_SETTINGS],
t_->force_send_settings,
GRPC_CHTTP2_NUM_SETTINGS));
t_->force_send_settings = false;
t_->dirtied_local_settings = false;
t_->sent_local_settings = true;
@ -284,26 +286,26 @@ class WriteContext {
void FlushQueuedBuffers() {
// simple writes are queued to qbuf, and flushed here
grpc_slice_buffer_move_into(&t_->qbuf, &t_->outbuf);
grpc_slice_buffer_move_into(&t_->qbuf, t_->outbuf.c_slice_buffer());
t_->num_pending_induced_frames = 0;
GPR_ASSERT(t_->qbuf.count == 0);
}
void FlushWindowUpdates() {
uint32_t transport_announce =
t_->flow_control.MaybeSendUpdate(t_->outbuf.count > 0);
uint32_t transport_announce = t_->flow_control.MaybeSendUpdate(
t_->outbuf.c_slice_buffer()->count > 0);
if (transport_announce) {
grpc_transport_one_way_stats throwaway_stats;
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_window_update_create(0, transport_announce,
&throwaway_stats));
grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(
0, transport_announce, &throwaway_stats));
grpc_chttp2_reset_ping_clock(t_);
}
}
void FlushPingAcks() {
for (size_t i = 0; i < t_->ping_ack_count; i++) {
grpc_slice_buffer_add(&t_->outbuf,
grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
grpc_chttp2_ping_create(true, t_->ping_acks[i]));
}
t_->ping_ack_count = 0;
@ -328,7 +330,7 @@ class WriteContext {
}
grpc_chttp2_stream* NextStream() {
if (t_->outbuf.length > target_write_size(t_)) {
if (t_->outbuf.c_slice_buffer()->length > target_write_size(t_)) {
result_.partial = true;
return nullptr;
}
@ -351,7 +353,7 @@ class WriteContext {
grpc_chttp2_transport* transport() const { return t_; }
grpc_chttp2_begin_write_result Result() {
result_.writing = t_->outbuf.count > 0;
result_.writing = t_->outbuf.c_slice_buffer()->count > 0;
return result_;
}
@ -403,7 +405,8 @@ class DataSendContext {
s_->send_trailing_metadata != nullptr &&
s_->send_trailing_metadata->empty();
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
is_last_frame_, &s_->stats.outgoing,
t_->outbuf.c_slice_buffer());
sfc_upd_.SentData(send_bytes);
s_->sending_bytes += send_bytes;
}
@ -468,7 +471,7 @@ class StreamWriteContext {
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
&s_->stats.outgoing // stats
},
*s_->send_initial_metadata, &t_->outbuf);
*s_->send_initial_metadata, t_->outbuf.c_slice_buffer());
grpc_chttp2_reset_ping_clock(t_);
write_context_->IncInitialMetadataWrites();
}
@ -488,9 +491,9 @@ class StreamWriteContext {
const uint32_t stream_announce = s_->flow_control.MaybeSendUpdate();
if (stream_announce == 0) return;
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_window_update_create(s_->id, stream_announce,
&s_->stats.outgoing));
grpc_slice_buffer_add(t_->outbuf.c_slice_buffer(),
grpc_chttp2_window_update_create(
s_->id, stream_announce, &s_->stats.outgoing));
grpc_chttp2_reset_ping_clock(t_);
write_context_->IncWindowUpdateWrites();
}
@ -543,7 +546,7 @@ class StreamWriteContext {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (s_->send_trailing_metadata->empty()) {
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
&s_->stats.outgoing, &t_->outbuf);
&s_->stats.outgoing, t_->outbuf.c_slice_buffer());
} else {
if (send_status_.has_value()) {
s_->send_trailing_metadata->Set(grpc_core::HttpStatusMetadata(),
@ -563,7 +566,7 @@ class StreamWriteContext {
t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
&s_->stats.outgoing},
*s_->send_trailing_metadata, &t_->outbuf);
*s_->send_trailing_metadata, t_->outbuf.c_slice_buffer());
}
write_context_->IncTrailingMetadataWrites();
grpc_chttp2_reset_ping_clock(t_);
@ -600,8 +603,9 @@ class StreamWriteContext {
if (!t_->is_client && !s_->read_closed) {
grpc_slice_buffer_add(
&t_->outbuf, grpc_chttp2_rst_stream_create(
s_->id, GRPC_HTTP2_NO_ERROR, &s_->stats.outgoing));
t_->outbuf.c_slice_buffer(),
grpc_chttp2_rst_stream_create(s_->id, GRPC_HTTP2_NO_ERROR,
&s_->stats.outgoing));
}
grpc_chttp2_mark_stream_closed(t_, s_, !t_->is_client, true,
absl::OkStatus());
@ -634,15 +638,15 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
// (according to available window sizes) and add to the output buffer
while (grpc_chttp2_stream* s = ctx.NextStream()) {
StreamWriteContext stream_ctx(&ctx, s);
size_t orig_len = t->outbuf.length;
size_t orig_len = t->outbuf.c_slice_buffer()->length;
int64_t num_stream_bytes = 0;
stream_ctx.FlushInitialMetadata();
stream_ctx.FlushWindowUpdates();
stream_ctx.FlushData();
stream_ctx.FlushTrailingMetadata();
if (t->outbuf.length > orig_len) {
if (t->outbuf.c_slice_buffer()->length > orig_len) {
// Add this stream to the list of the contexts to be traced at TCP
num_stream_bytes = t->outbuf.length - orig_len;
num_stream_bytes = t->outbuf.c_slice_buffer()->length - orig_len;
s->byte_counter += static_cast<size_t>(num_stream_bytes);
if (s->traced && grpc_endpoint_can_track_err(t->ep)) {
grpc_core::CopyContextFn copy_context_fn =
@ -692,5 +696,5 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error) {
}
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:end");
}
grpc_slice_buffer_reset_and_unref(&t->outbuf);
grpc_slice_buffer_reset_and_unref(t->outbuf.c_slice_buffer());
}

Loading…
Cancel
Save