diff --git a/grpc.def b/grpc.def index b7ba2c5d10e..345c0a786ce 100644 --- a/grpc.def +++ b/grpc.def @@ -77,6 +77,7 @@ EXPORTS grpc_channelz_get_servers grpc_channelz_get_channel grpc_channelz_get_subchannel + grpc_channelz_get_socket grpc_insecure_channel_create_from_fd grpc_server_add_insecure_channel_from_fd grpc_use_signal diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 787d6ae6d7c..a9beee1c9e1 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -511,6 +511,10 @@ GRPCAPI char* grpc_channelz_get_channel(intptr_t channel_id); is allocated and must be freed by the application. */ GRPCAPI char* grpc_channelz_get_subchannel(intptr_t subchannel_id); +/* Returns a single Socket, or else a NOT_FOUND code. The returned string + is allocated and must be freed by the application. */ +GRPCAPI char* grpc_channelz_get_socket(intptr_t socket_id); + #ifdef __cplusplus } #endif diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 26cad2cc9a6..d3232f4d268 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -157,6 +157,10 @@ bool g_flow_control_enabled = true; static void destruct_transport(grpc_chttp2_transport* t) { size_t i; + if (t->channelz_socket != nullptr) { + t->channelz_socket.reset(); + } + grpc_endpoint_destroy(t->ep); grpc_slice_buffer_destroy_internal(&t->qbuf); @@ -335,6 +339,10 @@ static bool read_channel_args(grpc_chttp2_transport* t, GRPC_ARG_OPTIMIZATION_TARGET, channel_args->args[i].value.string); } + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_ENABLE_CHANNELZ)) { + t->channelz_socket = + grpc_core::MakeRefCounted(); } else { static const struct { const char* channel_arg_name; @@ -720,6 +728,14 @@ static void destroy_stream_locked(void* sp, grpc_error* error) { grpc_chttp2_stream* s = static_cast(sp); grpc_chttp2_transport* t = s->t; + if (t->channelz_socket != nullptr) { + if ((t->is_client && s->eos_received) || (!t->is_client && s->eos_sent)) { + t->channelz_socket->RecordStreamSucceeded(); + } else { + t->channelz_socket->RecordStreamFailed(); + } + } + GPR_ASSERT((s->write_closed && s->read_closed) || s->id == 0); if (s->id != 0) { GPR_ASSERT(grpc_chttp2_stream_map_find(&t->stream_map, s->id) == nullptr); @@ -1407,6 +1423,9 @@ static void perform_stream_op_locked(void* stream_op, } if (op->send_initial_metadata) { + if (t->is_client && t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromLocal(); + } GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA(); GPR_ASSERT(s->send_initial_metadata_finished == nullptr); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -1492,6 +1511,7 @@ static void perform_stream_op_locked(void* stream_op, if (op->send_message) { GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE(); + t->num_messages_in_next_write++; GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE( op->payload->send_message.send_message->length()); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; @@ -2707,6 +2727,9 @@ static void start_keepalive_ping_locked(void* arg, grpc_error* error) { if (error != GRPC_ERROR_NONE) { return; } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordKeepaliveSent(); + } GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); grpc_timer_init(&t->keepalive_watchdog_timer, grpc_core::ExecCtx::Get()->Now() + t->keepalive_timeout, diff --git a/src/core/ext/transport/chttp2/transport/frame_data.cc b/src/core/ext/transport/chttp2/transport/frame_data.cc index f8f06f67891..933b32c03c0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.cc +++ b/src/core/ext/transport/chttp2/transport/frame_data.cc @@ -62,6 +62,7 @@ grpc_error* grpc_chttp2_data_parser_begin_frame(grpc_chttp2_data_parser* parser, if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) { s->received_last_frame = true; + s->eos_received = true; } else { s->received_last_frame = false; } @@ -191,6 +192,9 @@ grpc_error* grpc_deframe_unprocessed_incoming_frames( GPR_ASSERT(stream_out != nullptr); GPR_ASSERT(p->parsing_frame == nullptr); p->frame_size |= (static_cast(*cur)); + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessageReceived(); + } p->state = GRPC_CHTTP2_DATA_FRAME; ++cur; message_flags = 0; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 6b5309bab48..ff26dd9255d 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -36,6 +36,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/ext/transport/chttp2/transport/incoming_metadata.h" #include "src/core/ext/transport/chttp2/transport/stream_map.h" +#include "src/core/lib/channel/channelz.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/gprpp/manual_constructor.h" #include "src/core/lib/iomgr/combiner.h" @@ -471,6 +472,9 @@ struct grpc_chttp2_transport { bool keepalive_permit_without_calls; /** keep-alive state machine state */ grpc_chttp2_keepalive_state keepalive_state; + + grpc_core::RefCountedPtr channelz_socket; + uint32_t num_messages_in_next_write; }; typedef enum { @@ -534,6 +538,10 @@ struct grpc_chttp2_stream { /** Has trailing metadata been received. */ bool received_trailing_metadata; + /* have we sent or received the EOS bit? */ + bool eos_received; + bool eos_sent; + /** the error that resulted in this stream being read-closed */ grpc_error* read_closed_error; /** the error that resulted in this stream being write-closed */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.cc b/src/core/ext/transport/chttp2/transport/parsing.cc index 8e9aa613a6e..f532b084c93 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.cc +++ b/src/core/ext/transport/chttp2/transport/parsing.cc @@ -623,6 +623,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, gpr_log(GPR_ERROR, "grpc_chttp2_stream not accepted")); return init_skip_frame_parser(t, 1); } + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordStreamStartedFromRemote(); + } } else { t->incoming_stream = s; } @@ -636,6 +639,9 @@ static grpc_error* init_header_frame_parser(grpc_chttp2_transport* t, } t->parser = grpc_chttp2_header_parser_parse; t->parser_data = &t->hpack_parser; + if (t->header_eof) { + s->eos_received = true; + } switch (s->header_frames_received) { case 0: if (t->is_client && t->header_eof) { diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc index 8b73b01dea2..d533989444c 100644 --- a/src/core/ext/transport/chttp2/transport/writing.cc +++ b/src/core/ext/transport/chttp2/transport/writing.cc @@ -569,6 +569,7 @@ class StreamWriteContext { void SentLastFrame() { s_->send_trailing_metadata = nullptr; s_->sent_trailing_metadata = true; + s_->eos_sent = true; if (!t_->is_client && !s_->read_closed) { grpc_slice_buffer_add( @@ -632,6 +633,11 @@ void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error* error) { GPR_TIMER_SCOPE("grpc_chttp2_end_write", 0); grpc_chttp2_stream* s; + if (t->channelz_socket != nullptr) { + t->channelz_socket->RecordMessagesSent(t->num_messages_in_next_write); + } + t->num_messages_in_next_write = 0; + while (grpc_chttp2_list_pop_writing_stream(t, &s)) { if (s->sending_bytes != 0) { update_list(t, s, static_cast(s->sending_bytes), diff --git a/src/core/lib/channel/channelz.cc b/src/core/lib/channel/channelz.cc index 375cf25cc66..339c827525c 100644 --- a/src/core/lib/channel/channelz.cc +++ b/src/core/lib/channel/channelz.cc @@ -62,7 +62,7 @@ CallCountingHelper::CallCountingHelper() { CallCountingHelper::~CallCountingHelper() {} void CallCountingHelper::RecordCallStarted() { - gpr_atm_no_barrier_fetch_add(&calls_started_, (gpr_atm)1); + gpr_atm_no_barrier_fetch_add(&calls_started_, static_cast(1)); gpr_atm_no_barrier_store(&last_call_started_millis_, (gpr_atm)ExecCtx::Get()->Now()); } @@ -81,11 +81,13 @@ void CallCountingHelper::PopulateCallCounts(grpc_json* json) { json_iterator = grpc_json_add_number_string_child( json, json_iterator, "callsFailed", calls_failed_); } - gpr_timespec ts = - grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); - json_iterator = - grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", - gpr_format_timespec(ts), GRPC_JSON_STRING, true); + if (calls_started_ != 0) { + gpr_timespec ts = + grpc_millis_to_timespec(last_call_started_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastCallStartedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } } ChannelNode::ChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, @@ -180,7 +182,103 @@ grpc_json* ServerNode::RenderJson() { } // ask CallCountingHelper to populate trace and call count data. call_counter_.PopulateCallCounts(json); + return top_level_json; +} + +SocketNode::SocketNode() : BaseNode(EntityType::kSocket) {} + +void SocketNode::RecordStreamStartedFromLocal() { + gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); + gpr_atm_no_barrier_store(&last_local_stream_created_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordStreamStartedFromRemote() { + gpr_atm_no_barrier_fetch_add(&streams_started_, static_cast(1)); + gpr_atm_no_barrier_store(&last_remote_stream_created_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordMessagesSent(uint32_t num_sent) { + gpr_atm_no_barrier_fetch_add(&messages_sent_, static_cast(num_sent)); + gpr_atm_no_barrier_store(&last_message_sent_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +void SocketNode::RecordMessageReceived() { + gpr_atm_no_barrier_fetch_add(&messages_received_, static_cast(1)); + gpr_atm_no_barrier_store(&last_message_received_millis_, + (gpr_atm)ExecCtx::Get()->Now()); +} + +grpc_json* SocketNode::RenderJson() { + // We need to track these three json objects to build our object + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* json_iterator = nullptr; + // create and fill the ref child + json_iterator = grpc_json_create_child(json_iterator, json, "ref", nullptr, + GRPC_JSON_OBJECT, false); + json = json_iterator; + json_iterator = nullptr; + json_iterator = grpc_json_add_number_string_child(json, json_iterator, + "socketId", uuid()); + // reset json iterators to top level object json = top_level_json; + json_iterator = nullptr; + // create and fill the data child. + grpc_json* data = grpc_json_create_child(json_iterator, json, "data", nullptr, + GRPC_JSON_OBJECT, false); + json = data; + json_iterator = nullptr; + gpr_timespec ts; + if (streams_started_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsStarted", streams_started_); + if (last_local_stream_created_millis_ != 0) { + ts = grpc_millis_to_timespec(last_local_stream_created_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastLocalStreamCreatedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (last_remote_stream_created_millis_ != 0) { + ts = grpc_millis_to_timespec(last_remote_stream_created_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastRemoteStreamCreatedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + } + if (streams_succeeded_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsSucceeded", streams_succeeded_); + } + if (streams_failed_) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "streamsFailed", streams_failed_); + } + if (messages_sent_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "messagesSent", messages_sent_); + ts = grpc_millis_to_timespec(last_message_sent_millis_, GPR_CLOCK_REALTIME); + json_iterator = + grpc_json_create_child(json_iterator, json, "lastMessageSentTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (messages_received_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "messagesReceived", messages_received_); + ts = grpc_millis_to_timespec(last_message_received_millis_, + GPR_CLOCK_REALTIME); + json_iterator = grpc_json_create_child( + json_iterator, json, "lastMessageReceivedTimestamp", + gpr_format_timespec(ts), GRPC_JSON_STRING, true); + } + if (keepalives_sent_ != 0) { + json_iterator = grpc_json_add_number_string_child( + json, json_iterator, "keepAlivesSent", keepalives_sent_); + } return top_level_json; } diff --git a/src/core/lib/channel/channelz.h b/src/core/lib/channel/channelz.h index 9be256147bb..b7ae1012389 100644 --- a/src/core/lib/channel/channelz.h +++ b/src/core/lib/channel/channelz.h @@ -92,10 +92,10 @@ class CallCountingHelper { void RecordCallStarted(); void RecordCallFailed() { - gpr_atm_no_barrier_fetch_add(&calls_failed_, (gpr_atm(1))); + gpr_atm_no_barrier_fetch_add(&calls_failed_, static_cast(1)); } void RecordCallSucceeded() { - gpr_atm_no_barrier_fetch_add(&calls_succeeded_, (gpr_atm(1))); + gpr_atm_no_barrier_fetch_add(&calls_succeeded_, static_cast(1)); } // Common rendering of the call count data and last_call_started_timestamp. @@ -197,11 +197,39 @@ class ServerNode : public BaseNode { }; // Handles channelz bookkeeping for sockets -// TODO(ncteisen): implement in subsequent PR. class SocketNode : public BaseNode { public: - SocketNode() : BaseNode(EntityType::kSocket) {} + SocketNode(); ~SocketNode() override {} + + grpc_json* RenderJson() override; + + void RecordStreamStartedFromLocal(); + void RecordStreamStartedFromRemote(); + void RecordStreamSucceeded() { + gpr_atm_no_barrier_fetch_add(&streams_succeeded_, static_cast(1)); + } + void RecordStreamFailed() { + gpr_atm_no_barrier_fetch_add(&streams_failed_, static_cast(1)); + } + void RecordMessagesSent(uint32_t num_sent); + void RecordMessageReceived(); + void RecordKeepaliveSent() { + gpr_atm_no_barrier_fetch_add(&keepalives_sent_, static_cast(1)); + } + + private: + gpr_atm streams_started_ = 0; + gpr_atm streams_succeeded_ = 0; + gpr_atm streams_failed_ = 0; + gpr_atm messages_sent_ = 0; + gpr_atm messages_received_ = 0; + gpr_atm keepalives_sent_ = 0; + gpr_atm last_local_stream_created_millis_ = 0; + gpr_atm last_remote_stream_created_millis_ = 0; + gpr_atm last_message_sent_millis_ = 0; + gpr_atm last_message_received_millis_ = 0; + UniquePtr peer_string_; }; // Creation functions diff --git a/src/core/lib/channel/channelz_registry.cc b/src/core/lib/channel/channelz_registry.cc index adc7b6ba441..841f1c6104b 100644 --- a/src/core/lib/channel/channelz_registry.cc +++ b/src/core/lib/channel/channelz_registry.cc @@ -197,3 +197,21 @@ char* grpc_channelz_get_subchannel(intptr_t subchannel_id) { grpc_json_destroy(top_level_json); return json_str; } + +char* grpc_channelz_get_socket(intptr_t socket_id) { + grpc_core::channelz::BaseNode* socket_node = + grpc_core::channelz::ChannelzRegistry::Get(socket_id); + if (socket_node == nullptr || + socket_node->type() != + grpc_core::channelz::BaseNode::EntityType::kSocket) { + return nullptr; + } + grpc_json* top_level_json = grpc_json_create(GRPC_JSON_OBJECT); + grpc_json* json = top_level_json; + grpc_json* socket_json = socket_node->RenderJson(); + socket_json->key = "socket"; + grpc_json_link_child(json, socket_json, nullptr); + char* json_str = grpc_json_dump_to_string(top_level_json, 0); + grpc_json_destroy(top_level_json); + return json_str; +} diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 0c46f6c85a1..1e7d7f687f0 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -100,6 +100,7 @@ grpc_channelz_get_top_channels_type grpc_channelz_get_top_channels_import; grpc_channelz_get_servers_type grpc_channelz_get_servers_import; grpc_channelz_get_channel_type grpc_channelz_get_channel_import; grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; +grpc_channelz_get_socket_type grpc_channelz_get_socket_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; grpc_use_signal_type grpc_use_signal_import; @@ -356,6 +357,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channelz_get_servers_import = (grpc_channelz_get_servers_type) GetProcAddress(library, "grpc_channelz_get_servers"); grpc_channelz_get_channel_import = (grpc_channelz_get_channel_type) GetProcAddress(library, "grpc_channelz_get_channel"); grpc_channelz_get_subchannel_import = (grpc_channelz_get_subchannel_type) GetProcAddress(library, "grpc_channelz_get_subchannel"); + grpc_channelz_get_socket_import = (grpc_channelz_get_socket_type) GetProcAddress(library, "grpc_channelz_get_socket"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 8ea5807e4f7..ed4b6264b08 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -275,6 +275,9 @@ extern grpc_channelz_get_channel_type grpc_channelz_get_channel_import; typedef char*(*grpc_channelz_get_subchannel_type)(intptr_t subchannel_id); extern grpc_channelz_get_subchannel_type grpc_channelz_get_subchannel_import; #define grpc_channelz_get_subchannel grpc_channelz_get_subchannel_import +typedef char*(*grpc_channelz_get_socket_type)(intptr_t socket_id); +extern grpc_channelz_get_socket_type grpc_channelz_get_socket_import; +#define grpc_channelz_get_socket grpc_channelz_get_socket_import typedef grpc_channel*(*grpc_insecure_channel_create_from_fd_type)(const char* target, int fd, const grpc_channel_args* args); extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; #define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c index b0af788796f..3ebdb88a08e 100644 --- a/test/core/surface/public_headers_must_be_c89.c +++ b/test/core/surface/public_headers_must_be_c89.c @@ -139,6 +139,7 @@ int main(int argc, char **argv) { printf("%lx", (unsigned long) grpc_channelz_get_servers); printf("%lx", (unsigned long) grpc_channelz_get_channel); printf("%lx", (unsigned long) grpc_channelz_get_subchannel); + printf("%lx", (unsigned long) grpc_channelz_get_socket); printf("%lx", (unsigned long) grpc_auth_property_iterator_next); printf("%lx", (unsigned long) grpc_auth_context_property_iterator); printf("%lx", (unsigned long) grpc_auth_context_peer_identity);