[memory] Re-align caches for transport, stream, and tcp (#34397)

Cache realignment saves 64 bytes per transport, 40 bytes per stream, 8
bytes per tcp endpoint, and 24 bytes per transport op.
Branch: pull/34437/head
Author: Alisha Nanda, committed by GitHub
Parent: 9ee3562d84
Commit: 11c8b53f18
Files changed:
  1. src/core/ext/transport/chttp2/transport/chttp2_transport.cc (4 changed lines)
  2. src/core/ext/transport/chttp2/transport/internal.h (104 changed lines)
  3. src/core/lib/iomgr/tcp_posix.cc (29 changed lines)
  4. src/core/lib/transport/transport.h (36 changed lines)

@@ -575,7 +575,6 @@ grpc_chttp2_transport::grpc_chttp2_transport(
combiner(grpc_combiner_create(event_engine)),
state_tracker(is_client ? "client_transport" : "server_transport",
GRPC_CHANNEL_READY),
is_client(is_client),
next_stream_id(is_client ? 1 : 2),
ping_abuse_policy(channel_args),
ping_rate_policy(channel_args, is_client),
@@ -583,7 +582,8 @@ grpc_chttp2_transport::grpc_chttp2_transport(
peer_string.as_string_view(),
channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
&memory_owner),
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) {
deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
is_client(is_client) {
cl = new grpc_core::ContextList();
GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
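The constructor hunk above moves `is_client(is_client)` to the end of the member-initializer list, mirroring the field's new position in the struct: C++ initializes members in declaration order regardless of initializer-list order, and compilers warn (-Wreorder) when the two disagree. A tiny hypothetical sketch of the rule (not gRPC code):

```cpp
// Hypothetical example: members are constructed in the order they are
// declared, so the initializer list is kept in the same order to avoid
// -Wreorder warnings and accidental reads of not-yet-initialized members.
struct Endpoint {
  int next_stream_id;
  bool is_client;  // declared last, so it is initialized last

  explicit Endpoint(bool client)
      : next_stream_id(client ? 1 : 2),  // listed in declaration order
        is_client(client) {}
};
```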

@@ -275,17 +275,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_closure* notify_on_receive_settings = nullptr;
grpc_closure* notify_on_close = nullptr;
/// write execution state of the transport
grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
/// is the transport destroying itself?
uint8_t destroying = false;
/// has the upper layer closed the transport?
grpc_error_handle closed_with_error;
/// is there a read request to the endpoint outstanding?
uint8_t endpoint_reading = 1;
/// various lists of streams
grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
@@ -322,26 +314,16 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_slice_buffer outbuf;
/// hpack encoding
grpc_core::HPackCompressor hpack_compressor;
/// is this a client?
bool is_client;
/// data to write next write
grpc_slice_buffer qbuf;
/// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
///
uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
/// Set to a grpc_error object if a goaway frame is received. By default, set
/// to absl::OkStatus()
grpc_error_handle goaway_error;
grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
/// are the local settings dirty and need to be sent?
bool dirtied_local_settings = true;
/// have local settings been sent?
bool sent_local_settings = false;
/// bitmask of setting indexes to send out
/// Hack: it's common for implementations to assume 65536 bytes initial send
/// window -- this should by rights be 0
@@ -396,7 +378,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
bool is_first_frame = true;
uint32_t expect_continuation_stream_id = 0;
uint32_t incoming_frame_size = 0;
uint32_t incoming_stream_id = 0;
grpc_chttp2_stream* incoming_stream = nullptr;
// active parser
@@ -412,8 +393,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_write_cb* write_cb_pool = nullptr;
// bdp estimator
bool bdp_ping_blocked =
false; // Is the BDP blocked due to not receiving any data?
grpc_closure next_bdp_ping_timer_expired_locked;
grpc_closure start_bdp_ping_locked;
grpc_closure finish_bdp_ping_locked;
@@ -425,19 +404,11 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
// buffer pool state
/// have we scheduled a benign cleanup?
bool benign_reclaimer_registered = false;
/// have we scheduled a destructive cleanup?
bool destructive_reclaimer_registered = false;
/// benign cleanup closure
grpc_closure benign_reclaimer_locked;
/// destructive cleanup closure
grpc_closure destructive_reclaimer_locked;
/// If start_bdp_ping_locked has been called
bool bdp_ping_started = false;
// True if pings should be acked
bool ack_pings = true;
// next bdp ping timer handle
absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
next_bdp_ping_timer_handle;
@@ -461,10 +432,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_core::Duration keepalive_time;
/// grace period for a ping to complete before watchdog kicks in
grpc_core::Duration keepalive_timeout;
/// if keepalive pings are allowed when there's no outstanding streams
bool keepalive_permit_without_calls = false;
/// If start_keepalive_ping_locked has been called
bool keepalive_ping_started = false;
/// keep-alive state machine state
grpc_chttp2_keepalive_state keepalive_state;
// Soft limit on max header size.
@@ -478,6 +445,12 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// only continue reading when we are able to write to the socket again,
/// thereby reducing the number of induced frames.
uint32_t num_pending_induced_frames = 0;
uint32_t incoming_stream_id = 0;
/// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
///
uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
bool reading_paused_on_pending_induced_frames = false;
/// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
/// the peer
@@ -486,6 +459,42 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// covering a write in a pollset. Such closures cannot be scheduled until
/// we can prove that the write got scheduled.
uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;
/// have we scheduled a benign cleanup?
bool benign_reclaimer_registered = false;
/// have we scheduled a destructive cleanup?
bool destructive_reclaimer_registered = false;
/// if keepalive pings are allowed when there's no outstanding streams
bool keepalive_permit_without_calls = false;
/// If start_keepalive_ping_locked has been called
bool keepalive_ping_started = false;
// bdp estimator
bool bdp_ping_blocked =
false; // Is the BDP blocked due to not receiving any data?
/// is the transport destroying itself?
uint8_t destroying = false;
/// is there a read request to the endpoint outstanding?
uint8_t endpoint_reading = 1;
/// is this a client?
bool is_client;
/// are the local settings dirty and need to be sent?
bool dirtied_local_settings = true;
/// have local settings been sent?
bool sent_local_settings = false;
/// If start_bdp_ping_locked has been called
bool bdp_ping_started = false;
// True if pings should be acked
bool ack_pings = true;
/// write execution state of the transport
grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
};
typedef enum {
@@ -508,7 +517,6 @@ struct grpc_chttp2_stream {
grpc_closure* destroy_stream_arg;
grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
grpc_core::BitSet<STREAM_LIST_COUNT> included;
/// HTTP2 stream id for this stream, or zero if one has not been assigned
uint32_t id = 0;
@@ -534,7 +542,6 @@ struct grpc_chttp2_stream {
grpc_metadata_batch* recv_initial_metadata;
grpc_closure* recv_initial_metadata_ready = nullptr;
bool* trailing_metadata_available = nullptr;
bool parsed_trailers_only = false;
absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
uint32_t* recv_message_flags = nullptr;
bool* call_failed_before_recv_message = nullptr;
@@ -562,30 +569,25 @@ struct grpc_chttp2_stream {
bool eos_received = false;
bool eos_sent = false;
grpc_core::BitSet<STREAM_LIST_COUNT> included;
/// the error that resulted in this stream being read-closed
grpc_error_handle read_closed_error;
/// the error that resulted in this stream being write-closed
grpc_error_handle write_closed_error;
grpc_published_metadata_method published_metadata[2] = {};
bool final_metadata_requested = false;
grpc_metadata_batch initial_metadata_buffer;
grpc_metadata_batch trailing_metadata_buffer;
grpc_slice_buffer frame_storage; // protected by t combiner
bool received_last_frame = false; // protected by t combiner
grpc_slice_buffer frame_storage; // protected by t combiner
grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
/// how many header frames have we received?
uint8_t header_frames_received = 0;
/// number of bytes received - reset at end of parse thread execution
int64_t received_bytes = 0;
bool sent_initial_metadata = false;
bool sent_trailing_metadata = false;
grpc_core::chttp2::StreamFlowControl flow_control;
grpc_slice_buffer flow_controlled_buffer;
@@ -595,13 +597,25 @@ struct grpc_chttp2_stream {
grpc_chttp2_write_cb* finish_after_write = nullptr;
size_t sending_bytes = 0;
/// Whether the bytes needs to be traced using Fathom
bool traced = false;
/// Byte counter for number of bytes written
size_t byte_counter = 0;
// time this stream was created
gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
bool parsed_trailers_only = false;
bool final_metadata_requested = false;
bool received_last_frame = false; // protected by t combiner
/// how many header frames have we received?
uint8_t header_frames_received = 0;
bool sent_initial_metadata = false;
bool sent_trailing_metadata = false;
/// Whether the bytes needs to be traced using Fathom
bool traced = false;
};
/// Transport writing call flow:
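One way to sanity-check the reported per-object savings for the two structs above is to print their sizes when built inside the gRPC source tree, once at the parent commit and once at this commit; absolute numbers vary by platform and build flags, but the delta should match the commit message. A hypothetical sketch, not part of this change:

```cpp
// Hypothetical size check (not part of this change). Build against the parent
// commit and against this commit, then compare the printed values.
#include <cstdio>

#include "src/core/ext/transport/chttp2/transport/internal.h"

int main() {
  std::printf("sizeof(grpc_chttp2_transport) = %zu\n",
              sizeof(grpc_chttp2_transport));
  std::printf("sizeof(grpc_chttp2_stream)    = %zu\n",
              sizeof(grpc_chttp2_stream));
  return 0;
}
```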

@@ -479,10 +479,7 @@ struct grpc_tcp {
grpc_endpoint base;
grpc_fd* em_fd;
int fd;
// Used by the endpoint read function to distinguish the very first read call
// from the rest
bool is_first_read;
bool has_posted_reclaimer ABSL_GUARDED_BY(read_mu) = false;
int inq; // bytes pending on the socket from the last read.
double target_length;
double bytes_read_this_round;
grpc_core::RefCount refcount;
@@ -490,15 +487,12 @@ struct grpc_tcp {
int min_read_chunk_size;
int max_read_chunk_size;
int set_rcvlowat = 0;
// garbage after the last read
grpc_slice_buffer last_read_buffer;
grpc_core::Mutex read_mu;
grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr;
int inq; // bytes pending on the socket from the last read.
bool inq_capable; // cache whether kernel supports inq
grpc_slice_buffer* outgoing_buffer;
// byte within outgoing_buffer->slices[0] to write next
@@ -535,17 +529,26 @@ struct grpc_tcp {
// options for collecting timestamps are set, and is incremented with each
// byte sent.
int bytes_counter;
bool socket_ts_enabled; // True if timestamping options are set on the socket
//
bool ts_capable; // Cache whether we can set timestamping options
int min_progress_size; // A hint from upper layers specifying the minimum
// number of bytes that need to be read to make
// meaningful progress
gpr_atm stop_error_notification; // Set to 1 if we do not want to be notified
// on errors anymore
TcpZerocopySendCtx tcp_zerocopy_send_ctx;
TcpZerocopySendRecord* current_zerocopy_send = nullptr;
int min_progress_size; // A hint from upper layers specifying the minimum
// number of bytes that need to be read to make
// meaningful progress
int set_rcvlowat = 0;
// Used by the endpoint read function to distinguish the very first read call
// from the rest
bool is_first_read;
bool has_posted_reclaimer ABSL_GUARDED_BY(read_mu) = false;
bool inq_capable; // cache whether kernel supports inq
bool socket_ts_enabled; // True if timestamping options are set on the socket
//
bool ts_capable; // Cache whether we can set timestamping options
};
struct backup_poller {

@@ -508,7 +508,6 @@ typedef struct grpc_transport_op {
/// connectivity monitoring - set connectivity_state to NULL to unsubscribe
grpc_core::OrphanablePtr<grpc_core::ConnectivityStateWatcherInterface>
start_connectivity_watch;
grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch =
nullptr;
/// should the transport be disconnected
@@ -519,27 +518,11 @@ typedef struct grpc_transport_op {
/// Error contract: the transport that gets this op must cause
/// goaway_error to be unref'ed after processing it
grpc_error_handle goaway_error;
/// set the callback for accepting new streams;
/// this is a permanent callback, unlike the other one-shot closures.
/// If true, the callback is set to set_accept_stream_fn, with its
/// user_data argument set to set_accept_stream_user_data.
/// `set_registered_method_matcher_fn` is also set with its user_data argument
/// set to set_accept_stream_user_data. The transport should invoke
/// `set_registered_method_matcher_fn` after initial metadata is received but
/// before recv_initial_metadata_ready callback is invoked. If the transport
/// detects an error in the stream, invoking
/// `set_registered_method_matcher_fn` can be skipped.
bool set_accept_stream = false;
void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport,
const void* server_data) = nullptr;
void (*set_registered_method_matcher_fn)(
void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
void* set_accept_stream_user_data = nullptr;
/// set the callback for accepting new streams based upon promises;
/// this is a permanent callback, unlike the other one-shot closures.
/// If true, the callback is set to set_make_promise_fn, with its
/// user_data argument set to set_make_promise_data
bool set_make_promise = false;
void (*set_make_promise_fn)(void* user_data, grpc_transport* transport,
const void* server_data) = nullptr;
void* set_make_promise_user_data = nullptr;
@@ -555,9 +538,28 @@ typedef struct grpc_transport_op {
/// Called when the ping ack is received
grpc_closure* on_ack = nullptr;
} send_ping;
grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
// If true, will reset the channel's connection backoff.
bool reset_connect_backoff = false;
/// set the callback for accepting new streams;
/// this is a permanent callback, unlike the other one-shot closures.
/// If true, the callback is set to set_accept_stream_fn, with its
/// user_data argument set to set_accept_stream_user_data.
/// `set_registered_method_matcher_fn` is also set with its user_data argument
/// set to set_accept_stream_user_data. The transport should invoke
/// `set_registered_method_matcher_fn` after initial metadata is received but
/// before recv_initial_metadata_ready callback is invoked. If the transport
/// detects an error in the stream, invoking
/// `set_registered_method_matcher_fn` can be skipped.
bool set_accept_stream = false;
/// set the callback for accepting new streams based upon promises;
/// this is a permanent callback, unlike the other one-shot closures.
/// If true, the callback is set to set_make_promise_fn, with its
/// user_data argument set to set_make_promise_data
bool set_make_promise = false;
//**************************************************************************
// remaining fields are initialized and used at the discretion of the
// transport implementation
