diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index e68a65e6a54..e17858d3998 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -575,7 +575,6 @@ grpc_chttp2_transport::grpc_chttp2_transport( combiner(grpc_combiner_create(event_engine)), state_tracker(is_client ? "client_transport" : "server_transport", GRPC_CHANNEL_READY), - is_client(is_client), next_stream_id(is_client ? 1 : 2), ping_abuse_policy(channel_args), ping_rate_policy(channel_args, is_client), @@ -583,7 +582,8 @@ grpc_chttp2_transport::grpc_chttp2_transport( peer_string.as_string_view(), channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true), &memory_owner), - deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0) { + deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0), + is_client(is_client) { cl = new grpc_core::ContextList(); GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index fade9c9e348..b84aadcf0a0 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -275,17 +275,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_closure* notify_on_receive_settings = nullptr; grpc_closure* notify_on_close = nullptr; - /// write execution state of the transport - grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; - - /// is the transport destroying itself? - uint8_t destroying = false; /// has the upper layer closed the transport? grpc_error_handle closed_with_error; - /// is there a read request to the endpoint outstanding? - uint8_t endpoint_reading = 1; - /// various lists of streams grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {}; @@ -322,26 +314,16 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_slice_buffer outbuf; /// hpack encoding grpc_core::HPackCompressor hpack_compressor; - /// is this a client? - bool is_client; /// data to write next write grpc_slice_buffer qbuf; - /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? - /// - uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; - /// Set to a grpc_error object if a goaway frame is received. By default, set /// to absl::OkStatus() grpc_error_handle goaway_error; grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND; - /// are the local settings dirty and need to be sent? - bool dirtied_local_settings = true; - /// have local settings been sent? - bool sent_local_settings = false; /// bitmask of setting indexes to send out /// Hack: it's common for implementations to assume 65536 bytes initial send /// window -- this should by rights be 0 @@ -396,7 +378,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { bool is_first_frame = true; uint32_t expect_continuation_stream_id = 0; uint32_t incoming_frame_size = 0; - uint32_t incoming_stream_id = 0; grpc_chttp2_stream* incoming_stream = nullptr; // active parser @@ -412,8 +393,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_chttp2_write_cb* write_cb_pool = nullptr; // bdp estimator - bool bdp_ping_blocked = - false; // Is the BDP blocked due to not receiving any data? grpc_closure next_bdp_ping_timer_expired_locked; grpc_closure start_bdp_ping_locked; grpc_closure finish_bdp_ping_locked; @@ -425,19 +404,11 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT; // buffer pool state - /// have we scheduled a benign cleanup? - bool benign_reclaimer_registered = false; - /// have we scheduled a destructive cleanup? - bool destructive_reclaimer_registered = false; /// benign cleanup closure grpc_closure benign_reclaimer_locked; /// destructive cleanup closure grpc_closure destructive_reclaimer_locked; - /// If start_bdp_ping_locked has been called - bool bdp_ping_started = false; - // True if pings should be acked - bool ack_pings = true; // next bdp ping timer handle absl::optional next_bdp_ping_timer_handle; @@ -461,10 +432,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_core::Duration keepalive_time; /// grace period for a ping to complete before watchdog kicks in grpc_core::Duration keepalive_timeout; - /// if keepalive pings are allowed when there's no outstanding streams - bool keepalive_permit_without_calls = false; - /// If start_keepalive_ping_locked has been called - bool keepalive_ping_started = false; /// keep-alive state machine state grpc_chttp2_keepalive_state keepalive_state; // Soft limit on max header size. @@ -478,6 +445,12 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// only continue reading when we are able to write to the socket again, /// thereby reducing the number of induced frames. uint32_t num_pending_induced_frames = 0; + uint32_t incoming_stream_id = 0; + + /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? + /// + uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; + bool reading_paused_on_pending_induced_frames = false; /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to /// the peer @@ -486,6 +459,42 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// covering a write in a pollset. Such closures cannot be scheduled until /// we can prove that the write got scheduled. uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE; + + /// have we scheduled a benign cleanup? + bool benign_reclaimer_registered = false; + /// have we scheduled a destructive cleanup? + bool destructive_reclaimer_registered = false; + + /// if keepalive pings are allowed when there's no outstanding streams + bool keepalive_permit_without_calls = false; + /// If start_keepalive_ping_locked has been called + bool keepalive_ping_started = false; + + // bdp estimator + bool bdp_ping_blocked = + false; // Is the BDP blocked due to not receiving any data? + + /// is the transport destroying itself? + uint8_t destroying = false; + + /// is there a read request to the endpoint outstanding? + uint8_t endpoint_reading = 1; + + /// is this a client? + bool is_client; + + /// are the local settings dirty and need to be sent? + bool dirtied_local_settings = true; + /// have local settings been sent? + bool sent_local_settings = false; + + /// If start_bdp_ping_locked has been called + bool bdp_ping_started = false; + // True if pings should be acked + bool ack_pings = true; + + /// write execution state of the transport + grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; }; typedef enum { @@ -508,7 +517,6 @@ struct grpc_chttp2_stream { grpc_closure* destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; - grpc_core::BitSet included; /// HTTP2 stream id for this stream, or zero if one has not been assigned uint32_t id = 0; @@ -534,7 +542,6 @@ struct grpc_chttp2_stream { grpc_metadata_batch* recv_initial_metadata; grpc_closure* recv_initial_metadata_ready = nullptr; bool* trailing_metadata_available = nullptr; - bool parsed_trailers_only = false; absl::optional* recv_message = nullptr; uint32_t* recv_message_flags = nullptr; bool* call_failed_before_recv_message = nullptr; @@ -562,30 +569,25 @@ struct grpc_chttp2_stream { bool eos_received = false; bool eos_sent = false; + grpc_core::BitSet included; + /// the error that resulted in this stream being read-closed grpc_error_handle read_closed_error; /// the error that resulted in this stream being write-closed grpc_error_handle write_closed_error; grpc_published_metadata_method published_metadata[2] = {}; - bool final_metadata_requested = false; grpc_metadata_batch initial_metadata_buffer; grpc_metadata_batch trailing_metadata_buffer; - grpc_slice_buffer frame_storage; // protected by t combiner - bool received_last_frame = false; // protected by t combiner + grpc_slice_buffer frame_storage; // protected by t combiner grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture(); - /// how many header frames have we received? - uint8_t header_frames_received = 0; /// number of bytes received - reset at end of parse thread execution int64_t received_bytes = 0; - bool sent_initial_metadata = false; - bool sent_trailing_metadata = false; - grpc_core::chttp2::StreamFlowControl flow_control; grpc_slice_buffer flow_controlled_buffer; @@ -595,13 +597,25 @@ struct grpc_chttp2_stream { grpc_chttp2_write_cb* finish_after_write = nullptr; size_t sending_bytes = 0; - /// Whether the bytes needs to be traced using Fathom - bool traced = false; /// Byte counter for number of bytes written size_t byte_counter = 0; // time this stream was created gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC); + + bool parsed_trailers_only = false; + + bool final_metadata_requested = false; + bool received_last_frame = false; // protected by t combiner + + /// how many header frames have we received? + uint8_t header_frames_received = 0; + + bool sent_initial_metadata = false; + bool sent_trailing_metadata = false; + + /// Whether the bytes needs to be traced using Fathom + bool traced = false; }; /// Transport writing call flow: diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index c19cc8fd06f..870b73fe810 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -479,10 +479,7 @@ struct grpc_tcp { grpc_endpoint base; grpc_fd* em_fd; int fd; - // Used by the endpoint read function to distinguish the very first read call - // from the rest - bool is_first_read; - bool has_posted_reclaimer ABSL_GUARDED_BY(read_mu) = false; + int inq; // bytes pending on the socket from the last read. double target_length; double bytes_read_this_round; grpc_core::RefCount refcount; @@ -490,15 +487,12 @@ struct grpc_tcp { int min_read_chunk_size; int max_read_chunk_size; - int set_rcvlowat = 0; // garbage after the last read grpc_slice_buffer last_read_buffer; grpc_core::Mutex read_mu; grpc_slice_buffer* incoming_buffer ABSL_GUARDED_BY(read_mu) = nullptr; - int inq; // bytes pending on the socket from the last read. - bool inq_capable; // cache whether kernel supports inq grpc_slice_buffer* outgoing_buffer; // byte within outgoing_buffer->slices[0] to write next @@ -535,17 +529,26 @@ struct grpc_tcp { // options for collecting timestamps are set, and is incremented with each // byte sent. int bytes_counter; - bool socket_ts_enabled; // True if timestamping options are set on the socket - // - bool ts_capable; // Cache whether we can set timestamping options + + int min_progress_size; // A hint from upper layers specifying the minimum + // number of bytes that need to be read to make + // meaningful progress + gpr_atm stop_error_notification; // Set to 1 if we do not want to be notified // on errors anymore TcpZerocopySendCtx tcp_zerocopy_send_ctx; TcpZerocopySendRecord* current_zerocopy_send = nullptr; - int min_progress_size; // A hint from upper layers specifying the minimum - // number of bytes that need to be read to make - // meaningful progress + int set_rcvlowat = 0; + + // Used by the endpoint read function to distinguish the very first read call + // from the rest + bool is_first_read; + bool has_posted_reclaimer ABSL_GUARDED_BY(read_mu) = false; + bool inq_capable; // cache whether kernel supports inq + bool socket_ts_enabled; // True if timestamping options are set on the socket + // + bool ts_capable; // Cache whether we can set timestamping options }; struct backup_poller { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 933793706e4..a59aec9d974 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -508,7 +508,6 @@ typedef struct grpc_transport_op { /// connectivity monitoring - set connectivity_state to NULL to unsubscribe grpc_core::OrphanablePtr start_connectivity_watch; - grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE; grpc_core::ConnectivityStateWatcherInterface* stop_connectivity_watch = nullptr; /// should the transport be disconnected @@ -519,27 +518,11 @@ typedef struct grpc_transport_op { /// Error contract: the transport that gets this op must cause /// goaway_error to be unref'ed after processing it grpc_error_handle goaway_error; - /// set the callback for accepting new streams; - /// this is a permanent callback, unlike the other one-shot closures. - /// If true, the callback is set to set_accept_stream_fn, with its - /// user_data argument set to set_accept_stream_user_data. - /// `set_registered_method_matcher_fn` is also set with its user_data argument - /// set to set_accept_stream_user_data. The transport should invoke - /// `set_registered_method_matcher_fn` after initial metadata is received but - /// before recv_initial_metadata_ready callback is invoked. If the transport - /// detects an error in the stream, invoking - /// `set_registered_method_matcher_fn` can be skipped. - bool set_accept_stream = false; void (*set_accept_stream_fn)(void* user_data, grpc_transport* transport, const void* server_data) = nullptr; void (*set_registered_method_matcher_fn)( void* user_data, grpc_core::ServerMetadata* metadata) = nullptr; void* set_accept_stream_user_data = nullptr; - /// set the callback for accepting new streams based upon promises; - /// this is a permanent callback, unlike the other one-shot closures. - /// If true, the callback is set to set_make_promise_fn, with its - /// user_data argument set to set_make_promise_data - bool set_make_promise = false; void (*set_make_promise_fn)(void* user_data, grpc_transport* transport, const void* server_data) = nullptr; void* set_make_promise_user_data = nullptr; @@ -555,9 +538,28 @@ typedef struct grpc_transport_op { /// Called when the ping ack is received grpc_closure* on_ack = nullptr; } send_ping; + grpc_connectivity_state start_connectivity_watch_state = GRPC_CHANNEL_IDLE; // If true, will reset the channel's connection backoff. bool reset_connect_backoff = false; + /// set the callback for accepting new streams; + /// this is a permanent callback, unlike the other one-shot closures. + /// If true, the callback is set to set_accept_stream_fn, with its + /// user_data argument set to set_accept_stream_user_data. + /// `set_registered_method_matcher_fn` is also set with its user_data argument + /// set to set_accept_stream_user_data. The transport should invoke + /// `set_registered_method_matcher_fn` after initial metadata is received but + /// before recv_initial_metadata_ready callback is invoked. If the transport + /// detects an error in the stream, invoking + /// `set_registered_method_matcher_fn` can be skipped. + bool set_accept_stream = false; + + /// set the callback for accepting new streams based upon promises; + /// this is a permanent callback, unlike the other one-shot closures. + /// If true, the callback is set to set_make_promise_fn, with its + /// user_data argument set to set_make_promise_data + bool set_make_promise = false; + //************************************************************************** // remaining fields are initialized and used at the discretion of the // transport implementation