Merge Master

pull/37512/head
tanvi-jagtap 3 months ago
commit a2f07f45d6
  1. 4
      BUILD
  2. 1
      include/grpc/event_engine/slice.h
  3. 1
      include/grpc/event_engine/slice_buffer.h
  4. 1
      include/grpcpp/client_context.h
  5. 1
      include/grpcpp/completion_queue.h
  6. 1
      include/grpcpp/impl/interceptor_common.h
  7. 1
      include/grpcpp/impl/metadata_map.h
  8. 1
      include/grpcpp/impl/proto_utils.h
  9. 1
      include/grpcpp/impl/server_callback_handlers.h
  10. 1
      include/grpcpp/impl/service_type.h
  11. 1
      include/grpcpp/impl/sync.h
  12. 1
      include/grpcpp/security/tls_certificate_provider.h
  13. 1
      include/grpcpp/security/tls_certificate_verifier.h
  14. 1
      include/grpcpp/security/tls_credentials_options.h
  15. 1
      include/grpcpp/server_interface.h
  16. 1
      include/grpcpp/support/async_stream.h
  17. 1
      include/grpcpp/support/async_unary_call.h
  18. 1
      include/grpcpp/support/byte_buffer.h
  19. 1
      include/grpcpp/support/callback_common.h
  20. 1
      include/grpcpp/support/client_callback.h
  21. 1
      include/grpcpp/support/client_interceptor.h
  22. 1
      include/grpcpp/support/method_handler.h
  23. 1
      include/grpcpp/support/proto_buffer_reader.h
  24. 1
      include/grpcpp/support/proto_buffer_writer.h
  25. 1
      include/grpcpp/support/server_interceptor.h
  26. 1
      include/grpcpp/support/sync_stream.h
  27. 1
      src/core/BUILD
  28. 1
      src/core/channelz/channelz.cc
  29. 1
      src/core/channelz/channelz_registry.cc
  30. 2
      src/core/client_channel/dynamic_filters.cc
  31. 2
      src/core/client_channel/local_subchannel_pool.cc
  32. 1
      src/core/ext/filters/channel_idle/legacy_channel_idle_filter.cc
  33. 2
      src/core/ext/filters/stateful_session/stateful_session_filter.cc
  34. 7
      src/core/ext/transport/chaotic_good/server_transport.cc
  35. 63
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  36. 6
      src/core/ext/transport/chttp2/transport/parsing.cc
  37. 33
      src/core/ext/transport/chttp2/transport/writing.cc
  38. 1
      src/core/handshaker/security/security_handshaker.cc
  39. 1
      src/core/handshaker/tcp_connect/tcp_connect_handshaker.cc
  40. 25
      src/core/lib/channel/promise_based_filter.cc
  41. 13
      src/core/lib/event_engine/posix_engine/timer_manager.cc
  42. 220
      src/core/lib/event_engine/windows/grpc_polled_fd_windows.cc
  43. 14
      src/core/lib/iomgr/cfstream_handle.cc
  44. 13
      src/core/lib/iomgr/closure.h
  45. 14
      src/core/lib/iomgr/combiner.cc
  46. 31
      src/core/lib/iomgr/endpoint_cfstream.cc
  47. 5
      src/core/lib/iomgr/ev_epoll1_linux.cc
  48. 22
      src/core/lib/iomgr/ev_poll_posix.cc
  49. 16
      src/core/lib/iomgr/event_engine_shims/closure.cc
  50. 15
      src/core/lib/iomgr/exec_ctx.cc
  51. 18
      src/core/lib/iomgr/lockfree_event.cc
  52. 19
      src/core/lib/iomgr/tcp_client_cfstream.cc
  53. 111
      src/core/lib/iomgr/timer_generic.cc
  54. 88
      src/core/lib/promise/detail/join_state.h
  55. 2457
      src/core/lib/promise/detail/seq_state.h
  56. 38
      src/core/lib/promise/interceptor_list.h
  57. 7
      src/core/lib/promise/pipe.h
  58. 20
      src/core/lib/security/authorization/grpc_server_authz_filter.cc
  59. 7
      src/core/lib/transport/call_filters.cc
  60. 14
      src/core/lib/transport/transport.h
  61. 14
      src/core/load_balancing/grpclb/grpclb.cc
  62. 66
      src/core/load_balancing/pick_first/pick_first.cc
  63. 1
      src/core/load_balancing/round_robin/round_robin.cc
  64. 1
      src/core/load_balancing/weighted_round_robin/static_stride_scheduler.cc
  65. 1
      src/core/load_balancing/xds/xds_cluster_impl.cc
  66. 1
      src/core/load_balancing/xds/xds_wrr_locality.cc
  67. 2
      src/core/resolver/binder/binder_resolver.cc
  68. 1
      src/core/resolver/dns/c_ares/dns_resolver_ares.cc
  69. 2
      src/core/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  70. 1
      src/core/resolver/dns/c_ares/grpc_ares_ev_driver_windows.cc
  71. 1
      src/core/resolver/dns/dns_resolver_plugin.cc
  72. 79
      src/core/resolver/dns/event_engine/event_engine_client_channel_resolver.cc
  73. 22
      src/core/resolver/dns/native/dns_resolver.cc
  74. 1
      src/core/resolver/endpoint_addresses.cc
  75. 1
      src/core/resolver/fake/fake_resolver.cc
  76. 1
      src/core/server/server_config_selector_filter.cc
  77. 1
      src/core/service_config/service_config_channel_arg_filter.cc
  78. 1
      src/core/telemetry/call_tracer.cc
  79. 1
      src/core/tsi/alts/handshaker/alts_shared_resource.cc
  80. 1
      src/core/tsi/alts/zero_copy_frame_protector/alts_iovec_record_protocol.cc
  81. 1
      src/core/tsi/ssl/session_cache/ssl_session_cache.cc
  82. 44
      src/core/util/http_client/httpcli.cc
  83. 19
      src/core/util/http_client/httpcli.h
  84. 7
      src/core/util/http_client/parser.cc
  85. 1
      src/core/xds/grpc/xds_certificate_provider.cc
  86. 1
      src/core/xds/grpc/xds_cluster_specifier_plugin.cc
  87. 1
      src/core/xds/grpc/xds_http_filter_registry.cc
  88. 1
      src/core/xds/grpc/xds_routing.cc
  89. 7
      src/cpp/ext/gcp/environment_autodetect.cc
  90. 1
      src/cpp/server/load_reporter/load_reporter_async_service_impl.h
  91. 2
      src/python/grpcio_observability/grpc_observability/observability_util.cc
  92. 1
      test/core/end2end/end2end_tests.h
  93. 1
      test/core/end2end/fixtures/h2_oauth2_common.h
  94. 1
      test/core/end2end/fixtures/h2_ssl_cred_reload_fixture.h
  95. 1
      test/core/end2end/fixtures/h2_ssl_tls_common.h
  96. 1
      test/core/end2end/fixtures/h2_tls_common.h
  97. 1
      test/core/end2end/fixtures/secure_fixture.h
  98. 1
      test/core/end2end/fixtures/sockpair_fixture.h
  99. 1
      test/core/end2end/fuzzers/fuzzing_common.h
  100. 1
      test/core/event_engine/test_suite/event_engine_test_framework.h
  101. Some files were not shown because too many files have changed in this diff Show More

@ -3943,7 +3943,6 @@ grpc_cc_library(
deps = [
"config",
"debug_location",
"event_engine_base_hdrs",
"exec_ctx",
"gpr",
"grpc_base",
@ -3955,16 +3954,17 @@ grpc_cc_library(
"orphanable",
"ref_counted_ptr",
"resource_quota_api",
"sockaddr_utils",
"uri_parser",
"//src/core:channel_args",
"//src/core:channel_args_preconditioning",
"//src/core:closure",
"//src/core:error",
"//src/core:error_utils",
"//src/core:event_engine_tcp_socket_utils",
"//src/core:handshaker_registry",
"//src/core:iomgr_fwd",
"//src/core:pollset_set",
"//src/core:resolved_address",
"//src/core:resource_quota",
"//src/core:slice",
"//src/core:slice_refcount",

@ -25,7 +25,6 @@
#include <grpc/event_engine/internal/slice_cast.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
// This public slice definition largely based of the internal grpc_core::Slice

@ -28,7 +28,6 @@
#include <grpc/impl/codegen/slice.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {

@ -42,7 +42,6 @@
#include <grpc/impl/compression_types.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/create_auth_context.h>
#include <grpcpp/impl/metadata_map.h>
#include <grpcpp/impl/rpc_method.h>

@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/status.h>

@ -25,7 +25,6 @@
#include "absl/log/absl_check.h"
#include <grpc/impl/grpc_types.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set_interface.h>
#include <grpcpp/impl/intercepted_channel.h>

@ -22,7 +22,6 @@
#include <map>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/support/slice.h>
namespace grpc {

@ -26,7 +26,6 @@
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/byte_buffer.h>

@ -22,7 +22,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>

@ -21,7 +21,6 @@
#include "absl/log/absl_check.h"
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/server_interface.h>

@ -30,7 +30,6 @@
#include "absl/log/absl_check.h"
#include "absl/synchronization/mutex.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>

@ -24,7 +24,6 @@
#include <grpc/grpc_security.h>
#include <grpc/grpc_security_constants.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpcpp/support/config.h>
namespace grpc {

@ -25,7 +25,6 @@
#include <grpc/grpc_security_constants.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/impl/sync.h>
#include <grpcpp/support/config.h>

@ -25,7 +25,6 @@
#include <grpc/grpc_security.h>
#include <grpc/grpc_security_constants.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpcpp/security/tls_certificate_provider.h>
#include <grpcpp/security/tls_certificate_verifier.h>
#include <grpcpp/security/tls_crl_provider.h>

@ -23,7 +23,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <grpcpp/impl/call.h>

@ -22,7 +22,6 @@
#include "absl/log/absl_check.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/channel_interface.h>
#include <grpcpp/impl/service_type.h>

@ -22,7 +22,6 @@
#include "absl/log/absl_check.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/client_context.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>

@ -23,7 +23,6 @@
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/slice.h>

@ -25,7 +25,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/completion_queue_tag.h>

@ -26,7 +26,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/call_op_set.h>
#include <grpcpp/impl/sync.h>

@ -24,7 +24,6 @@
#include "absl/log/absl_check.h"
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/interceptor.h>
#include <grpcpp/support/string_ref.h>

@ -22,7 +22,6 @@
#include "absl/log/absl_check.h"
#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/sync_stream.h>

@ -28,7 +28,6 @@
#include <grpc/byte_buffer_reader.h>
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/byte_buffer.h>

@ -28,7 +28,6 @@
#include <grpc/impl/grpc_types.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/config_protobuf.h>
#include <grpcpp/impl/serialization_traits.h>
#include <grpcpp/support/byte_buffer.h>

@ -24,7 +24,6 @@
#include "absl/log/absl_check.h"
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/interceptor.h>
#include <grpcpp/support/string_ref.h>

@ -21,7 +21,6 @@
#include "absl/log/absl_check.h"
#include <grpc/support/log.h>
#include <grpcpp/client_context.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/call.h>

@ -954,6 +954,7 @@ grpc_cc_library(
grpc_cc_library(
name = "seq",
external_deps = ["absl/log:log"],
language = "c++",
public_hdrs = [
"lib/promise/seq.h",

@ -29,7 +29,6 @@
#include "absl/strings/strip.h"
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>

@ -29,7 +29,6 @@
#include <grpc/grpc.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>

@ -27,8 +27,6 @@
#include "absl/log/log.h"
#include "absl/status/statusor.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channel_stack_builder_impl.h"

@ -24,8 +24,6 @@
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include "src/core/client_channel/subchannel.h"
namespace grpc_core {

@ -29,7 +29,6 @@
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/promise_based_filter.h"

@ -37,8 +37,6 @@
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include "src/core/ext/filters/stateful_session/stateful_session_service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"

@ -203,10 +203,9 @@ auto ChaoticGoodServerTransport::CallOutboundLoop(
Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
call_initiator),
[stream_id](absl::Status main_body_result) {
if (GRPC_TRACE_FLAG_ENABLED(chaotic_good)) {
VLOG(2) << "CHAOTIC_GOOD: CallOutboundLoop: stream_id="
<< stream_id << " main_body_result=" << main_body_result;
}
GRPC_TRACE_VLOG(chaotic_good, 2)
<< "CHAOTIC_GOOD: CallOutboundLoop: stream_id=" << stream_id
<< " main_body_result=" << main_body_result;
return Empty{};
}),
call_initiator.PullServerTrailingMetadata(),

@ -1308,15 +1308,14 @@ void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
return;
}
closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
if (GRPC_TRACE_FLAG_ENABLED(http)) {
LOG(INFO) << "complete_closure_step: t=" << t << " " << closure << " refs="
<< (closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT)
<< " flags="
<< (closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT)
<< " desc=" << desc << " err=" << grpc_core::StatusToString(error)
<< " write_state=" << write_state_name(t->write_state)
<< " whence=" << whence.file() << ":" << whence.line();
}
GRPC_TRACE_LOG(http, INFO)
<< "complete_closure_step: t=" << t << " " << closure << " refs="
<< (closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT)
<< " flags="
<< (closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT)
<< " desc=" << desc << " err=" << grpc_core::StatusToString(error)
<< " write_state=" << write_state_name(t->write_state)
<< " whence=" << whence.file() << ":" << whence.line();
if (!error.ok()) {
grpc_error_handle cl_err =
@ -2085,11 +2084,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
// Lambda is immediately invoked as a big scoped section that can be
// exited out of at any point by returning.
[&]() {
if (GRPC_TRACE_FLAG_ENABLED(http)) {
VLOG(2) << "maybe_complete_recv_message " << s
<< " final_metadata_requested=" << s->final_metadata_requested
<< " seen_error=" << s->seen_error;
}
GRPC_TRACE_VLOG(http, 2)
<< "maybe_complete_recv_message " << s
<< " final_metadata_requested=" << s->final_metadata_requested
<< " seen_error=" << s->seen_error;
if (s->final_metadata_requested && s->seen_error) {
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
s->recv_message->reset();
@ -2100,11 +2098,10 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
int64_t min_progress_size;
auto r = grpc_deframe_unprocessed_incoming_frames(
s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
if (GRPC_TRACE_FLAG_ENABLED(http)) {
VLOG(2) << "Deframe data frame: "
<< grpc_core::PollToString(
r, [](absl::Status r) { return r.ToString(); });
}
GRPC_TRACE_VLOG(http, 2)
<< "Deframe data frame: "
<< grpc_core::PollToString(
r, [](absl::Status r) { return r.ToString(); });
if (r.pending()) {
if (s->read_closed) {
grpc_slice_buffer_reset_and_unref(&s->frame_storage);
@ -2155,13 +2152,12 @@ void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
grpc_chttp2_stream* s) {
grpc_chttp2_maybe_complete_recv_message(t, s);
if (GRPC_TRACE_FLAG_ENABLED(http)) {
VLOG(2) << "maybe_complete_recv_trailing_metadata cli=" << t->is_client
<< " s=" << s << " closure=" << s->recv_trailing_metadata_finished
<< " read_closed=" << s->read_closed
<< " write_closed=" << s->write_closed << " "
<< s->frame_storage.length;
}
GRPC_TRACE_VLOG(http, 2) << "maybe_complete_recv_trailing_metadata cli="
<< t->is_client << " s=" << s
<< " closure=" << s->recv_trailing_metadata_finished
<< " read_closed=" << s->read_closed
<< " write_closed=" << s->write_closed << " "
<< s->frame_storage.length;
if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
s->write_closed) {
if (s->seen_error || !t->is_client) {
@ -2365,15 +2361,12 @@ grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error) {
grpc_chttp2_transport::RemovedStreamHandle rsh;
if (GRPC_TRACE_FLAG_ENABLED(http)) {
VLOG(2) << "MARK_STREAM_CLOSED: t=" << t << " s=" << s << "(id=" << s->id
<< ") "
<< ((close_reads && close_writes)
? "read+write"
: (close_reads ? "read"
: (close_writes ? "write" : "nothing??")))
<< " [" << grpc_core::StatusToString(error) << "]";
}
GRPC_TRACE_VLOG(http, 2)
<< "MARK_STREAM_CLOSED: t=" << t << " s=" << s << "(id=" << s->id << ") "
<< ((close_reads && close_writes)
? "read+write"
: (close_reads ? "read" : (close_writes ? "write" : "nothing??")))
<< " [" << grpc_core::StatusToString(error) << "]";
if (s->read_closed && s->write_closed) {
// already closed, but we should still fake the status if needed.
grpc_error_handle overall_error = removal_error(error, s, "Stream removed");

@ -883,10 +883,8 @@ static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
if (GPR_LIKELY(err.ok())) {
return err;
}
if (GRPC_TRACE_FLAG_ENABLED(http)) {
LOG(ERROR) << "INCOMING[" << t << ";" << s << "]: Parse failed with "
<< err;
}
GRPC_TRACE_LOG(http, ERROR)
<< "INCOMING[" << t << ";" << s << "]: Parse failed with " << err;
if (grpc_error_get_int(err, grpc_core::StatusIntProperty::kStreamId,
&unused)) {
grpc_chttp2_parsing_become_skip_parser(t);

@ -204,24 +204,21 @@ static bool update_list(grpc_chttp2_transport* t, int64_t send_bytes,
static void report_stall(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
const char* staller) {
if (GRPC_TRACE_FLAG_ENABLED(flowctl)) {
VLOG(2) << t->peer_string.as_string_view() << ":" << t << " stream "
<< s->id << " moved to stalled list by " << staller
<< ". This is FULLY expected to happen in a healthy program that "
"is not seeing flow control stalls. However, if you know that "
"there are unwanted stalls, here is some helpful data: "
"[fc:pending="
<< s->flow_controlled_buffer.length
<< ":flowed=" << s->flow_controlled_bytes_flowed
<< ":peer_initwin=" << t->settings.acked().initial_window_size()
<< ":t_win=" << t->flow_control.remote_window() << ":s_win="
<< static_cast<uint32_t>(
std::max(int64_t{0},
s->flow_control.remote_window_delta() +
static_cast<int64_t>(
t->settings.peer().initial_window_size())))
<< ":s_delta=" << s->flow_control.remote_window_delta() << "]";
}
GRPC_TRACE_VLOG(flowctl, 2)
<< t->peer_string.as_string_view() << ":" << t << " stream " << s->id
<< " moved to stalled list by " << staller
<< ". This is FULLY expected to happen in a healthy program that is not "
"seeing flow control stalls. However, if you know that there are "
"unwanted stalls, here is some helpful data: [fc:pending="
<< s->flow_controlled_buffer.length
<< ":flowed=" << s->flow_controlled_bytes_flowed
<< ":peer_initwin=" << t->settings.acked().initial_window_size()
<< ":t_win=" << t->flow_control.remote_window() << ":s_win="
<< static_cast<uint32_t>(std::max(
int64_t{0}, s->flow_control.remote_window_delta() +
static_cast<int64_t>(
t->settings.peer().initial_window_size())))
<< ":s_delta=" << s->flow_control.remote_window_delta() << "]";
}
namespace {

@ -41,7 +41,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/channelz/channelz.h"

@ -31,7 +31,6 @@
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/handshaker/handshaker.h"

@ -2111,15 +2111,13 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
// Handle cancellation.
void ServerCallData::Completed(grpc_error_handle error,
bool tarpit_cancellation, Flusher* flusher) {
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
VLOG(2) << LogTag() << "ServerCallData::Completed: send_trailing_state="
<< StateString(send_trailing_state_) << " send_initial_state="
<< (send_initial_metadata_ == nullptr
? "null"
: SendInitialMetadata::StateString(
send_initial_metadata_->state))
<< " error=" << error;
}
GRPC_TRACE_VLOG(channel, 2)
<< LogTag() << "ServerCallData::Completed: send_trailing_state="
<< StateString(send_trailing_state_) << " send_initial_state="
<< (send_initial_metadata_ == nullptr
? "null"
: SendInitialMetadata::StateString(send_initial_metadata_->state))
<< " error=" << error;
// Track the latest reason for cancellation.
cancelled_error_ = error;
// Stop running the promise.
@ -2388,11 +2386,10 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
flusher,
send_initial_metadata_ == nullptr ||
send_initial_metadata_->state == SendInitialMetadata::kForwarded);
if (GRPC_TRACE_FLAG_ENABLED(channel)) {
VLOG(2) << LogTag() << ": After send_message WakeInsideCombiner "
<< DebugString() << " is_idle=" << send_message()->IsIdle()
<< " is_forwarded=" << send_message()->IsForwarded();
}
GRPC_TRACE_VLOG(channel, 2)
<< LogTag() << ": After send_message WakeInsideCombiner "
<< DebugString() << " is_idle=" << send_message()->IsIdle()
<< " is_forwarded=" << send_message()->IsForwarded();
if (send_trailing_state_ == SendTrailingState::kQueuedBehindSendMessage &&
(send_message()->IsIdle() ||
(send_trailing_metadata_batch_->send_message &&

@ -118,17 +118,13 @@ void TimerManager::Shutdown() {
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_) return;
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TimerManager::" << this << " shutting down";
}
GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutting down";
shutdown_ = true;
// Wait on the main loop to exit.
cv_wait_.Signal();
}
main_loop_exit_signal_->WaitForNotification();
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TimerManager::" << this << " shutdown complete";
}
GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutdown complete";
}
TimerManager::~TimerManager() { Shutdown(); }
@ -144,9 +140,8 @@ void TimerManager::Kick() {
void TimerManager::RestartPostFork() {
grpc_core::MutexLock lock(&mu_);
CHECK(GPR_LIKELY(shutdown_));
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TimerManager::" << this << " restarting after shutdown";
}
GRPC_TRACE_VLOG(timer, 2)
<< "TimerManager::" << this << " restarting after shutdown";
shutdown_ = false;
main_loop_exit_signal_.emplace();
thread_pool_->Run([this]() { MainLoop(); });

@ -53,14 +53,6 @@ namespace grpc_event_engine {
namespace experimental {
namespace {
#define GRPC_ARES_RESOLVER_TRACE_LOG(format, ...) \
do { \
if (GRPC_TRACE_FLAG_ENABLED(cares_resolver)) { \
LOG(INFO) << "(EventEngine c-ares resolver) " \
<< absl::StrFormat(format, __VA_ARGS__); \
} \
} while (0)
constexpr int kRecvFromSourceAddrSize = 200;
constexpr int kReadBufferSize = 4192;
@ -136,9 +128,9 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
event_engine_(event_engine) {}
~GrpcPolledFdWindows() override {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ~GrpcPolledFdWindows shutdown_called_: %d ", GetName(),
shutdown_called_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| ~GrpcPolledFdWindows shutdown_called_: " << shutdown_called_;
grpc_core::CSliceUnref(read_buf_);
grpc_core::CSliceUnref(write_buf_);
CHECK(read_closure_ == nullptr);
@ -166,14 +158,16 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
void RegisterForOnWriteableLocked(
absl::AnyInvocable<void(absl::Status)> write_closure) override {
if (socket_type_ == SOCK_DGRAM) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called";
} else {
CHECK(socket_type_ == SOCK_STREAM);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RegisterForOnWriteableLocked called tcp_write_state_: %d "
"connect_done_: %d",
GetName(), tcp_write_state_, connect_done_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| RegisterForOnWriteableLocked called tcp_write_state_: "
<< static_cast<int>(tcp_write_state_)
<< " connect_done_: " << connect_done_;
}
CHECK(write_closure_ == nullptr);
write_closure_ = std::move(write_closure);
@ -192,7 +186,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
if (!absl::IsCancelled(error)) {
return false;
}
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| ShutdownLocked", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO) << "(EventEngine c-ares resolver) fd:|"
<< GetName() << "| ShutdownLocked";
shutdown_called_ = true;
// The socket is disconnected and closed here since this is an external
// cancel request, e.g. a timeout. c-ares shouldn't do anything on the
@ -211,10 +206,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
ares_ssize_t RecvFrom(WSAErrorContext* wsa_error_ctx, void* data,
ares_socket_t data_len, int /* flags */,
struct sockaddr* from, ares_socklen_t* from_len) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| RecvFrom called read_buf_has_data:%d Current read buf "
"length:|%d|",
GetName(), read_buf_has_data_, GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " RecvFrom called read_buf_has_data:" << read_buf_has_data_
<< " Current read buf length:" << GRPC_SLICE_LENGTH(read_buf_);
if (!read_buf_has_data_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -241,9 +236,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
ares_ssize_t SendV(WSAErrorContext* wsa_error_ctx, const struct iovec* iov,
int iov_count) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendV called connect_done_:%d wsa_connect_error_:%d",
GetName(), connect_done_, wsa_connect_error_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| SendV called connect_done_:" << connect_done_
<< " wsa_connect_error_:" << wsa_connect_error_;
if (!connect_done_) {
wsa_error_ctx->SetWSAError(WSAEWOULDBLOCK);
return -1;
@ -296,10 +292,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
}
void ContinueRegisterForOnReadableLocked() {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnReadableLocked wsa_connect_error_:"
<< wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullReadClosure(GRPC_WSA_ERROR(wsa_connect_error_, "connect"));
@ -317,11 +313,11 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
winsocket_->read_info()->overlapped(), nullptr) != 0) {
int wsa_last_error = WSAGetLastError();
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnReadableLocked WSARecvFrom error "
"code:|%d| "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " ContinueRegisterForOnReadableLocked WSARecvFrom error "
"code:"
<< wsa_last_error << " msg:" << msg;
gpr_free(msg);
if (wsa_last_error != WSA_IO_PENDING) {
winsocket_->UnregisterReadCallback();
@ -333,10 +329,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
}
void ContinueRegisterForOnWriteableLocked() {
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| ContinueRegisterForOnWriteableLocked "
"wsa_connect_error_:%d",
GetName(), wsa_connect_error_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| ContinueRegisterForOnWriteableLocked wsa_connect_error_:"
<< wsa_connect_error_;
CHECK(connect_done_);
if (wsa_connect_error_ != 0) {
ScheduleAndNullWriteClosure(
@ -380,12 +376,12 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int out = WSASend(winsocket_->raw_socket(), &buf, 1, bytes_sent_ptr, flags,
overlapped, nullptr);
*wsa_error_code = WSAGetLastError();
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendWriteBuf WSASend buf.len:%d *bytes_sent_ptr:%d "
"overlapped:%p "
"return:%d *wsa_error_code:%d",
GetName(), buf.len, bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0,
overlapped, out, *wsa_error_code);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " SendWriteBuf WSASend buf.len:" << buf.len << " *bytes_sent_ptr:"
<< (bytes_sent_ptr != nullptr ? *bytes_sent_ptr : 0)
<< " overlapped:" << overlapped << " return:" << out
<< " *wsa_error_code:" << *wsa_error_code;
return out;
}
@ -394,7 +390,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
// c-ares doesn't handle retryable errors on writes of UDP sockets.
// Therefore, the sendv handler for UDP sockets must only attempt
// to write everything inline.
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVUDP called", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO) << "(EventEngine c-ares resolver) fd:|"
<< GetName() << "| SendVUDP called";
CHECK_EQ(GRPC_SLICE_LENGTH(write_buf_), 0);
grpc_core::CSliceUnref(write_buf_);
write_buf_ = FlattenIovec(iov, iov_count);
@ -405,9 +402,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
write_buf_ = grpc_empty_slice();
wsa_error_ctx->SetWSAError(wsa_error_code);
char* msg = gpr_format_message(wsa_error_code);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| SendVUDP SendWriteBuf error code:%d msg:|%s|", GetName(),
wsa_error_code, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| SendVUDP SendWriteBuf error code:" << wsa_error_code << " msg:|"
<< msg << "|";
gpr_free(msg);
return -1;
}
@ -423,8 +421,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
// out in the background, and making further send progress in general, will
// happen as long as c-ares continues to show interest in writeability on
// this fd.
GRPC_ARES_RESOLVER_TRACE_LOG("fd:|%s| SendVTCP called tcp_write_state_:%d",
GetName(), tcp_write_state_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| SendVTCP called tcp_write_state_:"
<< static_cast<int>(tcp_write_state_);
switch (tcp_write_state_) {
case WRITE_IDLE:
tcp_write_state_ = WRITE_REQUESTED;
@ -461,12 +461,12 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
void OnTcpConnect() {
grpc_core::MutexLock lock(mu_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked "
"pending_register_for_readable:%d"
" pending_register_for_writeable:%d",
GetName(), pending_continue_register_for_on_readable_locked_,
pending_continue_register_for_on_writeable_locked_);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked pending_register_for_readable:"
<< pending_continue_register_for_on_readable_locked_
<< " pending_register_for_writeable:"
<< pending_continue_register_for_on_writeable_locked_;
CHECK(!connect_done_);
connect_done_ = true;
CHECK_EQ(wsa_connect_error_, 0);
@ -482,10 +482,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
if (!wsa_success) {
wsa_connect_error_ = WSAGetLastError();
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s InnerOnTcpConnectLocked WSA overlapped result code:%d "
"msg:|%s|",
GetName(), wsa_connect_error_, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " InnerOnTcpConnectLocked WSA overlapped result code:"
<< wsa_connect_error_ << " msg:|" << msg << "|";
gpr_free(msg);
}
}
@ -499,7 +499,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int ConnectUDP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectUDP", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName() << " ConnectUDP";
CHECK(!connect_done_);
CHECK_EQ(wsa_connect_error_, 0);
SOCKET s = winsocket_->raw_socket();
@ -509,8 +510,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
wsa_error_ctx->SetWSAError(wsa_connect_error_);
connect_done_ = true;
char* msg = gpr_format_message(wsa_connect_error_);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s WSAConnect error code:|%d| msg:|%s|",
GetName(), wsa_connect_error_, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " WSAConnect error code:|" << wsa_connect_error_ << "| msg:|" << msg
<< "|";
gpr_free(msg);
// c-ares expects a posix-style connect API
return out == 0 ? 0 : -1;
@ -518,7 +521,8 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int ConnectTCP(WSAErrorContext* wsa_error_ctx, const struct sockaddr* target,
ares_socklen_t target_len) {
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectTCP", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName() << " ConnectTCP";
LPFN_CONNECTEX ConnectEx;
GUID guid = WSAID_CONNECTEX;
DWORD ioctl_num_bytes;
@ -529,10 +533,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:%s WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:%d "
"msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) error code:"
<< wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -552,8 +556,9 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s bind error code:%d msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " bind error code:" << wsa_last_error << " msg:|" << msg << "|";
gpr_free(msg);
connect_done_ = true;
wsa_connect_error_ = wsa_last_error;
@ -569,8 +574,10 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
int wsa_last_error = WSAGetLastError();
wsa_error_ctx->SetWSAError(wsa_last_error);
char* msg = gpr_format_message(wsa_last_error);
GRPC_ARES_RESOLVER_TRACE_LOG("fd:%s ConnectEx error code:%d msg:|%s|",
GetName(), wsa_last_error, msg);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << GetName()
<< " ConnectEx error code:" << wsa_last_error << " msg:|" << msg
<< "|";
gpr_free(msg);
if (wsa_last_error == WSA_IO_PENDING) {
// c-ares only understands WSAEINPROGRESS and EWOULDBLOCK error codes on
@ -605,11 +612,12 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
if (winsocket_->read_info()->result().wsa_error != WSAEMSGSIZE) {
error = GRPC_WSA_ERROR(winsocket_->read_info()->result().wsa_error,
"OnIocpReadableInner");
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->read_info()->result().wsa_error,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadableInner winsocket_->read_info.wsa_error "
"code:|"
<< winsocket_->read_info()->result().wsa_error << "| msg:|"
<< grpc_core::StatusToString(error) << "|";
}
}
if (error.ok()) {
@ -620,34 +628,40 @@ class GrpcPolledFdWindows : public GrpcPolledFd {
grpc_core::CSliceUnref(read_buf_);
read_buf_ = grpc_empty_slice();
}
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpReadable finishing. read buf length now:|%d|", GetName(),
GRPC_SLICE_LENGTH(read_buf_));
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| OnIocpReadable finishing. read buf length now:|"
<< GRPC_SLICE_LENGTH(read_buf_) << "|";
ScheduleAndNullReadClosure(error);
}
void OnIocpWriteable() {
grpc_core::MutexLock lock(mu_);
GRPC_ARES_RESOLVER_TRACE_LOG("OnIocpWriteableInner. fd:|%s|", GetName());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) OnIocpWriteableInner. fd:|"
<< GetName() << "|";
CHECK(socket_type_ == SOCK_STREAM);
absl::Status error;
if (winsocket_->write_info()->result().wsa_error != 0) {
error = GRPC_WSA_ERROR(winsocket_->write_info()->result().wsa_error,
"OnIocpWriteableInner");
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|%d| msg:|%s|",
GetName(), winsocket_->write_info()->result().wsa_error,
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. winsocket_->write_info.wsa_error "
"code:|"
<< winsocket_->write_info()->result().wsa_error << "| msg:|"
<< grpc_core::StatusToString(error) << "|";
}
CHECK(tcp_write_state_ == WRITE_PENDING);
if (error.ok()) {
tcp_write_state_ = WRITE_WAITING_FOR_VERIFICATION_UPON_RETRY;
write_buf_ = grpc_slice_sub_no_ref(
write_buf_, 0, winsocket_->write_info()->result().bytes_transferred);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| OnIocpWriteableInner. bytes transferred:%d", GetName(),
winsocket_->write_info()->result().bytes_transferred);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:|" << GetName()
<< "| OnIocpWriteableInner. bytes transferred:"
<< winsocket_->write_info()->result().bytes_transferred;
} else {
grpc_core::CSliceUnref(write_buf_);
write_buf_ = grpc_empty_slice();
@ -694,8 +708,10 @@ class CustomSockFuncs {
public:
static ares_socket_t Socket(int af, int type, int protocol, void* user_data) {
if (type != SOCK_DGRAM && type != SOCK_STREAM) {
GRPC_ARES_RESOLVER_TRACE_LOG("Socket called with invalid socket type:%d",
type);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) Socket called with invalid socket "
"type:"
<< type;
return INVALID_SOCKET;
}
GrpcPolledFdFactoryWindows* self =
@ -703,24 +719,26 @@ class CustomSockFuncs {
SOCKET s = WSASocket(af, type, protocol, nullptr, 0,
IOCP::GetDefaultSocketFlags());
if (s == INVALID_SOCKET) {
GRPC_ARES_RESOLVER_TRACE_LOG(
"WSASocket failed with params af:%d type:%d protocol:%d", af, type,
protocol);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) WSASocket failed with params af:"
<< af << " type:" << type << " protocol:" << protocol;
return INVALID_SOCKET;
}
if (type == SOCK_STREAM) {
absl::Status error = PrepareSocket(s);
if (!error.ok()) {
GRPC_ARES_RESOLVER_TRACE_LOG("WSAIoctl failed with error: %s",
grpc_core::StatusToString(error).c_str());
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) WSAIoctl failed with error: "
<< grpc_core::StatusToString(error);
return INVALID_SOCKET;
}
}
auto polled_fd = std::make_unique<GrpcPolledFdWindows>(
self->iocp_->Watch(s), self->mu_, af, type, self->event_engine_);
GRPC_ARES_RESOLVER_TRACE_LOG(
"fd:|%s| created with params af:%d type:%d protocol:%d",
polled_fd->GetName(), af, type, protocol);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) fd:" << polled_fd->GetName()
<< " created with params af:" << af << " type:" << type
<< " protocol:" << protocol;
CHECK(self->sockets_.insert({s, std::move(polled_fd)}).second);
return s;
}
@ -758,7 +776,9 @@ class CustomSockFuncs {
}
static int CloseSocket(SOCKET s, void*) {
GRPC_ARES_RESOLVER_TRACE_LOG("c-ares socket: %d CloseSocket", s);
GRPC_TRACE_LOG(cares_resolver, INFO)
<< "(EventEngine c-ares resolver) c-ares socket: " << s
<< " CloseSocket";
return 0;
}
};

@ -65,10 +65,9 @@ void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
grpc_error_handle error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream ReadCallback (" << handle << ", " << stream << ", "
<< type << ", " << client_callback_info << ")";
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream ReadCallback (" << handle << ", "
<< stream << ", " << type << ", "
<< client_callback_info << ")";
switch (type) {
case kCFStreamEventOpenCompleted:
handle->open_event_.SetReady();
@ -99,10 +98,9 @@ void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
grpc_error_handle error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream WriteCallback (" << handle << ", " << stream << ", "
<< type << ", " << clientCallBackInfo << ")";
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream WriteCallback (" << handle << ", "
<< stream << ", " << type << ", "
<< clientCallBackInfo << ")";
switch (type) {
case kCFStreamEventOpenCompleted:
handle->open_event_.SetReady();

@ -292,18 +292,15 @@ class Closure {
return;
}
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "running closure " << closure << ": created ["
<< closure->file_created << ":" << closure->line_created
<< "]: run [" << location.file() << ":" << location.line() << "]";
}
GRPC_TRACE_VLOG(closure, 2)
<< "running closure " << closure << ": created ["
<< closure->file_created << ":" << closure->line_created << "]: run ["
<< location.file() << ":" << location.line() << "]";
CHECK_NE(closure->cb, nullptr);
#endif
closure->cb(closure->cb_arg, error);
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "closure " << closure << " finished";
}
GRPC_TRACE_VLOG(closure, 2) << "closure " << closure << " finished";
#endif
}
};

@ -71,14 +71,12 @@ static void start_destroy(grpc_core::Combiner* lock) {
}
#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
if (GRPC_TRACE_FLAG_ENABLED(combiner)) { \
VLOG(2).AtLocation(file, line) \
<< "C:" << lock << " " << (op) << " " \
<< gpr_atm_no_barrier_load(&lock->refs.count) << " --> " \
<< gpr_atm_no_barrier_load(&lock->refs.count) + (delta) << " " \
<< reason; \
}
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \
GRPC_TRACE_VLOG(combiner, 2).AtLocation(file, line) \
<< "C:" << lock << " " << (op) << " " \
<< gpr_atm_no_barrier_load(&lock->refs.count) << " --> " \
<< gpr_atm_no_barrier_load(&lock->refs.count) + (delta) << " " \
<< reason;
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif

@ -129,11 +129,10 @@ static void CallReadCb(CFStreamEndpoint* ep, grpc_error_handle error) {
}
static void CallWriteCb(CFStreamEndpoint* ep, grpc_error_handle error) {
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream endpoint:" << ep << " call_write_cb " << ep->write_cb
<< " " << ep->write_cb->cb << ":" << ep->write_cb->cb_arg;
VLOG(2) << "write: error=" << grpc_core::StatusToString(error);
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep << " call_write_cb "
<< ep->write_cb << " " << ep->write_cb->cb << ":"
<< ep->write_cb->cb_arg << "write: error="
<< grpc_core::StatusToString(error);
grpc_closure* cb = ep->write_cb;
ep->write_cb = nullptr;
ep->write_slices = nullptr;
@ -233,10 +232,9 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, bool /*urgent*/,
int /*min_progress_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream endpoint:" << ep_impl << " read (" << slices << ", "
<< cb << ") length:" << slices->length;
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " read ("
<< slices << ", " << cb
<< ") length:" << slices->length;
CHECK_EQ(ep_impl->read_cb, nullptr);
ep_impl->read_cb = cb;
ep_impl->read_slices = slices;
@ -251,10 +249,9 @@ static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb, void* /*arg*/,
int /*max_frame_size*/) {
CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream endpoint:" << ep_impl << " write (" << slices << ", "
<< cb << ") length:" << slices->length;
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " write ("
<< slices << ", " << cb
<< ") length:" << slices->length;
CHECK_EQ(ep_impl->write_cb, nullptr);
ep_impl->write_cb = cb;
ep_impl->write_slices = slices;
@ -308,11 +305,9 @@ grpc_endpoint* grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,
const char* peer_string,
CFStreamHandle* stream_sync) {
CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CFStream endpoint:" << ep_impl
<< " create readStream:" << read_stream
<< " writeStream: " << write_stream;
}
GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl
<< " create readStream:" << read_stream
<< " writeStream: " << write_stream;
ep_impl->base.vtable = &vtable;
gpr_ref_init(&ep_impl->refcount, 1);
ep_impl->read_stream = read_stream;

@ -360,9 +360,8 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(fd_refcount)) {
VLOG(2) << "FD " << fd << " " << new_fd << " create " << fd_name;
}
GRPC_TRACE_VLOG(fd_refcount, 2)
<< "FD " << fd << " " << new_fd << " create " << fd_name;
#endif
struct epoll_event ev;

@ -332,12 +332,11 @@ static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
if (GRPC_TRACE_FLAG_ENABLED(fd_refcount)) {
VLOG(2) << "FD " << fd->fd << " " << fd << " ref " << n << " "
<< gpr_atm_no_barrier_load(&fd->refst) << " -> "
<< gpr_atm_no_barrier_load(&fd->refst) + n << " [" << reason << "; "
<< file << ":" << line << "]";
}
GRPC_TRACE_VLOG(fd_refcount, 2)
<< "FD " << fd->fd << " " << fd << " ref " << n << " "
<< gpr_atm_no_barrier_load(&fd->refst) << " -> "
<< gpr_atm_no_barrier_load(&fd->refst) + n << " [" << reason << "; "
<< file << ":" << line << "]";
#else
#define REF_BY(fd, n, reason) \
do { \
@ -357,12 +356,11 @@ static void ref_by(grpc_fd* fd, int n) {
#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
int line) {
if (GRPC_TRACE_FLAG_ENABLED(fd_refcount)) {
VLOG(2) << "FD " << fd->fd << " " << fd << " unref " << n << " "
<< gpr_atm_no_barrier_load(&fd->refst) << " -> "
<< gpr_atm_no_barrier_load(&fd->refst) - n << " [" << reason << "; "
<< file << ":" << line << "]";
}
GRPC_TRACE_VLOG(fd_refcount, 2)
<< "FD " << fd->fd << " " << fd << " unref " << n << " "
<< gpr_atm_no_barrier_load(&fd->refst) << " -> "
<< gpr_atm_no_barrier_load(&fd->refst) - n << " [" << reason << "; "
<< file << ":" << line << "]";
#else
static void unref_by(grpc_fd* fd, int n) {
#endif

@ -35,18 +35,16 @@ void RunEventEngineClosure(grpc_closure* closure, grpc_error_handle error) {
grpc_core::ExecCtx exec_ctx;
#ifndef NDEBUG
closure->scheduled = false;
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "EventEngine: running closure " << closure << ": created ["
<< closure->file_created << ":" << closure->line_created
<< "]: " << (closure->run ? "run" : "scheduled") << " ["
<< closure->file_initiated << ":" << closure->line_initiated << "]";
}
GRPC_TRACE_VLOG(closure, 2)
<< "EventEngine: running closure " << closure << ": created ["
<< closure->file_created << ":" << closure->line_created
<< "]: " << (closure->run ? "run" : "scheduled") << " ["
<< closure->file_initiated << ":" << closure->line_initiated << "]";
#endif
closure->cb(closure->cb_arg, error);
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "EventEngine: closure " << closure << " finished";
}
GRPC_TRACE_VLOG(closure, 2)
<< "EventEngine: closure " << closure << " finished";
#endif
}

@ -32,21 +32,18 @@
static void exec_ctx_run(grpc_closure* closure) {
#ifndef NDEBUG
closure->scheduled = false;
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "running closure " << closure << ": created ["
<< closure->file_created << ":" << closure->line_created
<< "]: " << (closure->run ? "run" : "scheduled") << " ["
<< closure->file_initiated << ":" << closure->line_initiated << "]";
}
GRPC_TRACE_VLOG(closure, 2)
<< "running closure " << closure << ": created [" << closure->file_created
<< ":" << closure->line_created
<< "]: " << (closure->run ? "run" : "scheduled") << " ["
<< closure->file_initiated << ":" << closure->line_initiated << "]";
#endif
grpc_error_handle error =
grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
closure->error_data.error = 0;
closure->cb(closure->cb_arg, std::move(error));
#ifndef NDEBUG
if (GRPC_TRACE_FLAG_ENABLED(closure)) {
VLOG(2) << "closure " << closure << " finished";
}
GRPC_TRACE_VLOG(closure, 2) << "closure " << closure << " finished";
#endif
}

@ -95,10 +95,8 @@ void LockfreeEvent::NotifyOn(grpc_closure* closure) {
// sure that the shutdown error has been initialized properly before us
// referencing it.
gpr_atm curr = gpr_atm_acq_load(&state_);
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
VLOG(2) << "LockfreeEvent::NotifyOn: " << this << " curr=" << curr
<< " closure=" << closure;
}
GRPC_TRACE_VLOG(polling, 2) << "LockfreeEvent::NotifyOn: " << this
<< " curr=" << curr << " closure=" << closure;
switch (curr) {
case kClosureNotReady: {
// kClosureNotReady -> <closure>.
@ -163,10 +161,9 @@ bool LockfreeEvent::SetShutdown(grpc_error_handle shutdown_error) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
VLOG(2) << "LockfreeEvent::SetShutdown: " << &state_ << " curr=" << curr
<< " err=" << StatusToString(shutdown_error);
}
GRPC_TRACE_VLOG(polling, 2)
<< "LockfreeEvent::SetShutdown: " << &state_ << " curr=" << curr
<< " err=" << StatusToString(shutdown_error);
switch (curr) {
case kClosureReady:
case kClosureNotReady:
@ -212,9 +209,8 @@ void LockfreeEvent::SetReady() {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(&state_);
if (GRPC_TRACE_FLAG_ENABLED(polling)) {
VLOG(2) << "LockfreeEvent::SetReady: " << &state_ << " curr=" << curr;
}
GRPC_TRACE_VLOG(polling, 2)
<< "LockfreeEvent::SetReady: " << &state_ << " curr=" << curr;
switch (curr) {
case kClosureReady: {

@ -78,10 +78,8 @@ static void CFStreamConnectCleanup(CFStreamConnect* connect) {
static void OnAlarm(void* arg, grpc_error_handle error) {
CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CLIENT_CONNECT :" << connect
<< " OnAlarm, error:" << grpc_core::StatusToString(error);
}
GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnAlarm, error:"
<< grpc_core::StatusToString(error);
gpr_mu_lock(&connect->mu);
grpc_closure* closure = connect->closure;
connect->closure = nil;
@ -99,10 +97,8 @@ static void OnAlarm(void* arg, grpc_error_handle error) {
static void OnOpen(void* arg, grpc_error_handle error) {
CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CLIENT_CONNECT :" << connect
<< " OnOpen, error:" << grpc_core::StatusToString(error);
}
GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT :" << connect << " OnOpen, error:"
<< grpc_core::StatusToString(error);
gpr_mu_lock(&connect->mu);
grpc_timer_cancel(&connect->alarm);
grpc_closure* closure = connect->closure;
@ -173,10 +169,9 @@ static int64_t CFStreamClientConnect(
gpr_ref_init(&connect->refcount, 1);
gpr_mu_init(&connect->mu);
if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
VLOG(2) << "CLIENT_CONNECT: " << connect << ", " << connect->addr_name
<< ": asynchronously connecting";
}
GRPC_TRACE_VLOG(tcp, 2) << "CLIENT_CONNECT: " << connect << ", "
<< connect->addr_name
<< ": asynchronously connecting";
CFReadStreamRef read_stream;
CFWriteStreamRef write_stream;

@ -334,12 +334,11 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
timer->hash_table_next = nullptr;
#endif
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TIMER " << timer << ": SET "
<< deadline.milliseconds_after_process_epoch() << " now "
<< grpc_core::Timestamp::Now().milliseconds_after_process_epoch()
<< " call " << closure << "[" << closure->cb << "]";
}
GRPC_TRACE_VLOG(timer, 2)
<< "TIMER " << timer << ": SET "
<< deadline.milliseconds_after_process_epoch() << " now "
<< grpc_core::Timestamp::Now().milliseconds_after_process_epoch()
<< " call " << closure << "[" << closure->cb << "]";
if (!g_shared_mutables.initialized) {
timer->pending = false;
@ -370,12 +369,11 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
timer->heap_index = INVALID_HEAP_INDEX;
list_join(&shard->list, timer);
}
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << " .. add to shard " << (shard - g_shards)
<< " with queue_deadline_cap="
<< shard->queue_deadline_cap.milliseconds_after_process_epoch()
<< " => is_first_timer=" << (is_first_timer ? "true" : "false");
}
GRPC_TRACE_VLOG(timer, 2)
<< " .. add to shard " << (shard - g_shards)
<< " with queue_deadline_cap="
<< shard->queue_deadline_cap.milliseconds_after_process_epoch()
<< " => is_first_timer=" << (is_first_timer ? "true" : "false");
gpr_mu_unlock(&shard->mu);
// Deadline may have decreased, we need to adjust the main queue. Note
@ -391,10 +389,9 @@ static void timer_init(grpc_timer* timer, grpc_core::Timestamp deadline,
// grpc_timer_check.
if (is_first_timer) {
gpr_mu_lock(&g_shared_mutables.mu);
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << " .. old shard min_deadline="
<< shard->min_deadline.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer, 2)
<< " .. old shard min_deadline="
<< shard->min_deadline.milliseconds_after_process_epoch();
if (deadline < shard->min_deadline) {
grpc_core::Timestamp old_min_deadline = g_shard_queue[0]->min_deadline;
shard->min_deadline = deadline;
@ -433,10 +430,9 @@ static void timer_cancel(grpc_timer* timer) {
timer_shard* shard = &g_shards[grpc_core::HashPointer(timer, g_num_shards)];
gpr_mu_lock(&shard->mu);
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TIMER " << timer
<< ": CANCEL pending=" << (timer->pending ? "true" : "false");
}
GRPC_TRACE_VLOG(timer, 2)
<< "TIMER " << timer
<< ": CANCEL pending=" << (timer->pending ? "true" : "false");
if (timer->pending) {
REMOVE_FROM_HASH_TABLE(timer);
@ -474,11 +470,9 @@ static bool refill_heap(timer_shard* shard, grpc_core::Timestamp now) {
std::max(now, shard->queue_deadline_cap) +
grpc_core::Duration::FromSecondsAsDouble(deadline_delta);
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << " .. shard[" << (shard - g_shards)
<< "]->queue_deadline_cap --> "
<< shard->queue_deadline_cap.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. shard[" << (shard - g_shards) << "]->queue_deadline_cap --> "
<< shard->queue_deadline_cap.milliseconds_after_process_epoch();
for (timer = shard->list.next; timer != &shard->list; timer = next) {
next = timer->next;
auto timer_deadline =
@ -486,11 +480,9 @@ static bool refill_heap(timer_shard* shard, grpc_core::Timestamp now) {
timer->deadline);
if (timer_deadline < shard->queue_deadline_cap) {
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << " .. add timer with deadline "
<< timer_deadline.milliseconds_after_process_epoch()
<< " to heap";
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. add timer with deadline "
<< timer_deadline.milliseconds_after_process_epoch() << " to heap";
list_remove(timer);
grpc_timer_heap_add(&shard->heap, timer);
}
@ -504,10 +496,9 @@ static bool refill_heap(timer_shard* shard, grpc_core::Timestamp now) {
static grpc_timer* pop_one(timer_shard* shard, grpc_core::Timestamp now) {
grpc_timer* timer;
for (;;) {
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << " .. shard[" << (shard - g_shards) << "]: heap_empty="
<< (grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. shard[" << (shard - g_shards) << "]: heap_empty="
<< (grpc_timer_heap_is_empty(&shard->heap) ? "true" : "false");
if (grpc_timer_heap_is_empty(&shard->heap)) {
if (now < shard->queue_deadline_cap) return nullptr;
if (!refill_heap(shard, now)) return nullptr;
@ -516,16 +507,13 @@ static grpc_timer* pop_one(timer_shard* shard, grpc_core::Timestamp now) {
auto timer_deadline =
grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
timer->deadline);
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << " .. check top timer deadline="
<< timer_deadline.milliseconds_after_process_epoch()
<< " now=" << now.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. check top timer deadline="
<< timer_deadline.milliseconds_after_process_epoch()
<< " now=" << now.milliseconds_after_process_epoch();
if (timer_deadline > now) return nullptr;
if (GRPC_TRACE_FLAG_ENABLED(timer)) {
VLOG(2) << "TIMER " << timer << ": FIRE "
<< (now - timer_deadline).millis() << "ms late";
}
GRPC_TRACE_VLOG(timer, 2) << "TIMER " << timer << ": FIRE "
<< (now - timer_deadline).millis() << "ms late";
timer->pending = false;
grpc_timer_heap_pop(&shard->heap);
return timer;
@ -546,9 +534,8 @@ static size_t pop_timers(timer_shard* shard, grpc_core::Timestamp now,
}
*new_min_deadline = compute_min_deadline(shard);
gpr_mu_unlock(&shard->mu);
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << " .. shard[" << (shard - g_shards) << "] popped " << n;
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. shard[" << (shard - g_shards) << "] popped " << n;
return n;
}
@ -584,12 +571,10 @@ static grpc_timer_check_result run_some_expired_timers(
gpr_mu_lock(&g_shared_mutables.mu);
result = GRPC_TIMERS_CHECKED_AND_EMPTY;
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2)
<< " .. shard[" << (g_shard_queue[0] - g_shards)
<< "]->min_deadline = "
<< g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. shard[" << (g_shard_queue[0] - g_shards)
<< "]->min_deadline = "
<< g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch();
while (g_shard_queue[0]->min_deadline < now ||
(now != grpc_core::Timestamp::InfFuture() &&
@ -603,14 +588,12 @@ static grpc_timer_check_result run_some_expired_timers(
result = GRPC_TIMERS_FIRED;
}
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2)
<< " .. result --> " << result << ", shard["
<< (g_shard_queue[0] - g_shards) << "]->min_deadline "
<< g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch()
<< " --> " << new_min_deadline.milliseconds_after_process_epoch()
<< ", now=" << now.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer_check, 2)
<< " .. result --> " << result << ", shard["
<< (g_shard_queue[0] - g_shards) << "]->min_deadline "
<< g_shard_queue[0]->min_deadline.milliseconds_after_process_epoch()
<< " --> " << new_min_deadline.milliseconds_after_process_epoch()
<< ", now=" << now.milliseconds_after_process_epoch();
// An grpc_timer_init() on the shard could intervene here, adding a new
// timer that is earlier than new_min_deadline. However,
@ -660,11 +643,9 @@ static grpc_timer_check_result timer_check(grpc_core::Timestamp* next) {
if (next != nullptr) {
*next = std::min(*next, min_timer);
}
if (GRPC_TRACE_FLAG_ENABLED(timer_check)) {
VLOG(2) << "TIMER CHECK SKIP: now="
<< now.milliseconds_after_process_epoch()
<< " min_timer=" << min_timer.milliseconds_after_process_epoch();
}
GRPC_TRACE_VLOG(timer_check, 2)
<< "TIMER CHECK SKIP: now=" << now.milliseconds_after_process_epoch()
<< " min_timer=" << min_timer.milliseconds_after_process_epoch();
return GRPC_TIMERS_CHECKED_AND_EMPTY;
}

@ -103,7 +103,7 @@ struct JoinState<Traits, P0, P1> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/2 already ready";
}
@ -125,7 +125,7 @@ struct JoinState<Traits, P0, P1> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/2 already ready";
}
@ -216,7 +216,7 @@ struct JoinState<Traits, P0, P1, P2> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/3 already ready";
}
@ -238,7 +238,7 @@ struct JoinState<Traits, P0, P1, P2> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/3 already ready";
}
@ -260,7 +260,7 @@ struct JoinState<Traits, P0, P1, P2> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/3 already ready";
}
@ -367,7 +367,7 @@ struct JoinState<Traits, P0, P1, P2, P3> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/4 already ready";
}
@ -389,7 +389,7 @@ struct JoinState<Traits, P0, P1, P2, P3> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/4 already ready";
}
@ -411,7 +411,7 @@ struct JoinState<Traits, P0, P1, P2, P3> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/4 already ready";
}
@ -433,7 +433,7 @@ struct JoinState<Traits, P0, P1, P2, P3> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/4 already ready";
}
@ -555,7 +555,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/5 already ready";
}
@ -577,7 +577,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/5 already ready";
}
@ -599,7 +599,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/5 already ready";
}
@ -621,7 +621,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/5 already ready";
}
@ -643,7 +643,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 5/5 already ready";
}
@ -780,7 +780,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/6 already ready";
}
@ -802,7 +802,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/6 already ready";
}
@ -824,7 +824,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/6 already ready";
}
@ -846,7 +846,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/6 already ready";
}
@ -868,7 +868,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 5/6 already ready";
}
@ -890,7 +890,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 6/6 already ready";
}
@ -1042,7 +1042,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/7 already ready";
}
@ -1064,7 +1064,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/7 already ready";
}
@ -1086,7 +1086,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/7 already ready";
}
@ -1108,7 +1108,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/7 already ready";
}
@ -1130,7 +1130,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 5/7 already ready";
}
@ -1152,7 +1152,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 6/7 already ready";
}
@ -1174,7 +1174,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 7/7 already ready";
}
@ -1341,7 +1341,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/8 already ready";
}
@ -1363,7 +1363,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/8 already ready";
}
@ -1385,7 +1385,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/8 already ready";
}
@ -1407,7 +1407,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/8 already ready";
}
@ -1429,7 +1429,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 5/8 already ready";
}
@ -1451,7 +1451,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 6/8 already ready";
}
@ -1473,7 +1473,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 7/8 already ready";
}
@ -1495,7 +1495,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 8/8 already ready";
}
@ -1677,7 +1677,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 1/9 already ready";
}
@ -1699,7 +1699,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 2/9 already ready";
}
@ -1721,7 +1721,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 3/9 already ready";
}
@ -1743,7 +1743,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 4/9 already ready";
}
@ -1765,7 +1765,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 5/9 already ready";
}
@ -1787,7 +1787,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 6/9 already ready";
}
@ -1809,7 +1809,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 7/9 already ready";
}
@ -1831,7 +1831,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 8/9 already ready";
}
@ -1853,7 +1853,7 @@ struct JoinState<Traits, P0, P1, P2, P3, P4, P5, P6, P7, P8> {
return Traits::template EarlyReturn<Result>(std::move(*p));
}
}
} else if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
} else {
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "join[" << this << "]: joint 9/9 already ready";
}

File diff suppressed because it is too large Load Diff

@ -87,10 +87,8 @@ class InterceptorList {
public:
RunPromise(size_t memory_required, Map** factory, absl::optional<T> value) {
if (!value.has_value() || *factory == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this
<< "]: create immediate";
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this << "]: create immediate";
is_immediately_resolved_ = true;
Construct(&result_, std::move(value));
} else {
@ -100,17 +98,15 @@ class InterceptorList {
async_resolution_.space.get());
async_resolution_.current_factory = *factory;
async_resolution_.first_factory = factory;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this
<< "]: create async; mem=" << async_resolution_.space.get();
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this
<< "]: create async; mem=" << async_resolution_.space.get();
}
}
~RunPromise() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this << "]: destroy";
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this << "]: destroy";
if (is_immediately_resolved_) {
Destruct(&result_);
} else {
@ -127,10 +123,9 @@ class InterceptorList {
RunPromise(RunPromise&& other) noexcept
: is_immediately_resolved_(other.is_immediately_resolved_) {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this << "]: move from "
<< &other;
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this << "]: move from "
<< &other;
if (is_immediately_resolved_) {
Construct(&result_, std::move(other.result_));
} else {
@ -141,10 +136,8 @@ class InterceptorList {
RunPromise& operator=(RunPromise&& other) noexcept = delete;
Poll<absl::optional<T>> operator()() {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this
<< "]: " << DebugString();
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this << "]: " << DebugString();
if (is_immediately_resolved_) return std::move(result_);
while (true) {
if (*async_resolution_.first_factory == nullptr) {
@ -159,10 +152,9 @@ class InterceptorList {
async_resolution_.current_factory =
async_resolution_.current_factory->next();
if (!p->has_value()) async_resolution_.current_factory = nullptr;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << "InterceptorList::RunPromise[" << this
<< "]: " << DebugString();
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< "InterceptorList::RunPromise[" << this
<< "]: " << DebugString();
if (async_resolution_.current_factory == nullptr) {
return std::move(*p);
}

@ -634,10 +634,9 @@ class Push {
Poll<bool> operator()() {
if (center_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2) << GetContext<Activity>()->DebugTag()
<< " Pipe push has a null center";
}
GRPC_TRACE_VLOG(promise_primitives, 2)
<< GetContext<Activity>()->DebugTag()
<< " Pipe push has a null center";
return false;
}
if (auto* p = absl::get_if<T>(&state_)) {

@ -64,13 +64,12 @@ GrpcServerAuthzFilter::Create(const ChannelArgs& args, ChannelFilter::Args) {
bool GrpcServerAuthzFilter::IsAuthorized(ClientMetadata& initial_metadata) {
EvaluateArgs args(&initial_metadata, &per_channel_evaluate_args_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_authz_api)) {
VLOG(2) << "checking request: url_path=" << args.GetPath()
<< ", transport_security_type=" << args.GetTransportSecurityType()
<< ", uri_sans=[" << absl::StrJoin(args.GetUriSans(), ",")
<< "], dns_sans=[" << absl::StrJoin(args.GetDnsSans(), ",")
<< "], subject=" << args.GetSubject();
}
GRPC_TRACE_VLOG(grpc_authz_api, 2)
<< "checking request: url_path=" << args.GetPath()
<< ", transport_security_type=" << args.GetTransportSecurityType()
<< ", uri_sans=[" << absl::StrJoin(args.GetUriSans(), ",")
<< "], dns_sans=[" << absl::StrJoin(args.GetDnsSans(), ",")
<< "], subject=" << args.GetSubject();
grpc_authorization_policy_provider::AuthorizationEngines engines =
provider_->engines();
if (engines.deny_engine != nullptr) {
@ -87,10 +86,9 @@ bool GrpcServerAuthzFilter::IsAuthorized(ClientMetadata& initial_metadata) {
AuthorizationEngine::Decision decision =
engines.allow_engine->Evaluate(args);
if (decision.type == AuthorizationEngine::Decision::Type::kAllow) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_authz_api)) {
VLOG(2) << "chand=" << this << ": request allowed by policy "
<< decision.matching_policy_name;
}
GRPC_TRACE_VLOG(grpc_authz_api, 2)
<< "chand=" << this << ": request allowed by policy "
<< decision.matching_policy_name;
return true;
}
}

@ -188,10 +188,9 @@ void CallFilters::Finalize(const grpc_call_final_info* final_info) {
void CallFilters::CancelDueToFailedPipeOperation(SourceLocation but_where) {
// We expect something cancelled before now
if (push_server_trailing_metadata_ == nullptr) return;
if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) {
VLOG(2).AtLocation(but_where.file(), but_where.line())
<< "Cancelling due to failed pipe operation: " << DebugString();
}
GRPC_TRACE_VLOG(promise_primitives, 2)
.AtLocation(but_where.file(), but_where.line())
<< "Cancelling due to failed pipe operation: " << DebugString();
auto status =
ServerMetadataFromStatus(GRPC_STATUS_CANCELLED, "Failed pipe operation");
status->Set(GrpcCallWasCancelled(), true);

@ -190,10 +190,9 @@ void grpc_stream_ref_init(grpc_stream_refcount* refcount, int initial_refs,
#ifndef NDEBUG
inline void grpc_stream_ref(grpc_stream_refcount* refcount,
const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(stream_refcount)) {
VLOG(2) << refcount->object_type << " " << refcount << ":"
<< refcount->destroy.cb_arg << " REF " << reason;
}
GRPC_TRACE_VLOG(stream_refcount, 2)
<< refcount->object_type << " " << refcount << ":"
<< refcount->destroy.cb_arg << " REF " << reason;
refcount->refs.RefNonZero(DEBUG_LOCATION, reason);
}
#else
@ -207,10 +206,9 @@ void grpc_stream_destroy(grpc_stream_refcount* refcount);
#ifndef NDEBUG
inline void grpc_stream_unref(grpc_stream_refcount* refcount,
const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(stream_refcount)) {
VLOG(2) << refcount->object_type << " " << refcount << ":"
<< refcount->destroy.cb_arg << " UNREF " << reason;
}
GRPC_TRACE_VLOG(stream_refcount, 2)
<< refcount->object_type << " " << refcount << ":"
<< refcount->destroy.cb_arg << " UNREF " << reason;
if (GPR_UNLIKELY(refcount->refs.Unref(DEBUG_LOCATION, reason))) {
grpc_stream_destroy(refcount);
}

@ -851,14 +851,12 @@ void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
parent()->lb_calld_->client_stats() != nullptr) {
client_stats = parent()->lb_calld_->client_stats()->Ref();
}
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << parent() << " helper " << this
<< "] state=" << ConnectivityStateName(state) << " ("
<< status.ToString() << ") wrapping child picker " << picker.get()
<< " (serverlist=" << serverlist.get()
<< ", client_stats=" << client_stats.get() << ")";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << parent() << " helper " << this
<< "] state=" << ConnectivityStateName(state) << " (" << status.ToString()
<< ") wrapping child picker " << picker.get()
<< " (serverlist=" << serverlist.get()
<< ", client_stats=" << client_stats.get() << ")";
parent()->channel_control_helper()->UpdateState(
state, status,
MakeRefCounted<Picker>(std::move(serverlist), std::move(picker),

@ -790,22 +790,21 @@ PickFirst::SubchannelList::SubchannelData::SubchannelData(
void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
grpc_connectivity_state new_state, absl::Status status) {
PickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << p << "] subchannel list " << subchannel_list_
<< " index " << index_ << " of " << subchannel_list_->size()
<< " (subchannel_state " << subchannel_state_.get()
<< "): connectivity changed: old_state="
<< (connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A")
<< ", new_state=" << ConnectivityStateName(new_state)
<< ", status=" << status
<< ", seen_transient_failure=" << seen_transient_failure_
<< ", p->selected_=" << p->selected_.get()
<< ", p->subchannel_list_=" << p->subchannel_list_.get()
<< ", p->subchannel_list_->shutting_down_="
<< p->subchannel_list_->shutting_down_;
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
<< index_ << " of " << subchannel_list_->size() << " (subchannel_state "
<< subchannel_state_.get() << "): connectivity changed: old_state="
<< (connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A")
<< ", new_state=" << ConnectivityStateName(new_state)
<< ", status=" << status
<< ", seen_transient_failure=" << seen_transient_failure_
<< ", p->selected_=" << p->selected_.get()
<< ", p->subchannel_list_=" << p->subchannel_list_.get()
<< ", p->subchannel_list_->shutting_down_="
<< p->subchannel_list_->shutting_down_;
if (subchannel_list_->shutting_down_) return;
// The notification must be for a subchannel in the current list.
CHECK(subchannel_list_ == p->subchannel_list_.get());
@ -1596,24 +1595,23 @@ void OldPickFirst::SubchannelList::SubchannelData::ShutdownLocked() {
void OldPickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
grpc_connectivity_state new_state, absl::Status status) {
OldPickFirst* p = subchannel_list_->policy_.get();
if (GRPC_TRACE_FLAG_ENABLED(pick_first)) {
LOG(INFO) << "[PF " << p << "] subchannel list " << subchannel_list_
<< " index " << index_ << " of " << subchannel_list_->size()
<< " (subchannel " << subchannel_.get()
<< "): connectivity changed: old_state="
<< (connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A")
<< ", new_state=" << ConnectivityStateName(new_state)
<< ", status=" << status
<< ", shutting_down=" << subchannel_list_->shutting_down_
<< ", pending_watcher=" << pending_watcher_
<< ", seen_transient_failure=" << seen_transient_failure_
<< ", p->selected_=" << p->selected_
<< ", p->subchannel_list_=" << p->subchannel_list_.get()
<< ", p->latest_pending_subchannel_list_="
<< p->latest_pending_subchannel_list_.get();
}
GRPC_TRACE_LOG(pick_first, INFO)
<< "[PF " << p << "] subchannel list " << subchannel_list_ << " index "
<< index_ << " of " << subchannel_list_->size() << " (subchannel "
<< subchannel_.get() << "): connectivity changed: old_state="
<< (connectivity_state_.has_value()
? ConnectivityStateName(*connectivity_state_)
: "N/A")
<< ", new_state=" << ConnectivityStateName(new_state)
<< ", status=" << status
<< ", shutting_down=" << subchannel_list_->shutting_down_
<< ", pending_watcher=" << pending_watcher_
<< ", seen_transient_failure=" << seen_transient_failure_
<< ", p->selected_=" << p->selected_
<< ", p->subchannel_list_=" << p->subchannel_list_.get()
<< ", p->latest_pending_subchannel_list_="
<< p->latest_pending_subchannel_list_.get();
if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return;
auto& stats_plugins = subchannel_list_->policy_->channel_control_helper()
->GetStatsPluginGroup();

@ -35,7 +35,6 @@
#include "absl/types/optional.h"
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -25,7 +25,6 @@
#include "absl/functional/any_invocable.h"
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
namespace grpc_core {

@ -35,7 +35,6 @@
#include "absl/types/variant.h"
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/client_channel/client_channel_internal.h"

@ -29,7 +29,6 @@
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -43,8 +43,6 @@
#include "absl/strings/string_view.h"
#include "absl/strings/strip.h"
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/orphanable.h"

@ -34,7 +34,6 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/config/core_configuration.h"

@ -39,8 +39,6 @@
#include "absl/log/check.h"
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"

@ -33,7 +33,6 @@
#include "absl/strings/str_format.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/log_windows.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

@ -18,7 +18,6 @@
#include "absl/log/log.h"
#include "absl/strings/match.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/config/config_vars.h"

@ -77,12 +77,6 @@ using grpc_event_engine::experimental::EventEngine;
// TODO(hork): Add a test that checks for proper authority from balancer
// addresses.
#define GRPC_EVENT_ENGINE_RESOLVER_TRACE(format, ...) \
if (GRPC_TRACE_FLAG_ENABLED(event_engine_client_channel_resolver)) { \
VLOG(2) << "(event_engine client channel resolver) " \
<< absl::StrFormat(format, __VA_ARGS__); \
}
// ----------------------------------------------------------------------------
// EventEngineClientChannelDNSResolver
// ----------------------------------------------------------------------------
@ -221,9 +215,10 @@ EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
event_engine_resolver_(std::move(event_engine_resolver)) {
// Locking to prevent completion before all records are queried
MutexLock lock(&on_resolved_mu_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting hostname resolution for %s", resolver_.get(),
resolver_->name_to_resolve().c_str());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< resolver_.get() << " Starting hostname resolution for "
<< resolver_->name_to_resolve();
is_hostname_inflight_ = true;
event_engine_resolver_->LookupHostname(
[self = Ref(DEBUG_LOCATION, "OnHostnameResolved")](
@ -236,9 +231,10 @@ EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
},
resolver_->name_to_resolve(), kDefaultSecurePort);
if (resolver_->enable_srv_queries_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting SRV record resolution for %s",
resolver_.get(), resolver_->name_to_resolve().c_str());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< resolver_.get() << " Starting SRV record resolution for "
<< resolver_->name_to_resolve();
is_srv_inflight_ = true;
event_engine_resolver_->LookupSRV(
[self = Ref(DEBUG_LOCATION, "OnSRVResolved")](
@ -252,9 +248,10 @@ EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
absl::StrCat("_grpclb._tcp.", resolver_->name_to_resolve()));
}
if (resolver_->request_service_config_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting TXT record resolution for %s",
resolver_.get(), resolver_->name_to_resolve().c_str());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< resolver_.get() << " Starting TXT record resolution for "
<< resolver_->name_to_resolve();
is_txt_inflight_ = true;
event_engine_resolver_->LookupTXT(
[self = Ref(DEBUG_LOCATION, "OnTXTResolved")](
@ -303,8 +300,9 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
OnTimeout() {
MutexLock lock(&on_resolved_mu_);
GRPC_EVENT_ENGINE_RESOLVER_TRACE("DNSResolver::%p OnTimeout",
resolver_.get());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< resolver_.get() << " OnTimeout";
timeout_handle_.reset();
event_engine_resolver_.reset();
}
@ -368,9 +366,10 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
}
// Do a subsequent hostname query since SRV records were returned
for (auto& srv_record : *srv_records) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p Starting balancer hostname resolution for %s:%d",
resolver_.get(), srv_record.host.c_str(), srv_record.port);
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< resolver_.get() << " Starting balancer hostname resolution for "
<< srv_record.host << ":" << srv_record.port;
++number_of_balancer_hostnames_initiated_;
event_engine_resolver_->LookupHostname(
[host = srv_record.host,
@ -446,9 +445,10 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
// Found a service config record.
service_config_json_ =
result->substr(kServiceConfigAttributePrefix.size());
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p found service config: %s",
event_engine_resolver_.get(), service_config_json_->c_str());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< event_engine_resolver_.get()
<< " found service config: " << service_config_json_->c_str();
} else {
service_config_json_ = absl::UnavailableError(absl::StrCat(
"failed to find attribute prefix: ", kServiceConfigAttributePrefix,
@ -492,9 +492,10 @@ void EventEngineClientChannelDNSResolver::EventEngineDNSRequestWrapper::
return;
}
if (service_config->empty()) return;
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p selected service config choice: %s",
event_engine_resolver_.get(), service_config->c_str());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::"
<< event_engine_resolver_.get()
<< " selected service config choice: " << service_config->c_str();
result->service_config =
ServiceConfigImpl::Create(resolver_->channel_args(), *service_config);
if (!result->service_config.ok()) {
@ -511,20 +512,19 @@ absl::optional<Resolver::Result> EventEngineClientChannelDNSResolver::
if (is_hostname_inflight_ || is_srv_inflight_ || is_txt_inflight_ ||
number_of_balancer_hostnames_resolved_ !=
number_of_balancer_hostnames_initiated_) {
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p OnResolved() waiting for results (hostname: %s, "
"srv: %s, "
"txt: %s, "
"balancer addresses: %" PRIuPTR "/%" PRIuPTR " complete",
this, is_hostname_inflight_ ? "waiting" : "done",
is_srv_inflight_ ? "waiting" : "done",
is_txt_inflight_ ? "waiting" : "done",
number_of_balancer_hostnames_resolved_,
number_of_balancer_hostnames_initiated_);
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::" << this
<< " OnResolved() waiting for results (hostname: "
<< (is_hostname_inflight_ ? "waiting" : "done")
<< ", srv: " << (is_srv_inflight_ ? "waiting" : "done")
<< ", txt: " << (is_txt_inflight_ ? "waiting" : "done")
<< ", balancer addresses: " << number_of_balancer_hostnames_resolved_
<< "/" << number_of_balancer_hostnames_initiated_ << " complete";
return absl::nullopt;
}
GRPC_EVENT_ENGINE_RESOLVER_TRACE(
"DNSResolver::%p OnResolvedLocked() proceeding", this);
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) DNSResolver::" << this
<< " OnResolvedLocked() proceeding";
Resolver::Result result;
result.args = resolver_->channel_args();
// If both addresses and balancer addresses failed, return an error for both
@ -538,7 +538,8 @@ absl::optional<Resolver::Result> EventEngineClientChannelDNSResolver::
// return an error. Validation errors may be empty.
status = absl::UnavailableError("No results from DNS queries");
}
GRPC_EVENT_ENGINE_RESOLVER_TRACE("%s", status.message().data());
GRPC_TRACE_VLOG(event_engine_client_channel_resolver, 2)
<< "(event_engine client channel resolver) " << status.message().data();
result.addresses = status;
result.service_config = status;
return std::move(result);

@ -90,15 +90,11 @@ NativeClientChannelDNSResolver::NativeClientChannelDNSResolver(
.set_max_backoff(Duration::Milliseconds(
GRPC_DNS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)),
&dns_resolver_trace) {
if (GRPC_TRACE_FLAG_ENABLED(dns_resolver)) {
VLOG(2) << "[dns_resolver=" << this << "] created";
}
GRPC_TRACE_VLOG(dns_resolver, 2) << "[dns_resolver=" << this << "] created";
}
NativeClientChannelDNSResolver::~NativeClientChannelDNSResolver() {
if (GRPC_TRACE_FLAG_ENABLED(dns_resolver)) {
VLOG(2) << "[dns_resolver=" << this << "] destroyed";
}
GRPC_TRACE_VLOG(dns_resolver, 2) << "[dns_resolver=" << this << "] destroyed";
}
OrphanablePtr<Orphanable> NativeClientChannelDNSResolver::StartRequest() {
@ -107,19 +103,17 @@ OrphanablePtr<Orphanable> NativeClientChannelDNSResolver::StartRequest() {
absl::bind_front(&NativeClientChannelDNSResolver::OnResolved, this),
name_to_resolve(), kDefaultSecurePort, kDefaultDNSRequestTimeout,
interested_parties(), /*name_server=*/"");
if (GRPC_TRACE_FLAG_ENABLED(dns_resolver)) {
VLOG(2) << "[dns_resolver=" << this << "] starting request="
<< DNSResolver::HandleToString(dns_request_handle);
}
GRPC_TRACE_VLOG(dns_resolver, 2)
<< "[dns_resolver=" << this << "] starting request="
<< DNSResolver::HandleToString(dns_request_handle);
return MakeOrphanable<Request>();
}
void NativeClientChannelDNSResolver::OnResolved(
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
if (GRPC_TRACE_FLAG_ENABLED(dns_resolver)) {
VLOG(2) << "[dns_resolver=" << this
<< "] request complete, status=" << addresses_or.status();
}
GRPC_TRACE_VLOG(dns_resolver, 2)
<< "[dns_resolver=" << this
<< "] request complete, status=" << addresses_or.status();
// Convert result from iomgr DNS API into Resolver::Result.
Result result;
if (addresses_or.ok()) {

@ -30,7 +30,6 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/address_utils/sockaddr_utils.h"

@ -26,7 +26,6 @@
#include "absl/log/check.h"
#include "absl/strings/string_view.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -24,7 +24,6 @@
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -28,7 +28,6 @@
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/message_size/message_size_filter.h"

@ -24,7 +24,6 @@
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/promise/context.h"

@ -20,7 +20,6 @@
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -22,7 +22,6 @@
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/crash.h"

@ -21,7 +21,6 @@
#include "absl/log/check.h"
#include "absl/log/log.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>

@ -37,14 +37,15 @@
#include "src/core/handshaker/handshaker.h"
#include "src/core/handshaker/handshaker_registry.h"
#include "src/core/handshaker/tcp_connect/tcp_connect_handshaker.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/security_connector/security_connector.h"
@ -57,9 +58,6 @@ namespace grpc_core {
namespace {
using grpc_event_engine::experimental::EventEngine;
using grpc_event_engine::experimental::ResolvedAddressToURI;
grpc_httpcli_get_override g_get_override;
grpc_httpcli_post_override g_post_override;
grpc_httpcli_put_override g_put_override;
@ -174,10 +172,7 @@ HttpRequest::HttpRequest(
pollent_(pollent),
pollset_set_(grpc_pollset_set_create()),
test_only_generate_response_(std::move(test_only_generate_response)),
resolver_(
ChannelArgs::FromC(channel_args_)
.GetObjectRef<EventEngine>()
->GetDNSResolver(EventEngine::DNSResolver::ResolverOptions())) {
resolver_(GetDNSResolver()) {
grpc_http_parser_init(&parser_, GRPC_HTTP_RESPONSE, response);
grpc_slice_buffer_init(&incoming_);
grpc_slice_buffer_init(&outgoing_);
@ -211,14 +206,11 @@ void HttpRequest::Start() {
test_only_generate_response_.value()();
return;
}
if (!resolver_.ok()) {
Finish(resolver_.status());
return;
}
Ref().release(); // ref held by pending DNS resolution
(*resolver_)
->LookupHostname(absl::bind_front(&HttpRequest::OnResolved, this),
uri_.authority(), uri_.scheme());
dns_request_handle_ = resolver_->LookupHostname(
absl::bind_front(&HttpRequest::OnResolved, this), uri_.authority(),
uri_.scheme(), kDefaultDNSRequestTimeout, pollset_set_,
/*name_server=*/"");
}
void HttpRequest::Orphan() {
@ -227,8 +219,10 @@ void HttpRequest::Orphan() {
CHECK(!cancelled_);
cancelled_ = true;
// cancel potentially pending DNS resolution.
if (*resolver_ != nullptr) {
resolver_->reset();
if (dns_request_handle_.has_value() &&
resolver_->Cancel(dns_request_handle_.value())) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
Unref();
}
if (handshake_mgr_ != nullptr) {
// Shutdown will cancel any ongoing tcp connect.
@ -244,7 +238,8 @@ void HttpRequest::AppendError(grpc_error_handle error) {
if (overall_error_.ok()) {
overall_error_ = GRPC_ERROR_CREATE("Failed HTTP/1 client request");
}
auto addr_text = ResolvedAddressToURI(addresses_[next_address_ - 1]);
const grpc_resolved_address* addr = &addresses_[next_address_ - 1];
auto addr_text = grpc_sockaddr_to_uri(addr);
if (addr_text.ok()) error = AddMessagePrefix(*addr_text, std::move(error));
overall_error_ = grpc_error_add_child(overall_error_, std::move(error));
}
@ -314,7 +309,7 @@ void HttpRequest::OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result) {
StartWrite();
}
void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
void HttpRequest::DoHandshake(const grpc_resolved_address* addr) {
// Create the security connector using the credentials and target name.
ChannelArgs args = ChannelArgs::FromC(channel_args_);
RefCountedPtr<grpc_channel_security_connector> sc =
@ -325,7 +320,7 @@ void HttpRequest::DoHandshake(const EventEngine::ResolvedAddress& addr) {
&overall_error_, 1));
return;
}
absl::StatusOr<std::string> address = ResolvedAddressToURI(addr);
absl::StatusOr<std::string> address = grpc_sockaddr_to_uri(addr);
if (!address.ok()) {
Finish(GRPC_ERROR_CREATE_REFERENCING("Failed to extract URI from address",
&overall_error_, 1));
@ -358,16 +353,15 @@ void HttpRequest::NextAddress(grpc_error_handle error) {
&overall_error_, 1));
return;
}
DoHandshake(addresses_[next_address_++]);
const grpc_resolved_address* addr = &addresses_[next_address_++];
DoHandshake(addr);
}
void HttpRequest::OnResolved(
absl::StatusOr<std::vector<EventEngine::ResolvedAddress>> addresses_or) {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or) {
RefCountedPtr<HttpRequest> unreffer(this);
MutexLock lock(&mu_);
resolver_->reset();
dns_request_handle_.reset();
if (cancelled_) {
Finish(GRPC_ERROR_CREATE("cancelled during DNS resolution"));
return;

@ -32,7 +32,6 @@
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/slice.h>
@ -49,6 +48,8 @@
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/uri/uri_parser.h"
#include "src/core/util/http_client/parser.h"
@ -222,16 +223,13 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
void OnHandshakeDone(absl::StatusOr<HandshakerArgs*> result);
void DoHandshake(
const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
void DoHandshake(const grpc_resolved_address* addr)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void NextAddress(grpc_error_handle error) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
void OnResolved(
absl::StatusOr<std::vector<
grpc_event_engine::experimental::EventEngine::ResolvedAddress>>
addresses_or);
absl::StatusOr<std::vector<grpc_resolved_address>> addresses_or);
const URI uri_;
const grpc_slice request_text_;
@ -252,17 +250,16 @@ class HttpRequest : public InternallyRefCounted<HttpRequest> {
RefCountedPtr<HandshakeManager> handshake_mgr_ ABSL_GUARDED_BY(mu_);
bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
grpc_http_parser parser_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_event_engine::experimental::EventEngine::ResolvedAddress>
addresses_ ABSL_GUARDED_BY(mu_);
std::vector<grpc_resolved_address> addresses_ ABSL_GUARDED_BY(mu_);
size_t next_address_ ABSL_GUARDED_BY(mu_) = 0;
int have_read_byte_ ABSL_GUARDED_BY(mu_) = 0;
grpc_iomgr_object iomgr_obj_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer incoming_ ABSL_GUARDED_BY(mu_);
grpc_slice_buffer outgoing_ ABSL_GUARDED_BY(mu_);
grpc_error_handle overall_error_ ABSL_GUARDED_BY(mu_) = absl::OkStatus();
absl::StatusOr<std::unique_ptr<
grpc_event_engine::experimental::EventEngine::DNSResolver>>
resolver_;
std::shared_ptr<DNSResolver> resolver_;
absl::optional<DNSResolver::TaskHandle> dns_request_handle_
ABSL_GUARDED_BY(mu_) = DNSResolver::kNullHandle;
};
} // namespace grpc_core

@ -376,10 +376,9 @@ static grpc_error_handle addbyte(grpc_http_parser* parser, uint8_t byte,
case GRPC_HTTP_HEADERS:
case GRPC_HTTP_TRAILERS:
if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) {
if (GRPC_TRACE_FLAG_ENABLED(http1)) {
LOG(ERROR) << "HTTP header max line length ("
<< GRPC_HTTP_PARSER_MAX_HEADER_LENGTH << ") exceeded";
}
GRPC_TRACE_LOG(http1, ERROR)
<< "HTTP header max line length ("
<< GRPC_HTTP_PARSER_MAX_HEADER_LENGTH << ") exceeded";
return GRPC_ERROR_CREATE("HTTP header max line length exceeded");
}
parser->cur_line[parser->cur_line_length] = byte;

@ -24,7 +24,6 @@
#include "absl/log/check.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -30,7 +30,6 @@
#include "upb/mem/arena.hpp"
#include <grpc/support/json.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/util/json/json.h"

@ -25,7 +25,6 @@
#include "envoy/extensions/filters/http/router/v3/router.upb.h"
#include "envoy/extensions/filters/http/router/v3/router.upbdefs.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/util/json/json.h"

@ -31,7 +31,6 @@
#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include "src/core/lib/channel/channel_args.h"

@ -63,10 +63,9 @@ std::string GetNamespaceName() {
"/var/run/secrets/kubernetes.io/serviceaccount/namespace";
auto namespace_name = grpc_core::LoadFile(filename, false);
if (!namespace_name.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(environment_autodetect)) {
VLOG(2) << "Reading file " << filename << " failed: "
<< grpc_core::StatusToString(namespace_name.status());
}
GRPC_TRACE_VLOG(environment_autodetect, 2)
<< "Reading file " << filename
<< " failed: " << grpc_core::StatusToString(namespace_name.status());
// Fallback on an environment variable
return grpc_core::GetEnv("NAMESPACE_NAME").value_or("");
}

@ -29,7 +29,6 @@
#include "absl/log/check.h"
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>

@ -27,8 +27,6 @@
#include "python_observability_context.h"
#include "server_call_tracer.h"
#include <grpc/support/log.h>
namespace grpc_observability {
std::queue<CensusData>* g_census_data_buffer;

@ -46,7 +46,6 @@
#include <grpc/impl/propagation_bits.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/channel/channel_args.h"

@ -26,7 +26,6 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"

@ -26,7 +26,6 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"

@ -26,7 +26,6 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"

@ -34,7 +34,6 @@
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"

@ -24,7 +24,6 @@
#include <grpc/credentials.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gprpp/host_port.h"

@ -26,7 +26,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/channelz/channelz.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"

@ -32,7 +32,6 @@
#include "absl/types/span.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/time.h"

@ -23,7 +23,6 @@
#include "absl/log/check.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
extern absl::AnyInvocable<
std::shared_ptr<grpc_event_engine::experimental::EventEngine>()>*

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save