Everything compiles

reviewable/pr20542/r1
Yash Tibrewal 5 years ago
parent 714ec01e4b
commit 56539cc448
  1. 5
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 63
      src/core/lib/iomgr/combiner.cc
  3. 4
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  4. 12
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  5. 2
      test/core/client_channel/resolvers/dns_resolver_test.cc
  6. 4
      test/core/client_channel/resolvers/fake_resolver_test.cc
  7. 2
      test/core/client_channel/resolvers/sockaddr_resolver_test.cc
  8. 103
      test/core/end2end/fixtures/http_proxy_fixture.cc
  9. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  10. 4
      test/core/end2end/goaway_server_test.cc
  11. 33
      test/core/iomgr/combiner_test.cc
  12. 100
      test/cpp/microbenchmarks/bm_closure.cc
  13. 2
      test/cpp/naming/cancel_ares_query_test.cc
  14. 8
      test/cpp/naming/resolver_component_test.cc

@ -3099,9 +3099,8 @@ static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
if (!t->destructive_reclaimer_registered) { if (!t->destructive_reclaimer_registered) {
t->destructive_reclaimer_registered = true; t->destructive_reclaimer_registered = true;
GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer"); GRPC_CHTTP2_REF_TRANSPORT(t, "destructive_reclaimer");
GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, GRPC_CLOSURE_INIT(&t->destructive_reclaimer_locked, destructive_reclaimer,
destructive_reclaimer_locked, t, t, grpc_schedule_on_exec_ctx);
grpc_schedule_on_exec_ctx);
grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep), grpc_resource_user_post_reclaimer(grpc_endpoint_get_resource_user(t->ep),
true, &t->destructive_reclaimer_locked); true, &t->destructive_reclaimer_locked);
} }

@ -45,18 +45,15 @@ grpc_core::DebugOnlyTraceFlag grpc_combiner_trace(false, "combiner");
#define STATE_UNORPHANED 1 #define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2 #define STATE_ELEM_COUNT_LOW_BIT 2
namespace grpc_core { static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* closure,
static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_error* error);
static void combiner_exec(Combiner* lock, grpc_closure* closure,
grpc_error* error); grpc_error* error);
static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, static void combiner_finally_exec(grpc_core::Combiner* lock,
grpc_error* error); grpc_closure* closure, grpc_error* error);
static void offload(void* arg, grpc_error* error); static void offload(void* arg, grpc_error* error);
Combiner* grpc_combiner_create(void) { grpc_core::Combiner* grpc_combiner_create(void) {
Combiner* lock = grpc_core::New<Combiner>(); grpc_core::Combiner* lock = grpc_core::New<grpc_core::Combiner>();
gpr_ref_init(&lock->refs, 1); gpr_ref_init(&lock->refs, 1);
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
grpc_closure_list_init(&lock->final_list); grpc_closure_list_init(&lock->final_list);
@ -67,13 +64,13 @@ Combiner* grpc_combiner_create(void) {
return lock; return lock;
} }
static void really_destroy(Combiner* lock) { static void really_destroy(grpc_core::Combiner* lock) {
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p really_destroy", lock));
GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0);
grpc_core::Delete(lock); grpc_core::Delete(lock);
} }
static void start_destroy(Combiner* lock) { static void start_destroy(grpc_core::Combiner* lock) {
gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); GPR_INFO, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state));
@ -94,20 +91,21 @@ static void start_destroy(Combiner* lock) {
#define GRPC_COMBINER_DEBUG_SPAM(op, delta) #define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif #endif
void grpc_combiner_unref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
if (gpr_unref(&lock->refs)) { if (gpr_unref(&lock->refs)) {
start_destroy(lock); start_destroy(lock);
} }
} }
Combiner* grpc_combiner_ref(Combiner* lock GRPC_COMBINER_DEBUG_ARGS) { grpc_core::Combiner* grpc_combiner_ref(
grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
GRPC_COMBINER_DEBUG_SPAM(" REF", 1); GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
gpr_ref(&lock->refs); gpr_ref(&lock->refs);
return lock; return lock;
} }
static void push_last_on_exec_ctx(Combiner* lock) { static void push_last_on_exec_ctx(grpc_core::Combiner* lock) {
lock->next_combiner_on_this_exec_ctx = nullptr; lock->next_combiner_on_this_exec_ctx = nullptr;
if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) { if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
@ -120,7 +118,7 @@ static void push_last_on_exec_ctx(Combiner* lock) {
} }
} }
static void push_first_on_exec_ctx(Combiner* lock) { static void push_first_on_exec_ctx(grpc_core::Combiner* lock) {
lock->next_combiner_on_this_exec_ctx = lock->next_combiner_on_this_exec_ctx =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock; grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
@ -129,7 +127,8 @@ static void push_first_on_exec_ctx(Combiner* lock) {
} }
} }
static void combiner_exec(Combiner* lock, grpc_closure* cl, grpc_error* error) { static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute", 0); GPR_TIMER_SCOPE("combiner.execute", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS(); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_ITEMS();
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT); gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
@ -170,11 +169,11 @@ static void move_next() {
} }
static void offload(void* arg, grpc_error* error) { static void offload(void* arg, grpc_error* error) {
Combiner* lock = static_cast<Combiner*>(arg); grpc_core::Combiner* lock = static_cast<grpc_core::Combiner*>(arg);
push_last_on_exec_ctx(lock); push_last_on_exec_ctx(lock);
} }
static void queue_offload(Combiner* lock) { static void queue_offload(grpc_core::Combiner* lock) {
GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(); GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED();
move_next(); move_next();
GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p queue_offload", lock));
@ -183,7 +182,8 @@ static void queue_offload(Combiner* lock) {
bool grpc_combiner_continue_exec_ctx() { bool grpc_combiner_continue_exec_ctx() {
GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0); GPR_TIMER_SCOPE("combiner.continue_exec_ctx", 0);
Combiner* lock = grpc_core::ExecCtx::Get()->combiner_data()->active_combiner; grpc_core::Combiner* lock =
grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
if (lock == nullptr) { if (lock == nullptr) {
return false; return false;
} }
@ -300,8 +300,8 @@ bool grpc_combiner_continue_exec_ctx() {
static void enqueue_finally(void* closure, grpc_error* error); static void enqueue_finally(void* closure, grpc_error* error);
static void combiner_finally_exec(Combiner* lock, grpc_closure* closure, static void combiner_finally_exec(grpc_core::Combiner* lock,
grpc_error* error) { grpc_closure* closure, grpc_error* error) {
GPR_TIMER_SCOPE("combiner.execute_finally", 0); GPR_TIMER_SCOPE("combiner.execute_finally", 0);
GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS(); GRPC_STATS_INC_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS();
GRPC_COMBINER_TRACE(gpr_log( GRPC_COMBINER_TRACE(gpr_log(
@ -322,28 +322,13 @@ static void combiner_finally_exec(Combiner* lock, grpc_closure* closure,
grpc_closure_list_append(&lock->final_list, closure, error); grpc_closure_list_append(&lock->final_list, closure, error);
} }
static void combiner_run(Combiner* lock, grpc_closure* closure,
grpc_error* error) {
#ifndef NDEBUG
closure->scheduled = false;
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG,
"Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]",
lock, closure, closure->file_created, closure->line_created,
closure->file_initiated, closure->line_initiated));
#endif
GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner ==
lock);
closure->cb(closure->cb_arg, error);
GRPC_ERROR_UNREF(error);
}
static void enqueue_finally(void* closure, grpc_error* error) { static void enqueue_finally(void* closure, grpc_error* error) {
grpc_closure* cl = static_cast<grpc_closure*>(cl); grpc_closure* cl = static_cast<grpc_closure*>(closure);
combiner_finally_exec(reinterpret_cast<Combiner*>(cl->scheduler), cl, combiner_finally_exec(reinterpret_cast<grpc_core::Combiner*>(cl->scheduler),
GRPC_ERROR_REF(error)); cl, GRPC_ERROR_REF(error));
} }
namespace grpc_core {
void Combiner::Run(grpc_closure* closure, grpc_error* error) { void Combiner::Run(grpc_closure* closure, grpc_error* error) {
combiner_exec(this, closure, error); combiner_exec(this, closure, error);
} }

@ -33,7 +33,7 @@
static gpr_mu g_mu; static gpr_mu g_mu;
static bool g_fail_resolution = true; static bool g_fail_resolution = true;
static grpc_combiner* g_combiner; static grpc_core::Combiner* g_combiner;
static void my_resolve_address(const char* addr, const char* default_port, static void my_resolve_address(const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
@ -65,7 +65,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms, bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) { grpc_core::Combiner* combiner) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
GPR_ASSERT(0 == strcmp("test", addr)); GPR_ASSERT(0 == strcmp("test", addr));
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;

@ -37,14 +37,14 @@ constexpr int kMinResolutionPeriodForCheckMs = 900;
extern grpc_address_resolver_vtable* grpc_resolve_address_impl; extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
static grpc_address_resolver_vtable* default_resolve_address; static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner; static grpc_core::Combiner* g_combiner;
static grpc_ares_request* (*g_default_dns_lookup_ares_locked)( static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms, bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner); grpc_core::Combiner* combiner);
// Counter incremented by test_resolve_address_impl indicating the number of // Counter incremented by test_resolve_address_impl indicating the number of
// times a system-level resolution has happened. // times a system-level resolution has happened.
@ -95,7 +95,7 @@ static grpc_ares_request* test_dns_lookup_ares_locked(
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms, bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) { grpc_core::Combiner* combiner) {
grpc_ares_request* result = g_default_dns_lookup_ares_locked( grpc_ares_request* result = g_default_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, dns_server, name, default_port, g_iomgr_args.pollset_set, on_done,
addresses, check_grpclb, service_config_json, query_timeout_ms, combiner); addresses, check_grpclb, service_config_json, query_timeout_ms, combiner);
@ -309,9 +309,9 @@ static void test_cooldown() {
grpc_core::New<OnResolutionCallbackArg>(); grpc_core::New<OnResolutionCallbackArg>();
res_cb_arg->uri_str = "dns:127.0.0.1"; res_cb_arg->uri_str = "dns:127.0.0.1";
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, g_combiner->Run(
grpc_combiner_scheduler(g_combiner)), GRPC_CLOSURE_CREATE(start_test_under_combiner, res_cb_arg, nullptr),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&g_iomgr_args); poll_pollset_until_request_done(&g_iomgr_args);
iomgr_args_finish(&g_iomgr_args); iomgr_args_finish(&g_iomgr_args);

@ -28,7 +28,7 @@
#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/combiner.h"
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
static grpc_combiner* g_combiner; static grpc_core::Combiner* g_combiner;
class TestResultHandler : public grpc_core::Resolver::ResultHandler { class TestResultHandler : public grpc_core::Resolver::ResultHandler {
void ReturnResult(grpc_core::Resolver::Result result) override {} void ReturnResult(grpc_core::Resolver::Result result) override {}

@ -63,7 +63,7 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {
}; };
static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver( static grpc_core::OrphanablePtr<grpc_core::Resolver> build_fake_resolver(
grpc_combiner* combiner, grpc_core::Combiner* combiner,
grpc_core::FakeResolverResponseGenerator* response_generator, grpc_core::FakeResolverResponseGenerator* response_generator,
grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> result_handler) { grpc_core::UniquePtr<grpc_core::Resolver::ResultHandler> result_handler) {
grpc_core::ResolverFactory* factory = grpc_core::ResolverFactory* factory =
@ -118,7 +118,7 @@ static grpc_core::Resolver::Result create_new_resolver_result() {
static void test_fake_resolver() { static void test_fake_resolver() {
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create(); grpc_core::Combiner* combiner = grpc_combiner_create();
// Create resolver. // Create resolver.
ResultHandler* result_handler = grpc_core::New<ResultHandler>(); ResultHandler* result_handler = grpc_core::New<ResultHandler>();
grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator> grpc_core::RefCountedPtr<grpc_core::FakeResolverResponseGenerator>

@ -28,7 +28,7 @@
#include "test/core/util/test_config.h" #include "test/core/util/test_config.h"
static grpc_combiner* g_combiner; static grpc_core::Combiner* g_combiner;
class ResultHandler : public grpc_core::Resolver::ResultHandler { class ResultHandler : public grpc_core::Resolver::ResultHandler {
public: public:

@ -69,7 +69,7 @@ struct grpc_end2end_http_proxy {
grpc_pollset* pollset; grpc_pollset* pollset;
gpr_refcount users; gpr_refcount users;
grpc_combiner* combiner; grpc_core::Combiner* combiner;
}; };
// //
@ -89,6 +89,14 @@ typedef struct proxy_connection {
grpc_pollset_set* pollset_set; grpc_pollset_set* pollset_set;
grpc_closure on_read_request_done_hopper;
grpc_closure on_server_connect_done_hopper;
grpc_closure on_write_response_done_hopper;
grpc_closure on_client_read_done_hopper;
grpc_closure on_client_write_done_hopper;
grpc_closure on_server_read_done_hopper;
grpc_closure on_server_write_done_hopper;
// NOTE: All the closures execute under proxy->combiner lock. Which means // NOTE: All the closures execute under proxy->combiner lock. Which means
// there will not be any data-races between the closures // there will not be any data-races between the closures
grpc_closure on_read_request_done; grpc_closure on_read_request_done;
@ -192,6 +200,12 @@ static void proxy_connection_failed(proxy_connection* conn,
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);
} }
static void on_client_write_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_client_write_done,
GRPC_ERROR_REF(error));
}
// Callback for writing proxy data to the client. // Callback for writing proxy data to the client.
static void on_client_write_done(void* arg, grpc_error* error) { static void on_client_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg); proxy_connection* conn = static_cast<proxy_connection*>(arg);
@ -210,13 +224,19 @@ static void on_client_write_done(void* arg, grpc_error* error) {
&conn->client_write_buffer); &conn->client_write_buffer);
conn->client_is_writing = true; conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done, nullptr); &conn->on_client_write_done_hopper, nullptr);
} else { } else {
// No more writes. Unref the connection. // No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done"); proxy_connection_unref(conn, "write_done");
} }
} }
static void on_server_write_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_server_write_done,
GRPC_ERROR_REF(error));
}
// Callback for writing proxy data to the backend server. // Callback for writing proxy data to the backend server.
static void on_server_write_done(void* arg, grpc_error* error) { static void on_server_write_done(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg); proxy_connection* conn = static_cast<proxy_connection*>(arg);
@ -235,13 +255,18 @@ static void on_server_write_done(void* arg, grpc_error* error) {
&conn->server_write_buffer); &conn->server_write_buffer);
conn->server_is_writing = true; conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done, nullptr); &conn->on_server_write_done_hopper, nullptr);
} else { } else {
// No more writes. Unref the connection. // No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write"); proxy_connection_unref(conn, "server_write");
} }
} }
static void on_client_read_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_client_read_done, GRPC_ERROR_REF(error));
}
// Callback for reading data from the client, which will be proxied to // Callback for reading data from the client, which will be proxied to
// the backend server. // the backend server.
static void on_client_read_done(void* arg, grpc_error* error) { static void on_client_read_done(void* arg, grpc_error* error) {
@ -266,11 +291,16 @@ static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read"); proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true; conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
&conn->on_server_write_done, nullptr); &conn->on_server_write_done_hopper, nullptr);
} }
// Read more data. // Read more data.
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false); &conn->on_client_read_done_hopper, /*urgent=*/false);
}
static void on_server_read_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_server_read_done, GRPC_ERROR_REF(error));
} }
// Callback for reading data from the backend server, which will be // Callback for reading data from the backend server, which will be
@ -297,11 +327,17 @@ static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read"); proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true; conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_client_write_done, nullptr); &conn->on_client_write_done_hopper, nullptr);
} }
// Read more data. // Read more data.
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false); &conn->on_server_read_done_hopper, /*urgent=*/false);
}
static void on_write_response_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_write_response_done,
GRPC_ERROR_REF(error));
} }
// Callback to write the HTTP response for the CONNECT request. // Callback to write the HTTP response for the CONNECT request.
@ -322,9 +358,15 @@ static void on_write_response_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read"); proxy_connection_ref(conn, "server_read");
proxy_connection_unref(conn, "write_response"); proxy_connection_unref(conn, "write_response");
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_client_read_done, /*urgent=*/false); &conn->on_client_read_done_hopper, /*urgent=*/false);
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
&conn->on_server_read_done, /*urgent=*/false); &conn->on_server_read_done_hopper, /*urgent=*/false);
}
static void on_server_connect_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_server_connect_done,
GRPC_ERROR_REF(error));
} }
// Callback to connect to the backend server specified by the HTTP // Callback to connect to the backend server specified by the HTTP
@ -349,7 +391,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_buffer_add(&conn->client_write_buffer, slice); grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true; conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
&conn->on_write_response_done, nullptr); &conn->on_write_response_done_hopper, nullptr);
} }
/** /**
@ -372,6 +414,11 @@ static bool proxy_auth_header_matches(char* proxy_auth_header_val,
return header_matches; return header_matches;
} }
static void on_read_request_done_hopper(void* arg, grpc_error* error) {
proxy_connection* conn = static_cast<proxy_connection*>(arg);
conn->proxy->combiner->Run(&conn->on_read_request_done,
GRPC_ERROR_REF(error));
}
// Callback to read the HTTP CONNECT request. // Callback to read the HTTP CONNECT request.
// TODO(roth): Technically, for any of the failure modes handled by this // TODO(roth): Technically, for any of the failure modes handled by this
// function, we should handle the error by returning an HTTP response to // function, we should handle the error by returning an HTTP response to
@ -404,7 +451,7 @@ static void on_read_request_done(void* arg, grpc_error* error) {
// If we're not done reading the request, read more data. // If we're not done reading the request, read more data.
if (conn->http_parser.state != GRPC_HTTP_BODY) { if (conn->http_parser.state != GRPC_HTTP_BODY) {
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false); &conn->on_read_request_done_hopper, /*urgent=*/false);
return; return;
} }
// Make sure we got a CONNECT request. // Make sure we got a CONNECT request.
@ -456,8 +503,8 @@ static void on_read_request_done(void* arg, grpc_error* error) {
// The connection callback inherits our reference to conn. // The connection callback inherits our reference to conn.
const grpc_millis deadline = const grpc_millis deadline =
grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC; grpc_core::ExecCtx::Get()->Now() + 10 * GPR_MS_PER_SEC;
grpc_tcp_client_connect(&conn->on_server_connect_done, &conn->server_endpoint, grpc_tcp_client_connect(&conn->on_server_connect_done_hopper,
conn->pollset_set, nullptr, &conn->server_endpoint, conn->pollset_set, nullptr,
&resolved_addresses->addrs[0], deadline); &resolved_addresses->addrs[0], deadline);
grpc_resolved_addresses_destroy(resolved_addresses); grpc_resolved_addresses_destroy(resolved_addresses);
} }
@ -478,19 +525,33 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset); grpc_pollset_set_add_pollset(conn->pollset_set, proxy->pollset);
grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set); grpc_endpoint_add_to_pollset_set(endpoint, conn->pollset_set);
GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn, GRPC_CLOSURE_INIT(&conn->on_read_request_done, on_read_request_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn, GRPC_CLOSURE_INIT(&conn->on_server_connect_done, on_server_connect_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn, GRPC_CLOSURE_INIT(&conn->on_write_response_done, on_write_response_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_client_read_done, on_client_read_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn, GRPC_CLOSURE_INIT(&conn->on_client_write_done, on_client_write_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn, GRPC_CLOSURE_INIT(&conn->on_server_read_done, on_server_read_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn, GRPC_CLOSURE_INIT(&conn->on_server_write_done, on_server_write_done, conn,
grpc_combiner_scheduler(conn->proxy->combiner)); grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&conn->on_read_request_done_hopper,
on_read_request_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_connect_done_hopper,
on_server_connect_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_write_response_done_hopper,
on_write_response_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_client_read_done_hopper,
on_client_read_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_client_write_done_hopper,
on_client_write_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_read_done_hopper,
on_server_read_done_hopper, conn, nullptr);
GRPC_CLOSURE_INIT(&conn->on_server_write_done_hopper,
on_server_write_done_hopper, conn, nullptr);
grpc_slice_buffer_init(&conn->client_read_buffer); grpc_slice_buffer_init(&conn->client_read_buffer);
grpc_slice_buffer_init(&conn->client_deferred_write_buffer); grpc_slice_buffer_init(&conn->client_deferred_write_buffer);
conn->client_is_writing = false; conn->client_is_writing = false;
@ -502,7 +563,7 @@ static void on_accept(void* arg, grpc_endpoint* endpoint,
grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST, grpc_http_parser_init(&conn->http_parser, GRPC_HTTP_REQUEST,
&conn->http_request); &conn->http_request);
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
&conn->on_read_request_done, /*urgent=*/false); &conn->on_read_request_done_hopper, /*urgent=*/false);
} }
// //

@ -379,7 +379,7 @@ grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout, bool check_grpclb, char** service_config_json, int query_timeout,
grpc_combiner* combiner) { grpc_core::Combiner* combiner) {
addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r))); addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r)));
r->addr = gpr_strdup(addr); r->addr = gpr_strdup(addr);
r->on_done = on_done; r->on_done = on_done;

@ -49,7 +49,7 @@ static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms, bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner); grpc_core::Combiner* combiner);
static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request); static void (*iomgr_cancel_ares_request_locked)(grpc_ares_request* request);
@ -106,7 +106,7 @@ static grpc_ares_request* my_dns_lookup_ares_locked(
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses, grpc_core::UniquePtr<grpc_core::ServerAddressList>* addresses,
bool check_grpclb, char** service_config_json, int query_timeout_ms, bool check_grpclb, char** service_config_json, int query_timeout_ms,
grpc_combiner* combiner) { grpc_core::Combiner* combiner) {
if (0 != strcmp(addr, "test")) { if (0 != strcmp(addr, "test")) {
return iomgr_dns_lookup_ares_locked( return iomgr_dns_lookup_ares_locked(
dns_server, addr, default_port, interested_parties, on_done, addresses, dns_server, addr, default_port, interested_parties, on_done, addresses,

@ -39,13 +39,12 @@ static void set_event_to_true(void* value, grpc_error* error) {
static void test_execute_one(void) { static void test_execute_one(void) {
gpr_log(GPR_DEBUG, "test_execute_one"); gpr_log(GPR_DEBUG, "test_execute_one");
grpc_combiner* lock = grpc_combiner_create(); grpc_core::Combiner* lock = grpc_combiner_create();
gpr_event done; gpr_event done;
gpr_event_init(&done); gpr_event_init(&done);
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &done, lock->Run(GRPC_CLOSURE_CREATE(set_event_to_true, &done, nullptr),
grpc_combiner_scheduler(lock)), GRPC_ERROR_NONE);
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) != GPR_ASSERT(gpr_event_wait(&done, grpc_timeout_seconds_to_deadline(5)) !=
nullptr); nullptr);
@ -54,7 +53,7 @@ static void test_execute_one(void) {
typedef struct { typedef struct {
size_t ctr; size_t ctr;
grpc_combiner* lock; grpc_core::Combiner* lock;
gpr_event done; gpr_event done;
} thd_args; } thd_args;
@ -79,24 +78,22 @@ static void execute_many_loop(void* a) {
ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c))); ex_args* c = static_cast<ex_args*>(gpr_malloc(sizeof(*c)));
c->ctr = &args->ctr; c->ctr = &args->ctr;
c->value = n++; c->value = n++;
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE( args->lock->Run(GRPC_CLOSURE_CREATE(check_one, c, nullptr),
check_one, c, grpc_combiner_scheduler(args->lock)), GRPC_ERROR_NONE);
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
// sleep for a little bit, to test a combiner draining and another thread // sleep for a little bit, to test a combiner draining and another thread
// picking it up // picking it up
gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100)); gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(100));
} }
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, args->lock->Run(GRPC_CLOSURE_CREATE(set_event_to_true, &args->done, nullptr),
grpc_combiner_scheduler(args->lock)), GRPC_ERROR_NONE);
GRPC_ERROR_NONE);
} }
static void test_execute_many(void) { static void test_execute_many(void) {
gpr_log(GPR_DEBUG, "test_execute_many"); gpr_log(GPR_DEBUG, "test_execute_many");
grpc_combiner* lock = grpc_combiner_create(); grpc_core::Combiner* lock = grpc_combiner_create();
grpc_core::Thread thds[100]; grpc_core::Thread thds[100];
thd_args ta[GPR_ARRAY_SIZE(thds)]; thd_args ta[GPR_ARRAY_SIZE(thds)];
for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) { for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
@ -122,21 +119,17 @@ static void in_finally(void* arg, grpc_error* error) {
} }
static void add_finally(void* arg, grpc_error* error) { static void add_finally(void* arg, grpc_error* error) {
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(in_finally, arg, static_cast<grpc_core::Combiner*>(arg)->Run(
grpc_combiner_finally_scheduler( GRPC_CLOSURE_CREATE(in_finally, arg, nullptr), GRPC_ERROR_NONE);
static_cast<grpc_combiner*>(arg))),
GRPC_ERROR_NONE);
} }
static void test_execute_finally(void) { static void test_execute_finally(void) {
gpr_log(GPR_DEBUG, "test_execute_finally"); gpr_log(GPR_DEBUG, "test_execute_finally");
grpc_combiner* lock = grpc_combiner_create(); grpc_core::Combiner* lock = grpc_combiner_create();
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
gpr_event_init(&got_in_finally); gpr_event_init(&got_in_finally);
GRPC_CLOSURE_SCHED( lock->Run(GRPC_CLOSURE_CREATE(add_finally, lock, nullptr), GRPC_ERROR_NONE);
GRPC_CLOSURE_CREATE(add_finally, lock, grpc_combiner_scheduler(lock)),
GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&got_in_finally, GPR_ASSERT(gpr_event_wait(&got_in_finally,
grpc_timeout_seconds_to_deadline(5)) != nullptr); grpc_timeout_seconds_to_deadline(5)) != nullptr);

@ -65,12 +65,12 @@ BENCHMARK(BM_ClosureInitAgainstExecCtx);
static void BM_ClosureInitAgainstCombiner(benchmark::State& state) { static void BM_ClosureInitAgainstCombiner(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner = grpc_combiner_create(); grpc_core::Combiner* combiner = grpc_combiner_create();
grpc_closure c; grpc_closure c;
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
benchmark::DoNotOptimize(GRPC_CLOSURE_INIT( benchmark::DoNotOptimize(
&c, DoNothing, nullptr, grpc_combiner_scheduler(combiner))); GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, nullptr));
} }
GRPC_COMBINER_UNREF(combiner, "finished"); GRPC_COMBINER_UNREF(combiner, "finished");
@ -242,12 +242,12 @@ BENCHMARK(BM_TryAcquireSpinlock);
static void BM_ClosureSchedOnCombiner(benchmark::State& state) { static void BM_ClosureSchedOnCombiner(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner = grpc_combiner_create(); grpc_core::Combiner* combiner = grpc_combiner_create();
grpc_closure c; grpc_closure c;
GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c, DoNothing, nullptr, nullptr);
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c, GRPC_ERROR_NONE); combiner->Run(&c, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
GRPC_COMBINER_UNREF(combiner, "finished"); GRPC_COMBINER_UNREF(combiner, "finished");
@ -258,15 +258,15 @@ BENCHMARK(BM_ClosureSchedOnCombiner);
static void BM_ClosureSched2OnCombiner(benchmark::State& state) { static void BM_ClosureSched2OnCombiner(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner = grpc_combiner_create(); grpc_core::Combiner* combiner = grpc_combiner_create();
grpc_closure c1; grpc_closure c1;
grpc_closure c2; grpc_closure c2;
GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr);
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); combiner->Run(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); combiner->Run(&c2, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
GRPC_COMBINER_UNREF(combiner, "finished"); GRPC_COMBINER_UNREF(combiner, "finished");
@ -277,18 +277,18 @@ BENCHMARK(BM_ClosureSched2OnCombiner);
static void BM_ClosureSched3OnCombiner(benchmark::State& state) { static void BM_ClosureSched3OnCombiner(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner = grpc_combiner_create(); grpc_core::Combiner* combiner = grpc_combiner_create();
grpc_closure c1; grpc_closure c1;
grpc_closure c2; grpc_closure c2;
grpc_closure c3; grpc_closure c3;
GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, grpc_combiner_scheduler(combiner)); GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, nullptr);
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); combiner->Run(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); combiner->Run(&c2, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); combiner->Run(&c3, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
GRPC_COMBINER_UNREF(combiner, "finished"); GRPC_COMBINER_UNREF(combiner, "finished");
@ -299,18 +299,16 @@ BENCHMARK(BM_ClosureSched3OnCombiner);
static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) { static void BM_ClosureSched2OnTwoCombiners(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner1 = grpc_combiner_create(); grpc_core::Combiner* combiner1 = grpc_combiner_create();
grpc_combiner* combiner2 = grpc_combiner_create(); grpc_core::Combiner* combiner2 = grpc_combiner_create();
grpc_closure c1; grpc_closure c1;
grpc_closure c2; grpc_closure c2;
GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr);
grpc_combiner_scheduler(combiner1)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr,
grpc_combiner_scheduler(combiner2));
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); combiner1->Run(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); combiner2->Run(&c2, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
GRPC_COMBINER_UNREF(combiner1, "finished"); GRPC_COMBINER_UNREF(combiner1, "finished");
@ -322,26 +320,22 @@ BENCHMARK(BM_ClosureSched2OnTwoCombiners);
static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) { static void BM_ClosureSched4OnTwoCombiners(benchmark::State& state) {
TrackCounters track_counters; TrackCounters track_counters;
grpc_combiner* combiner1 = grpc_combiner_create(); grpc_core::Combiner* combiner1 = grpc_combiner_create();
grpc_combiner* combiner2 = grpc_combiner_create(); grpc_core::Combiner* combiner2 = grpc_combiner_create();
grpc_closure c1; grpc_closure c1;
grpc_closure c2; grpc_closure c2;
grpc_closure c3; grpc_closure c3;
grpc_closure c4; grpc_closure c4;
GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, GRPC_CLOSURE_INIT(&c1, DoNothing, nullptr, nullptr);
grpc_combiner_scheduler(combiner1)); GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c2, DoNothing, nullptr, GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr, nullptr);
grpc_combiner_scheduler(combiner2)); GRPC_CLOSURE_INIT(&c4, DoNothing, nullptr, nullptr);
GRPC_CLOSURE_INIT(&c3, DoNothing, nullptr,
grpc_combiner_scheduler(combiner1));
GRPC_CLOSURE_INIT(&c4, DoNothing, nullptr,
grpc_combiner_scheduler(combiner2));
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;
for (auto _ : state) { for (auto _ : state) {
GRPC_CLOSURE_SCHED(&c1, GRPC_ERROR_NONE); combiner1->Run(&c1, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c2, GRPC_ERROR_NONE); combiner2->Run(&c2, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c3, GRPC_ERROR_NONE); combiner1->Run(&c3, GRPC_ERROR_NONE);
GRPC_CLOSURE_SCHED(&c4, GRPC_ERROR_NONE); combiner2->Run(&c4, GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
} }
GRPC_COMBINER_UNREF(combiner1, "finished"); GRPC_COMBINER_UNREF(combiner1, "finished");
@ -390,32 +384,6 @@ static void BM_ClosureReschedOnExecCtx(benchmark::State& state) {
} }
BENCHMARK(BM_ClosureReschedOnExecCtx); BENCHMARK(BM_ClosureReschedOnExecCtx);
static void BM_ClosureReschedOnCombiner(benchmark::State& state) {
TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create();
Rescheduler r(state, grpc_combiner_scheduler(combiner));
r.ScheduleFirst();
grpc_core::ExecCtx::Get()->Flush();
GRPC_COMBINER_UNREF(combiner, "finished");
track_counters.Finish(state);
}
BENCHMARK(BM_ClosureReschedOnCombiner);
static void BM_ClosureReschedOnCombinerFinally(benchmark::State& state) {
TrackCounters track_counters;
grpc_core::ExecCtx exec_ctx;
grpc_combiner* combiner = grpc_combiner_create();
Rescheduler r(state, grpc_combiner_finally_scheduler(combiner));
r.ScheduleFirstAgainstDifferentScheduler(grpc_combiner_scheduler(combiner));
grpc_core::ExecCtx::Get()->Flush();
GRPC_COMBINER_UNREF(combiner, "finished");
track_counters.Finish(state);
}
BENCHMARK(BM_ClosureReschedOnCombinerFinally);
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace, // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes. // and others do not. This allows us to support both modes.
namespace benchmark { namespace benchmark {

@ -81,7 +81,7 @@ struct ArgsStruct {
gpr_mu* mu; gpr_mu* mu;
grpc_pollset* pollset; grpc_pollset* pollset;
grpc_pollset_set* pollset_set; grpc_pollset_set* pollset_set;
grpc_combiner* lock; grpc_core::Combiner* lock;
grpc_channel_args* channel_args; grpc_channel_args* channel_args;
}; };

@ -192,7 +192,7 @@ struct ArgsStruct {
gpr_mu* mu; gpr_mu* mu;
grpc_pollset* pollset; grpc_pollset* pollset;
grpc_pollset_set* pollset_set; grpc_pollset_set* pollset_set;
grpc_combiner* lock; grpc_core::Combiner* lock;
grpc_channel_args* channel_args; grpc_channel_args* channel_args;
vector<GrpcLBAddress> expected_addrs; vector<GrpcLBAddress> expected_addrs;
std::string expected_service_config_string; std::string expected_service_config_string;
@ -616,9 +616,9 @@ void RunResolvesRelevantRecordsTest(
CreateResultHandler(&args)); CreateResultHandler(&args));
grpc_channel_args_destroy(resolver_args); grpc_channel_args_destroy(resolver_args);
gpr_free(whole_uri); gpr_free(whole_uri);
GRPC_CLOSURE_SCHED(GRPC_CLOSURE_CREATE(StartResolvingLocked, resolver.get(), args.lock->Run(
grpc_combiner_scheduler(args.lock)), GRPC_CLOSURE_CREATE(StartResolvingLocked, resolver.get(), nullptr),
GRPC_ERROR_NONE); GRPC_ERROR_NONE);
grpc_core::ExecCtx::Get()->Flush(); grpc_core::ExecCtx::Get()->Flush();
PollPollsetUntilRequestDone(&args); PollPollsetUntilRequestDone(&args);
ArgsFinish(&args); ArgsFinish(&args);

Loading…
Cancel
Save