From 56539cc448a225c4936e712d29cd9a1022320aae Mon Sep 17 00:00:00 2001 From: Yash Tibrewal <yashkt@google.com> Date: Wed, 9 Oct 2019 15:22:28 -0700 Subject: [PATCH] Everything compiles --- .../chttp2/transport/chttp2_transport.cc | 5 +- src/core/lib/iomgr/combiner.cc | 63 ++++------- .../dns_resolver_connectivity_test.cc | 4 +- .../resolvers/dns_resolver_cooldown_test.cc | 12 +- .../resolvers/dns_resolver_test.cc | 2 +- .../resolvers/fake_resolver_test.cc | 4 +- .../resolvers/sockaddr_resolver_test.cc | 2 +- .../end2end/fixtures/http_proxy_fixture.cc | 103 ++++++++++++++---- test/core/end2end/fuzzers/api_fuzzer.cc | 2 +- test/core/end2end/goaway_server_test.cc | 4 +- test/core/iomgr/combiner_test.cc | 33 +++--- test/cpp/microbenchmarks/bm_closure.cc | 100 ++++++----------- test/cpp/naming/cancel_ares_query_test.cc | 2 +- test/cpp/naming/resolver_component_test.cc | 8 +- 14 files changed, 175 insertions(+), 169 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index cf0fe3f26f5..f533338131b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -3099,9 +3099,8 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) { if (!t->destructive_reclaimer_registered) { t->destructive_reclaimer_registered = true; GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); - GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, - destructive_reclaimer_locked, t, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer, + t, grpc_schedule_on_exec_ctx); grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), true, &t->destructive_reclaimer_locked); } diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index def95613a9d..62a38a2c3fc 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -45,18 +45,15 @@ grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner"); #define STATE_UNORPHANED 1 #define STATE_ELEM_COUNT_LOW_BIT 2 -namespace grpc_core { -static void combiner_run(Combiner* lock, grpc_closure* closure, - grpc_error* error); -static void combiner_exec(Combiner* lock, grpc_closure* closure, +static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* closure, grpc_error* error); -static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, - grpc_error* error); +static void combiner_finally_exec(grpc_core::Combiner* lock, + grpc_closure* closure, grpc_error* error); static void offload(void* arg, grpc_error* error); -Combiner* grpc_combiner_create(void) { - Combiner* lock = grpc_core::New<Combiner>(); +grpc_core::Combiner* grpc_combiner_create(void) { + grpc_core::Combiner* lock = grpc_core::New<grpc_core::Combiner>(); gpr_ref_init(&lock->refs, 1); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); grpc_closure_list_init(&lock->final_list); @@ -67,13 +64,13 @@ Combiner* grpc_combiner_create(void) { return lock; } -static void really_destroy(Combiner* lock) { +static void really_destroy(grpc_core::Combiner* lock) { GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock)); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); grpc_core::Delete(lock); } -static void start_destroy(Combiner* lock) { +static void start_destroy(grpc_core::Combiner* lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); @@ -94,20 +91,21 @@ static void start_destroy(Combiner* lock) { #define GRPC_COMBINER_DEBUG_SPAM(op, delta) #endif -void grpc_combiner_unref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { +void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); if (gpr_unref(&lock->refs)) { start_destroy(lock); } } -Combiner* grpc_combiner_ref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { +grpc_core::Combiner* grpc_combiner_ref( + grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { GRPC_COMBINER_DEBUG_SPAM(" REF", 1); gpr_ref(&lock->refs); return lock; } -static void push_last_on_exec_ctx(Combiner* lock) { +static void push_last_on_exec_ctx(grpc_core::Combiner* lock) { lock->next_combiner_on_this_exec_ctx = nullptr; if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = @@ -120,7 +118,7 @@ static void push_last_on_exec_ctx(Combiner* lock) { } } -static void push_first_on_exec_ctx(Combiner* lock) { +static void push_first_on_exec_ctx(grpc_core::Combiner* lock) { lock->next_combiner_on_this_exec_ctx = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock; @@ -129,7 +127,8 @@ static void push_first_on_exec_ctx(Combiner* lock) { } } -static void combiner_exec(Combiner* lock, grpc_closure* cl, grpc_error* error) { +static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl, + grpc_error* error) { GPR_TIMER_SCOPE("combiner.execute", 0); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); @@ -170,11 +169,11 @@ static void move_next() { } static void offload(void* arg, grpc_error* error) { - Combiner* lock = static_cast<Combiner*>(arg); + grpc_core::Combiner* lock = static_cast<grpc_core::Combiner*>(arg); push_last_on_exec_ctx(lock); } -static void queue_offload(Combiner* lock) { +static void queue_offload(grpc_core::Combiner* lock) { GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); move_next(); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); @@ -183,7 +182,8 @@ static void queue_offload(Combiner* lock) { bool grpc_combiner_continue_exec_ctx() { GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0); - Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; + grpc_core::Combiner* lock = + grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; if (lock == nullptr) { return false; } @@ -300,8 +300,8 @@ bool grpc_combiner_continue_exec_ctx() { static void enqueue_finally(void* closure, grpc_error* error); -static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, - grpc_error* error) { +static void combiner_finally_exec(grpc_core::Combiner* lock, + grpc_closure* closure, grpc_error* error) { GPR_TIMER_SCOPE("combiner.execute_finally", 0); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); GRPC_COMBINER_TRACE(gpr_log( @@ -322,28 +322,13 @@ static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, grpc_closure_list_append(&lock->final_list, closure, error); } -static void combiner_run(Combiner* lock, grpc_closure* closure, - grpc_error* error) { -#ifndef NDEBUG - closure->scheduled = false; - GRPC_COMBINER_TRACE(gpr_log( - GPR_DEBUG, - "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", - lock, closure, closure->file_created, closure->line_created, - closure->file_initiated, closure->line_initiated)); -#endif - GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == - lock); - closure->cb(closure->cb_arg, error); - GRPC_ERROR_UNREF(error); -} - static void enqueue_finally(void* closure, grpc_error* error) { - grpc_closure* cl = static_cast<grpc_closure*>(cl); - combiner_finally_exec(reinterpret_cast<Combiner*>(cl->scheduler), cl, - GRPC_ERROR_REF(error)); + grpc_closure* cl = static_cast<grpc_closure*>(closure); + combiner_finally_exec(reinterpret_cast<grpc_core::Combiner*>(cl->scheduler), + cl, GRPC_ERROR_REF(error)); } +namespace grpc_core { void Combiner::Run(grpc_closure* closure, grpc_error* error) { combiner_exec(this, closure, error); } diff --git a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc index f8a7729671e..62226aff5d2 100644 --- a/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc @@ -33,7 +33,7 @@ static gpr_mu g_mu; static bool g_fail_resolution = true; -static grpc_combiner* g_combiner; +static grpc_core::Combiner* g_combiner; static void my_resolve_address(const char* addr, const char* default_port, grpc_pollset_set* interested_parties, @@ -65,7 +65,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) { + grpc_core::Combiner* combiner) { gpr_mu_lock(&g_mu); GPR_ASSERT(0 == strcmp("test", addr)); grpc_error* error = GRPC_ERROR_NONE; diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc index 683d02079c3..392ed6b5812 100644 --- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc @@ -37,14 +37,14 @@ constexpr int kMinResolutionPeriodForCheckMs = 900; extern grpc_address_resolver_vtable* grpc_resolve_address_impl; static grpc_address_resolver_vtable* default_resolve_address; -static grpc_combiner* g_combiner; +static grpc_core::Combiner* g_combiner; static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( const char* dns_server, const char* name, const char* default_port, grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner); + grpc_core::Combiner* combiner); // Counter incremented by test_resolve_address_impl indicating the number of // times a system-level resolution has happened. @@ -95,7 +95,7 @@ static grpc_ares_request* test_dns_lookup_ares_locked( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) { + grpc_core::Combiner* combiner) { grpc_ares_request* result = g_default_dns_lookup_ares_locked( dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addresses, check_grpclb, service_config_json, query_timeout_ms, combiner); @@ -309,9 +309,9 @@ static void test_cooldown() { grpc_core::New<OnResolutionCallbackArg>(); res_cb_arg->uri_str = "dns:127.0.0.1"; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, - grpc_combiner_scheduler(g_combiner)), - GRPC_ERROR_NONE); + g_combiner->Run( + GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, nullptr), + GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); poll_pollset_until_request_done(&g_iomgr_args); iomgr_args_finish(&g_iomgr_args); diff --git a/test/core/client_channel/resolvers/dns_resolver_test.cc b/test/core/client_channel/resolvers/dns_resolver_test.cc index d03782ef159..f953288cb2b 100644 --- a/test/core/client_channel/resolvers/dns_resolver_test.cc +++ b/test/core/client_channel/resolvers/dns_resolver_test.cc @@ -28,7 +28,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "test/core/util/test_config.h" -static grpc_combiner* g_combiner; +static grpc_core::Combiner* g_combiner; class TestResultHandler : public grpc_core::Resolver::ResultHandler { void ReturnResult(grpc_core::Resolver::Result result) override {} diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc index 0d34a0b8f2c..9c709777fc5 100644 --- a/test/core/client_channel/resolvers/fake_resolver_test.cc +++ b/test/core/client_channel/resolvers/fake_resolver_test.cc @@ -63,7 +63,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler { }; static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver( - grpc_combiner* combiner, + grpc_core::Combiner* combiner, grpc_core::FakeResolverResponseGenerator* response_generator, grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> result_handler) { grpc_core::ResolverFactory* factory = @@ -118,7 +118,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() { static void test_fake_resolver() { grpc_core::ExecCtx exec_ctx; - grpc_combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create(); // Create resolver. ResultHandler* result_handler = grpc_core::New<ResultHandler>(); grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> diff --git a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc index 8cbacaca999..26f70ed23de 100644 --- a/test/core/client_channel/resolvers/sockaddr_resolver_test.cc +++ b/test/core/client_channel/resolvers/sockaddr_resolver_test.cc @@ -28,7 +28,7 @@ #include "test/core/util/test_config.h" -static grpc_combiner* g_combiner; +static grpc_core::Combiner* g_combiner; class ResultHandler : public grpc_core::Resolver::ResultHandler { public: diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index da2381aa0a0..dd790406a46 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -69,7 +69,7 @@ struct grpc_end2end_http_proxy { grpc_pollset* pollset; gpr_refcount users; - grpc_combiner* combiner; + grpc_core::Combiner* combiner; }; // @@ -89,6 +89,14 @@ typedef struct proxy_connection { grpc_pollset_set* pollset_set; + grpc_closure on_read_request_done_hopper; + grpc_closure on_server_connect_done_hopper; + grpc_closure on_write_response_done_hopper; + grpc_closure on_client_read_done_hopper; + grpc_closure on_client_write_done_hopper; + grpc_closure on_server_read_done_hopper; + grpc_closure on_server_write_done_hopper; + // NOTE: All the closures execute under proxy->combiner lock. Which means // there will not be any data-races between the closures grpc_closure on_read_request_done; @@ -192,6 +200,12 @@ static void proxy_connection_failed(proxy_connection* conn, GRPC_ERROR_UNREF(error); } +static void on_client_write_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_client_write_done, + GRPC_ERROR_REF(error)); +} + // Callback for writing proxy data to the client. static void on_client_write_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast<proxy_connection*>(arg); @@ -210,13 +224,19 @@ static void on_client_write_done(void* arg, grpc_error* error) { &conn->client_write_buffer); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done_hopper, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "write_done"); } } +static void on_server_write_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_server_write_done, + GRPC_ERROR_REF(error)); +} + // Callback for writing proxy data to the backend server. static void on_server_write_done(void* arg, grpc_error* error) { proxy_connection* conn = static_cast<proxy_connection*>(arg); @@ -235,13 +255,18 @@ static void on_server_write_done(void* arg, grpc_error* error) { &conn->server_write_buffer); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done_hopper, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "server_write"); } } +static void on_client_read_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_client_read_done, GRPC_ERROR_REF(error)); +} + // Callback for reading data from the client, which will be proxied to // the backend server. static void on_client_read_done(void* arg, grpc_error* error) { @@ -266,11 +291,16 @@ static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done_hopper, nullptr); } // Read more data. grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done, /*urgent=*/false); + &conn->on_client_read_done_hopper, /*urgent=*/false); +} + +static void on_server_read_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_server_read_done, GRPC_ERROR_REF(error)); } // Callback for reading data from the backend server, which will be @@ -297,11 +327,17 @@ static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done_hopper, nullptr); } // Read more data. grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done, /*urgent=*/false); + &conn->on_server_read_done_hopper, /*urgent=*/false); +} + +static void on_write_response_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_write_response_done, + GRPC_ERROR_REF(error)); } // Callback to write the HTTP response for the CONNECT request. @@ -322,9 +358,15 @@ static void on_write_response_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); proxy_connection_unref(conn, "write_response"); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_client_read_done, /*urgent=*/false); + &conn->on_client_read_done_hopper, /*urgent=*/false); grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, - &conn->on_server_read_done, /*urgent=*/false); + &conn->on_server_read_done_hopper, /*urgent=*/false); +} + +static void on_server_connect_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_server_connect_done, + GRPC_ERROR_REF(error)); } // Callback to connect to the backend server specified by the HTTP @@ -349,7 +391,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_write_response_done, nullptr); + &conn->on_write_response_done_hopper, nullptr); } /** @@ -372,6 +414,11 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val, return header_matches; } +static void on_read_request_done_hopper(void* arg, grpc_error* error) { + proxy_connection* conn = static_cast<proxy_connection*>(arg); + conn->proxy->combiner->Run(&conn->on_read_request_done, + GRPC_ERROR_REF(error)); +} // Callback to read the HTTP CONNECT request. // TODO(roth): Technically, for any of the failure modes handled by this // function, we should handle the error by returning an HTTP response to @@ -404,7 +451,7 @@ static void on_read_request_done(void* arg, grpc_error* error) { // If we're not done reading the request, read more data. if (conn->http_parser.state != GRPC_HTTP_BODY) { grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done, /*urgent=*/false); + &conn->on_read_request_done_hopper, /*urgent=*/false); return; } // Make sure we got a CONNECT request. @@ -456,8 +503,8 @@ static void on_read_request_done(void* arg, grpc_error* error) { // The connection callback inherits our reference to conn. const grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC; - grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint, - conn->pollset_set, nullptr, + grpc_tcp_client_connect(&conn->on_server_connect_done_hopper, + &conn->server_endpoint, conn->pollset_set, nullptr, &resolved_addresses->addrs[0], deadline); grpc_resolved_addresses_destroy(resolved_addresses); } @@ -478,19 +525,33 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset); grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set); GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn, - grpc_combiner_scheduler(conn->proxy->combiner)); + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&conn->on_read_request_done_hopper, + on_read_request_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_server_connect_done_hopper, + on_server_connect_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_write_response_done_hopper, + on_write_response_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_client_read_done_hopper, + on_client_read_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_client_write_done_hopper, + on_client_write_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_server_read_done_hopper, + on_server_read_done_hopper, conn, nullptr); + GRPC_CLOSURE_INIT(&conn->on_server_write_done_hopper, + on_server_write_done_hopper, conn, nullptr); grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer); conn->client_is_writing = false; @@ -502,7 +563,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, &conn->http_request); grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, - &conn->on_read_request_done, /*urgent=*/false); + &conn->on_read_request_done_hopper, /*urgent=*/false); } // diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index 74a30913b24..e47ea74a1d9 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -379,7 +379,7 @@ grpc_ares_request* my_dns_lookup_ares_locked( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout, - grpc_combiner* combiner) { + grpc_core::Combiner* combiner) { addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r))); r->addr = gpr_strdup(addr); r->on_done = on_done; diff --git a/test/core/end2end/goaway_server_test.cc b/test/core/end2end/goaway_server_test.cc index 5db2aebe9a1..e180ec8f29c 100644 --- a/test/core/end2end/goaway_server_test.cc +++ b/test/core/end2end/goaway_server_test.cc @@ -49,7 +49,7 @@ static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner); + grpc_core::Combiner* combiner); static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); @@ -106,7 +106,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked( grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, bool check_grpclb, char** service_config_json, int query_timeout_ms, - grpc_combiner* combiner) { + grpc_core::Combiner* combiner) { if (0 != strcmp(addr, "test")) { return iomgr_dns_lookup_ares_locked( dns_server, addr, default_port, interested_parties, on_done, addresses, diff --git a/test/core/iomgr/combiner_test.cc b/test/core/iomgr/combiner_test.cc index c39c3fc8fce..432c1fc6c4c 100644 --- a/test/core/iomgr/combiner_test.cc +++ b/test/core/iomgr/combiner_test.cc @@ -39,13 +39,12 @@ static void set_event_to_true(void* value, grpc_error* error) { static void test_execute_one(void) { gpr_log(GPR_DEBUG, "test_execute_one"); - grpc_combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create(); gpr_event done; gpr_event_init(&done); grpc_core::ExecCtx exec_ctx; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done, - grpc_combiner_scheduler(lock)), - GRPC_ERROR_NONE); + lock->Run(GRPC_CLOSURE_CREATE(set_event_to_true, &done, nullptr), + GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != nullptr); @@ -54,7 +53,7 @@ static void test_execute_one(void) { typedef struct { size_t ctr; - grpc_combiner* lock; + grpc_core::Combiner* lock; gpr_event done; } thd_args; @@ -79,24 +78,22 @@ static void execute_many_loop(void* a) { ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c))); c->ctr = &args->ctr; c->value = n++; - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE( - check_one, c, grpc_combiner_scheduler(args->lock)), - GRPC_ERROR_NONE); + args->lock->Run(GRPC_CLOSURE_CREATE(check_one, c, nullptr), + GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } // sleep for a little bit, to test a combiner draining and another thread // picking it up gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); } - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, - grpc_combiner_scheduler(args->lock)), - GRPC_ERROR_NONE); + args->lock->Run(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, nullptr), + GRPC_ERROR_NONE); } static void test_execute_many(void) { gpr_log(GPR_DEBUG, "test_execute_many"); - grpc_combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create(); grpc_core::Thread thds[100]; thd_args ta[GPR_ARRAY_SIZE(thds)]; for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { @@ -122,21 +119,17 @@ static void in_finally(void* arg, grpc_error* error) { } static void add_finally(void* arg, grpc_error* error) { - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg, - grpc_combiner_finally_scheduler( - static_cast<grpc_combiner*>(arg))), - GRPC_ERROR_NONE); + static_cast<grpc_core::Combiner*>(arg)->Run( + GRPC_CLOSURE_CREATE(in_finally, arg, nullptr), GRPC_ERROR_NONE); } static void test_execute_finally(void) { gpr_log(GPR_DEBUG, "test_execute_finally"); - grpc_combiner* lock = grpc_combiner_create(); + grpc_core::Combiner* lock = grpc_combiner_create(); grpc_core::ExecCtx exec_ctx; gpr_event_init(&got_in_finally); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)), - GRPC_ERROR_NONE); + lock->Run(GRPC_CLOSURE_CREATE(add_finally, lock, nullptr), GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); GPR_ASSERT(gpr_event_wait(&got_in_finally, grpc_timeout_seconds_to_deadline(5)) != nullptr); diff --git a/test/cpp/microbenchmarks/bm_closure.cc b/test/cpp/microbenchmarks/bm_closure.cc index 5133c2a0f43..82b8b88a674 100644 --- a/test/cpp/microbenchmarks/bm_closure.cc +++ b/test/cpp/microbenchmarks/bm_closure.cc @@ -65,12 +65,12 @@ BENCHMARK(BM_ClosureInitAgainstExecCtx); static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create(); grpc_closure c; grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - benchmark::DoNotOptimize(GRPC_CLOSURE_INIT( - &c, DoNothing, nullptr, grpc_combiner_scheduler(combiner))); + benchmark::DoNotOptimize( + GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, nullptr)); } GRPC_COMBINER_UNREF(combiner, "finished"); @@ -242,12 +242,12 @@ BENCHMARK(BM_TryAcquireSpinlock); static void BM_ClosureSchedOnCombiner(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create(); grpc_closure c; - GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); + GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE); + combiner->Run(&c, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } GRPC_COMBINER_UNREF(combiner, "finished"); @@ -258,15 +258,15 @@ BENCHMARK(BM_ClosureSchedOnCombiner); static void BM_ClosureSched2OnCombiner(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create(); grpc_closure c1; grpc_closure c2; - GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); + GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); + combiner->Run(&c1, GRPC_ERROR_NONE); + combiner->Run(&c2, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } GRPC_COMBINER_UNREF(combiner, "finished"); @@ -277,18 +277,18 @@ BENCHMARK(BM_ClosureSched2OnCombiner); static void BM_ClosureSched3OnCombiner(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner = grpc_combiner_create(); + grpc_core::Combiner* combiner = grpc_combiner_create(); grpc_closure c1; grpc_closure c2; grpc_closure c3; - GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); - GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); + GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); + combiner->Run(&c1, GRPC_ERROR_NONE); + combiner->Run(&c2, GRPC_ERROR_NONE); + combiner->Run(&c3, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } GRPC_COMBINER_UNREF(combiner, "finished"); @@ -299,18 +299,16 @@ BENCHMARK(BM_ClosureSched3OnCombiner); static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner1 = grpc_combiner_create(); - grpc_combiner* combiner2 = grpc_combiner_create(); + grpc_core::Combiner* combiner1 = grpc_combiner_create(); + grpc_core::Combiner* combiner2 = grpc_combiner_create(); grpc_closure c1; grpc_closure c2; - GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, - grpc_combiner_scheduler(combiner1)); - GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, - grpc_combiner_scheduler(combiner2)); + GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); + combiner1->Run(&c1, GRPC_ERROR_NONE); + combiner2->Run(&c2, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } GRPC_COMBINER_UNREF(combiner1, "finished"); @@ -322,26 +320,22 @@ BENCHMARK(BM_ClosureSched2OnTwoCombiners); static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) { TrackCounters track_counters; - grpc_combiner* combiner1 = grpc_combiner_create(); - grpc_combiner* combiner2 = grpc_combiner_create(); + grpc_core::Combiner* combiner1 = grpc_combiner_create(); + grpc_core::Combiner* combiner2 = grpc_combiner_create(); grpc_closure c1; grpc_closure c2; grpc_closure c3; grpc_closure c4; - GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, - grpc_combiner_scheduler(combiner1)); - GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, - grpc_combiner_scheduler(combiner2)); - GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, - grpc_combiner_scheduler(combiner1)); - GRPC_CLOSURE_INIT(&c4, DoNothing, nullptr, - grpc_combiner_scheduler(combiner2)); + GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, nullptr); + GRPC_CLOSURE_INIT(&c4, DoNothing, nullptr, nullptr); grpc_core::ExecCtx exec_ctx; for (auto _ : state) { - GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); - GRPC_CLOSURE_SCHED(&c4, GRPC_ERROR_NONE); + combiner1->Run(&c1, GRPC_ERROR_NONE); + combiner2->Run(&c2, GRPC_ERROR_NONE); + combiner1->Run(&c3, GRPC_ERROR_NONE); + combiner2->Run(&c4, GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); } GRPC_COMBINER_UNREF(combiner1, "finished"); @@ -390,32 +384,6 @@ static void BM_ClosureReschedOnExecCtx(benchmark::State& state) { } BENCHMARK(BM_ClosureReschedOnExecCtx); -static void BM_ClosureReschedOnCombiner(benchmark::State& state) { - TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; - grpc_combiner* combiner = grpc_combiner_create(); - Rescheduler r(state, grpc_combiner_scheduler(combiner)); - r.ScheduleFirst(); - grpc_core::ExecCtx::Get()->Flush(); - GRPC_COMBINER_UNREF(combiner, "finished"); - - track_counters.Finish(state); -} -BENCHMARK(BM_ClosureReschedOnCombiner); - -static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) { - TrackCounters track_counters; - grpc_core::ExecCtx exec_ctx; - grpc_combiner* combiner = grpc_combiner_create(); - Rescheduler r(state, grpc_combiner_finally_scheduler(combiner)); - r.ScheduleFirstAgainstDifferentScheduler(grpc_combiner_scheduler(combiner)); - grpc_core::ExecCtx::Get()->Flush(); - GRPC_COMBINER_UNREF(combiner, "finished"); - - track_counters.Finish(state); -} -BENCHMARK(BM_ClosureReschedOnCombinerFinally); - // Some distros have RunSpecifiedBenchmarks under the benchmark namespace, // and others do not. This allows us to support both modes. namespace benchmark { diff --git a/test/cpp/naming/cancel_ares_query_test.cc b/test/cpp/naming/cancel_ares_query_test.cc index ab9deb7bcbc..5943b0ed14e 100644 --- a/test/cpp/naming/cancel_ares_query_test.cc +++ b/test/cpp/naming/cancel_ares_query_test.cc @@ -81,7 +81,7 @@ struct ArgsStruct { gpr_mu* mu; grpc_pollset* pollset; grpc_pollset_set* pollset_set; - grpc_combiner* lock; + grpc_core::Combiner* lock; grpc_channel_args* channel_args; }; diff --git a/test/cpp/naming/resolver_component_test.cc b/test/cpp/naming/resolver_component_test.cc index 67ed307d2d7..9f9401cd9f4 100644 --- a/test/cpp/naming/resolver_component_test.cc +++ b/test/cpp/naming/resolver_component_test.cc @@ -192,7 +192,7 @@ struct ArgsStruct { gpr_mu* mu; grpc_pollset* pollset; grpc_pollset_set* pollset_set; - grpc_combiner* lock; + grpc_core::Combiner* lock; grpc_channel_args* channel_args; vector<GrpcLBAddress> expected_addrs; std::string expected_service_config_string; @@ -616,9 +616,9 @@ void RunResolvesRelevantRecordsTest( CreateResultHandler(&args)); grpc_channel_args_destroy(resolver_args); gpr_free(whole_uri); - GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(StartResolvingLocked, resolver.get(), - grpc_combiner_scheduler(args.lock)), - GRPC_ERROR_NONE); + args.lock->Run( + GRPC_CLOSURE_CREATE(StartResolvingLocked, resolver.get(), nullptr), + GRPC_ERROR_NONE); grpc_core::ExecCtx::Get()->Flush(); PollPollsetUntilRequestDone(&args); ArgsFinish(&args);