Merge branch 'master' of github.com:grpc/grpc into nanopb_build_cleanup

pull/15595/head
David Garcia Quintas 7 years ago
commit 657a3eb98b
  1. 1
      build.yaml
  2. 2
      doc/PROTOCOL-WEB.md
  3. 9
      examples/python/helloworld/greeter_client.py
  4. 12
      examples/python/interceptors/default_value/greeter_client.py
  5. 12
      examples/python/interceptors/headers/greeter_client.py
  6. 31
      examples/python/multiplex/multiplex_client.py
  7. 23
      examples/python/route_guide/route_guide_client.py
  8. 4
      include/grpcpp/server_builder.h
  9. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  10. 14
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
  11. 93
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
  12. 155
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  13. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
  14. 11
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
  15. 29
      src/core/lib/channel/handshaker.cc
  16. 65
      src/core/lib/iomgr/ev_epollex_linux.cc
  17. 1
      src/core/lib/iomgr/is_epollexclusive_available.cc
  18. 5
      src/core/tsi/ssl_transport_security.cc
  19. 3
      src/php/README.md
  20. 2
      src/python/grpcio_tests/commands.py
  21. 8
      test/core/client_channel/resolvers/dns_resolver_connectivity_test.cc
  22. 18
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  23. 14
      test/core/end2end/fuzzers/api_fuzzer.cc
  24. 20
      test/core/end2end/goaway_server_test.cc
  25. 1
      third_party/cares/BUILD
  26. 22
      third_party/cares/cares.BUILD
  27. 423
      third_party/cares/config_windows/ares_config.h
  28. 2
      tools/run_tests/generated/tests.json
  29. 59
      tools/run_tests/python_utils/upload_test_results.py

@ -2032,6 +2032,7 @@ targets:
dict: test/core/end2end/fuzzers/api_fuzzer.dictionary dict: test/core/end2end/fuzzers/api_fuzzer.dictionary
maxlen: 2048 maxlen: 2048
- name: arena_test - name: arena_test
cpu_cost: 10
build: test build: test
language: c language: c
src: src:

@ -138,4 +138,4 @@ Versioning
Browser-specific features Browser-specific features
* For features that are unique to browser or HTML clients, check the [spec doc](https://github.com/grpc/grpc-web/blob/master/PROTOCOL-WEB.md) published in the grpc/grpc-web repo. * For features that are unique to browser or HTML clients, check the [spec doc](https://github.com/grpc/grpc-web/blob/master/BROWSER-FEATURES.md) published in the grpc/grpc-web repo.

@ -22,9 +22,12 @@ import helloworld_pb2_grpc
def run(): def run():
channel = grpc.insecure_channel('localhost:50051') # NOTE(gRPC Python Team): .close() is possible on a channel and should be
stub = helloworld_pb2_grpc.GreeterStub(channel) # used in circumstances in which the with statement does not fit the needs
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) # of the code.
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message) print("Greeter client received: " + response.message)

@ -27,10 +27,14 @@ def run():
message='Hello from your local interceptor!') message='Hello from your local interceptor!')
default_value_interceptor = default_value_client_interceptor.DefaultValueClientInterceptor( default_value_interceptor = default_value_client_interceptor.DefaultValueClientInterceptor(
default_value) default_value)
channel = grpc.insecure_channel('localhost:50051') # NOTE(gRPC Python Team): .close() is possible on a channel and should be
channel = grpc.intercept_channel(channel, default_value_interceptor) # used in circumstances in which the with statement does not fit the needs
stub = helloworld_pb2_grpc.GreeterStub(channel) # of the code.
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) with grpc.insecure_channel('localhost:50051') as channel:
intercept_channel = grpc.intercept_channel(channel,
default_value_interceptor)
stub = helloworld_pb2_grpc.GreeterStub(intercept_channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message) print("Greeter client received: " + response.message)

@ -25,10 +25,14 @@ import header_manipulator_client_interceptor
def run(): def run():
header_adder_interceptor = header_manipulator_client_interceptor.header_adder_interceptor( header_adder_interceptor = header_manipulator_client_interceptor.header_adder_interceptor(
'one-time-password', '42') 'one-time-password', '42')
channel = grpc.insecure_channel('localhost:50051') # NOTE(gRPC Python Team): .close() is possible on a channel and should be
channel = grpc.intercept_channel(channel, header_adder_interceptor) # used in circumstances in which the with statement does not fit the needs
stub = helloworld_pb2_grpc.GreeterStub(channel) # of the code.
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you')) with grpc.insecure_channel('localhost:50051') as channel:
intercept_channel = grpc.intercept_channel(channel,
header_adder_interceptor)
stub = helloworld_pb2_grpc.GreeterStub(intercept_channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
print("Greeter client received: " + response.message) print("Greeter client received: " + response.message)

@ -106,20 +106,23 @@ def guide_route_chat(route_guide_stub):
def run(): def run():
channel = grpc.insecure_channel('localhost:50051') # NOTE(gRPC Python Team): .close() is possible on a channel and should be
greeter_stub = helloworld_pb2_grpc.GreeterStub(channel) # used in circumstances in which the with statement does not fit the needs
route_guide_stub = route_guide_pb2_grpc.RouteGuideStub(channel) # of the code.
greeter_response = greeter_stub.SayHello( with grpc.insecure_channel('localhost:50051') as channel:
helloworld_pb2.HelloRequest(name='you')) greeter_stub = helloworld_pb2_grpc.GreeterStub(channel)
print("Greeter client received: " + greeter_response.message) route_guide_stub = route_guide_pb2_grpc.RouteGuideStub(channel)
print("-------------- GetFeature --------------") greeter_response = greeter_stub.SayHello(
guide_get_feature(route_guide_stub) helloworld_pb2.HelloRequest(name='you'))
print("-------------- ListFeatures --------------") print("Greeter client received: " + greeter_response.message)
guide_list_features(route_guide_stub) print("-------------- GetFeature --------------")
print("-------------- RecordRoute --------------") guide_get_feature(route_guide_stub)
guide_record_route(route_guide_stub) print("-------------- ListFeatures --------------")
print("-------------- RouteChat --------------") guide_list_features(route_guide_stub)
guide_route_chat(route_guide_stub) print("-------------- RecordRoute --------------")
guide_record_route(route_guide_stub)
print("-------------- RouteChat --------------")
guide_route_chat(route_guide_stub)
if __name__ == '__main__': if __name__ == '__main__':

@ -100,16 +100,19 @@ def guide_route_chat(stub):
def run(): def run():
channel = grpc.insecure_channel('localhost:50051') # NOTE(gRPC Python Team): .close() is possible on a channel and should be
stub = route_guide_pb2_grpc.RouteGuideStub(channel) # used in circumstances in which the with statement does not fit the needs
print("-------------- GetFeature --------------") # of the code.
guide_get_feature(stub) with grpc.insecure_channel('localhost:50051') as channel:
print("-------------- ListFeatures --------------") stub = route_guide_pb2_grpc.RouteGuideStub(channel)
guide_list_features(stub) print("-------------- GetFeature --------------")
print("-------------- RecordRoute --------------") guide_get_feature(stub)
guide_record_route(stub) print("-------------- ListFeatures --------------")
print("-------------- RouteChat --------------") guide_list_features(stub)
guide_route_chat(stub) print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)
if __name__ == '__main__': if __name__ == '__main__':

@ -86,8 +86,8 @@ class ServerBuilder {
/// \param creds The credentials associated with the server. /// \param creds The credentials associated with the server.
/// \param selected_port[out] If not `nullptr`, gets populated with the port /// \param selected_port[out] If not `nullptr`, gets populated with the port
/// number bound to the \a grpc::Server for the corresponding endpoint after /// number bound to the \a grpc::Server for the corresponding endpoint after
/// it is successfully bound, 0 otherwise. /// it is successfully bound by BuildAndStart(), 0 otherwise. AddListeningPort
/// /// does not modify this pointer.
ServerBuilder& AddListeningPort(const grpc::string& addr_uri, ServerBuilder& AddListeningPort(const grpc::string& addr_uri,
std::shared_ptr<ServerCredentials> creds, std::shared_ptr<ServerCredentials> creds,
int* selected_port = nullptr); int* selected_port = nullptr);

@ -414,10 +414,10 @@ void AresDnsResolver::StartResolvingLocked() {
resolving_ = true; resolving_ = true;
lb_addresses_ = nullptr; lb_addresses_ = nullptr;
service_config_json_ = nullptr; service_config_json_ = nullptr;
pending_request_ = grpc_dns_lookup_ares( pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_, dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
&on_resolved_, &lb_addresses_, true /* check_grpclb */, &on_resolved_, &lb_addresses_, true /* check_grpclb */,
request_service_config_ ? &service_config_json_ : nullptr); request_service_config_ ? &service_config_json_ : nullptr, combiner());
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now(); last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
} }

@ -29,25 +29,27 @@ typedef struct grpc_ares_ev_driver grpc_ares_ev_driver;
/* Start \a ev_driver. It will keep working until all IO on its ares_channel is /* Start \a ev_driver. It will keep working until all IO on its ares_channel is
done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks done, or grpc_ares_ev_driver_destroy() is called. It may notify the callbacks
bound to its ares_channel when necessary. */ bound to its ares_channel when necessary. */
void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver);
/* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to /* Returns the ares_channel owned by \a ev_driver. To bind a c-ares query to
\a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the \a ev_driver, use the ares_channel owned by \a ev_driver as the arg of the
query. */ query. */
ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver); ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver);
/* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is /* Creates a new grpc_ares_ev_driver. Returns GRPC_ERROR_NONE if \a ev_driver is
created successfully. */ created successfully. */
grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set); grpc_pollset_set* pollset_set,
grpc_combiner* combiner);
/* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver /* Destroys \a ev_driver asynchronously. Pending lookups made on \a ev_driver
will be cancelled and their on_done callbacks will be invoked with a status will be cancelled and their on_done callbacks will be invoked with a status
of ARES_ECANCELLED. */ of ARES_ECANCELLED. */
void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver);
/* Shutdown all the grpc_fds used by \a ev_driver */ /* Shutdown all the grpc_fds used by \a ev_driver */
void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver); void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver);
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \ #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_RESOLVER_DNS_C_ARES_GRPC_ARES_EV_DRIVER_H \
*/ */

@ -39,17 +39,15 @@
typedef struct fd_node { typedef struct fd_node {
/** the owner of this fd node */ /** the owner of this fd node */
grpc_ares_ev_driver* ev_driver; grpc_ares_ev_driver* ev_driver;
/** a closure wrapping on_readable_cb, which should be invoked when the /** a closure wrapping on_readable_locked, which should be
grpc_fd in this node becomes readable. */ invoked when the grpc_fd in this node becomes readable. */
grpc_closure read_closure; grpc_closure read_closure;
/** a closure wrapping on_writable_cb, which should be invoked when the /** a closure wrapping on_writable_locked, which should be
grpc_fd in this node becomes writable. */ invoked when the grpc_fd in this node becomes writable. */
grpc_closure write_closure; grpc_closure write_closure;
/** next fd node in the list */ /** next fd node in the list */
struct fd_node* next; struct fd_node* next;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** the grpc_fd owned by this fd node */ /** the grpc_fd owned by this fd node */
grpc_fd* fd; grpc_fd* fd;
/** if the readable closure has been registered */ /** if the readable closure has been registered */
@ -68,8 +66,8 @@ struct grpc_ares_ev_driver {
/** refcount of the event driver */ /** refcount of the event driver */
gpr_refcount refs; gpr_refcount refs;
/** mutex guarding the rest of the state */ /** combiner to synchronize c-ares and I/O callbacks on */
gpr_mu mu; grpc_combiner* combiner;
/** a list of grpc_fd that this event driver is currently using. */ /** a list of grpc_fd that this event driver is currently using. */
fd_node* fds; fd_node* fds;
/** is this event driver currently working? */ /** is this event driver currently working? */
@ -92,19 +90,18 @@ static void grpc_ares_ev_driver_unref(grpc_ares_ev_driver* ev_driver) {
if (gpr_unref(&ev_driver->refs)) { if (gpr_unref(&ev_driver->refs)) {
gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver); gpr_log(GPR_DEBUG, "destroy ev_driver %" PRIuPTR, (uintptr_t)ev_driver);
GPR_ASSERT(ev_driver->fds == nullptr); GPR_ASSERT(ev_driver->fds == nullptr);
gpr_mu_destroy(&ev_driver->mu); GRPC_COMBINER_UNREF(ev_driver->combiner, "free ares event driver");
ares_destroy(ev_driver->channel); ares_destroy(ev_driver->channel);
gpr_free(ev_driver); gpr_free(ev_driver);
} }
} }
static void fd_node_destroy(fd_node* fdn) { static void fd_node_destroy_locked(fd_node* fdn) {
gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd)); gpr_log(GPR_DEBUG, "delete fd: %d", grpc_fd_wrapped_fd(fdn->fd));
GPR_ASSERT(!fdn->readable_registered); GPR_ASSERT(!fdn->readable_registered);
GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(!fdn->writable_registered);
GPR_ASSERT(fdn->already_shutdown); GPR_ASSERT(fdn->already_shutdown);
gpr_mu_destroy(&fdn->mu); /* c-ares library will close the fd inside grpc_fd. This fd may be picked up
/* c-ares library has closed the fd inside grpc_fd. This fd may be picked up
immediately by another thread, and should not be closed by the following immediately by another thread, and should not be closed by the following
grpc_fd_orphan. */ grpc_fd_orphan. */
int dummy_release_fd; int dummy_release_fd;
@ -119,15 +116,16 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
} }
} }
grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver, grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set) { grpc_pollset_set* pollset_set,
grpc_combiner* combiner) {
*ev_driver = static_cast<grpc_ares_ev_driver*>( *ev_driver = static_cast<grpc_ares_ev_driver*>(
gpr_malloc(sizeof(grpc_ares_ev_driver))); gpr_malloc(sizeof(grpc_ares_ev_driver)));
ares_options opts; ares_options opts;
memset(&opts, 0, sizeof(opts)); memset(&opts, 0, sizeof(opts));
opts.flags |= ARES_FLAG_STAYOPEN; opts.flags |= ARES_FLAG_STAYOPEN;
int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS); int status = ares_init_options(&(*ev_driver)->channel, &opts, ARES_OPT_FLAGS);
gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create"); gpr_log(GPR_DEBUG, "grpc_ares_ev_driver_create_locked");
if (status != ARES_SUCCESS) { if (status != ARES_SUCCESS) {
char* err_msg; char* err_msg;
gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s", gpr_asprintf(&err_msg, "Failed to init ares channel. C-ares error: %s",
@ -137,7 +135,7 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
gpr_free(*ev_driver); gpr_free(*ev_driver);
return err; return err;
} }
gpr_mu_init(&(*ev_driver)->mu); (*ev_driver)->combiner = GRPC_COMBINER_REF(combiner, "ares event driver");
gpr_ref_init(&(*ev_driver)->refs, 1); gpr_ref_init(&(*ev_driver)->refs, 1);
(*ev_driver)->pollset_set = pollset_set; (*ev_driver)->pollset_set = pollset_set;
(*ev_driver)->fds = nullptr; (*ev_driver)->fds = nullptr;
@ -146,34 +144,26 @@ grpc_error* grpc_ares_ev_driver_create(grpc_ares_ev_driver** ev_driver,
return GRPC_ERROR_NONE; return GRPC_ERROR_NONE;
} }
void grpc_ares_ev_driver_destroy(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_destroy_locked(grpc_ares_ev_driver* ev_driver) {
// It's not safe to shut down remaining fds here directly, becauses // We mark the event driver as being shut down. If the event driver
// ares_host_callback does not provide an exec_ctx. We mark the event driver // is working, grpc_ares_notify_on_event_locked will shut down the
// as being shut down. If the event driver is working, // fds; if it's not working, there are no fds to shut down.
// grpc_ares_notify_on_event_locked will shut down the fds; if it's not
// working, there are no fds to shut down.
gpr_mu_lock(&ev_driver->mu);
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
void grpc_ares_ev_driver_shutdown(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_shutdown_locked(grpc_ares_ev_driver* ev_driver) {
gpr_mu_lock(&ev_driver->mu);
ev_driver->shutting_down = true; ev_driver->shutting_down = true;
fd_node* fn = ev_driver->fds; fd_node* fn = ev_driver->fds;
while (fn != nullptr) { while (fn != nullptr) {
gpr_mu_lock(&fn->mu);
fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown"); fd_node_shutdown_locked(fn, "grpc_ares_ev_driver_shutdown");
gpr_mu_unlock(&fn->mu);
fn = fn->next; fn = fn->next;
} }
gpr_mu_unlock(&ev_driver->mu);
} }
// Search fd in the fd_node list head. This is an O(n) search, the max possible // Search fd in the fd_node list head. This is an O(n) search, the max possible
// value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests. // value of n is ARES_GETSOCK_MAXNUM (16). n is typically 1 - 2 in our tests.
static fd_node* pop_fd_node(fd_node** head, int fd) { static fd_node* pop_fd_node_locked(fd_node** head, int fd) {
fd_node dummy_head; fd_node dummy_head;
dummy_head.next = *head; dummy_head.next = *head;
fd_node* node = &dummy_head; fd_node* node = &dummy_head;
@ -190,24 +180,22 @@ static fd_node* pop_fd_node(fd_node** head, int fd) {
} }
/* Check if \a fd is still readable */ /* Check if \a fd is still readable */
static bool grpc_ares_is_fd_still_readable(grpc_ares_ev_driver* ev_driver, static bool grpc_ares_is_fd_still_readable_locked(
int fd) { grpc_ares_ev_driver* ev_driver, int fd) {
size_t bytes_available = 0; size_t bytes_available = 0;
return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0; return ioctl(fd, FIONREAD, &bytes_available) == 0 && bytes_available > 0;
} }
static void on_readable_cb(void* arg, grpc_error* error) { static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg); fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->readable_registered = false; fdn->readable_registered = false;
gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "readable on %d", fd); gpr_log(GPR_DEBUG, "readable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
do { do {
ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD); ares_process_fd(ev_driver->channel, fd, ARES_SOCKET_BAD);
} while (grpc_ares_is_fd_still_readable(ev_driver, fd)); } while (grpc_ares_is_fd_still_readable_locked(ev_driver, fd));
} else { } else {
// If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or // If error is not GRPC_ERROR_NONE, it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled // timed out. The pending lookups made on this ev_driver will be cancelled
@ -217,19 +205,15 @@ static void on_readable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
static void on_writable_cb(void* arg, grpc_error* error) { static void on_writable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg); fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver; grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
gpr_mu_lock(&fdn->mu);
const int fd = grpc_fd_wrapped_fd(fdn->fd); const int fd = grpc_fd_wrapped_fd(fdn->fd);
fdn->writable_registered = false; fdn->writable_registered = false;
gpr_mu_unlock(&fdn->mu);
gpr_log(GPR_DEBUG, "writable on %d", fd); gpr_log(GPR_DEBUG, "writable on %d", fd);
if (error == GRPC_ERROR_NONE) { if (error == GRPC_ERROR_NONE) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd); ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, fd);
@ -242,13 +226,12 @@ static void on_writable_cb(void* arg, grpc_error* error) {
// grpc_ares_notify_on_event_locked(). // grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel); ares_cancel(ev_driver->channel);
} }
gpr_mu_lock(&ev_driver->mu);
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
gpr_mu_unlock(&ev_driver->mu);
grpc_ares_ev_driver_unref(ev_driver); grpc_ares_ev_driver_unref(ev_driver);
} }
ares_channel* grpc_ares_ev_driver_get_channel(grpc_ares_ev_driver* ev_driver) { ares_channel* grpc_ares_ev_driver_get_channel_locked(
grpc_ares_ev_driver* ev_driver) {
return &ev_driver->channel; return &ev_driver->channel;
} }
@ -263,7 +246,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) { for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
if (ARES_GETSOCK_READABLE(socks_bitmask, i) || if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
ARES_GETSOCK_WRITABLE(socks_bitmask, i)) { ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
fd_node* fdn = pop_fd_node(&ev_driver->fds, socks[i]); fd_node* fdn = pop_fd_node_locked(&ev_driver->fds, socks[i]);
// Create a new fd_node if sock[i] is not in the fd_node list. // Create a new fd_node if sock[i] is not in the fd_node list.
if (fdn == nullptr) { if (fdn == nullptr) {
char* fd_name; char* fd_name;
@ -275,17 +258,15 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
fdn->readable_registered = false; fdn->readable_registered = false;
fdn->writable_registered = false; fdn->writable_registered = false;
fdn->already_shutdown = false; fdn->already_shutdown = false;
gpr_mu_init(&fdn->mu); GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_locked, fdn,
GRPC_CLOSURE_INIT(&fdn->read_closure, on_readable_cb, fdn, grpc_combiner_scheduler(ev_driver->combiner));
grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_locked, fdn,
GRPC_CLOSURE_INIT(&fdn->write_closure, on_writable_cb, fdn, grpc_combiner_scheduler(ev_driver->combiner));
grpc_schedule_on_exec_ctx);
grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd); grpc_pollset_set_add_fd(ev_driver->pollset_set, fdn->fd);
gpr_free(fd_name); gpr_free(fd_name);
} }
fdn->next = new_list; fdn->next = new_list;
new_list = fdn; new_list = fdn;
gpr_mu_lock(&fdn->mu);
// Register read_closure if the socket is readable and read_closure has // Register read_closure if the socket is readable and read_closure has
// not been registered with this socket. // not been registered with this socket.
if (ARES_GETSOCK_READABLE(socks_bitmask, i) && if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
@ -305,7 +286,6 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure); grpc_fd_notify_on_write(fdn->fd, &fdn->write_closure);
fdn->writable_registered = true; fdn->writable_registered = true;
} }
gpr_mu_unlock(&fdn->mu);
} }
} }
} }
@ -315,15 +295,12 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
while (ev_driver->fds != nullptr) { while (ev_driver->fds != nullptr) {
fd_node* cur = ev_driver->fds; fd_node* cur = ev_driver->fds;
ev_driver->fds = ev_driver->fds->next; ev_driver->fds = ev_driver->fds->next;
gpr_mu_lock(&cur->mu);
fd_node_shutdown_locked(cur, "c-ares fd shutdown"); fd_node_shutdown_locked(cur, "c-ares fd shutdown");
if (!cur->readable_registered && !cur->writable_registered) { if (!cur->readable_registered && !cur->writable_registered) {
gpr_mu_unlock(&cur->mu); fd_node_destroy_locked(cur);
fd_node_destroy(cur);
} else { } else {
cur->next = new_list; cur->next = new_list;
new_list = cur; new_list = cur;
gpr_mu_unlock(&cur->mu);
} }
} }
ev_driver->fds = new_list; ev_driver->fds = new_list;
@ -334,13 +311,11 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) {
} }
} }
void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
gpr_mu_lock(&ev_driver->mu);
if (!ev_driver->working) { if (!ev_driver->working) {
ev_driver->working = true; ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver); grpc_ares_notify_on_event_locked(ev_driver);
} }
gpr_mu_unlock(&ev_driver->mu);
} }
#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ #endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */

@ -65,8 +65,6 @@ struct grpc_ares_request {
/** number of ongoing queries */ /** number of ongoing queries */
gpr_refcount pending_queries; gpr_refcount pending_queries;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** is there at least one successful query, set in on_done_cb */ /** is there at least one successful query, set in on_done_cb */
bool success; bool success;
/** the errors explaining the request failure, set in on_done_cb */ /** the errors explaining the request failure, set in on_done_cb */
@ -74,7 +72,8 @@ struct grpc_ares_request {
}; };
typedef struct grpc_ares_hostbyname_request { typedef struct grpc_ares_hostbyname_request {
/** following members are set in create_hostbyname_request */ /** following members are set in create_hostbyname_request_locked
*/
/** the top-level request instance */ /** the top-level request instance */
grpc_ares_request* parent_request; grpc_ares_request* parent_request;
/** host to resolve, parsed from the name to resolve */ /** host to resolve, parsed from the name to resolve */
@ -96,10 +95,6 @@ static uint16_t strhtons(const char* port) {
return htons(static_cast<unsigned short>(atoi(port))); return htons(static_cast<unsigned short>(atoi(port)));
} }
static void grpc_ares_request_ref(grpc_ares_request* r) {
gpr_ref(&r->pending_queries);
}
static void log_address_sorting_list(grpc_lb_addresses* lb_addrs, static void log_address_sorting_list(grpc_lb_addresses* lb_addrs,
const char* input_output_str) { const char* input_output_str) {
for (size_t i = 0; i < lb_addrs->num_addresses; i++) { for (size_t i = 0; i < lb_addrs->num_addresses; i++) {
@ -149,7 +144,11 @@ void grpc_cares_wrapper_test_only_address_sorting_sort(
grpc_cares_wrapper_address_sorting_sort(lb_addrs); grpc_cares_wrapper_address_sorting_sort(lb_addrs);
} }
static void grpc_ares_request_unref(grpc_ares_request* r) { static void grpc_ares_request_ref_locked(grpc_ares_request* r) {
gpr_ref(&r->pending_queries);
}
static void grpc_ares_request_unref_locked(grpc_ares_request* r) {
/* If there are no pending queries, invoke on_done callback and destroy the /* If there are no pending queries, invoke on_done callback and destroy the
request */ request */
if (gpr_unref(&r->pending_queries)) { if (gpr_unref(&r->pending_queries)) {
@ -158,13 +157,12 @@ static void grpc_ares_request_unref(grpc_ares_request* r) {
grpc_cares_wrapper_address_sorting_sort(lb_addrs); grpc_cares_wrapper_address_sorting_sort(lb_addrs);
} }
GRPC_CLOSURE_SCHED(r->on_done, r->error); GRPC_CLOSURE_SCHED(r->on_done, r->error);
gpr_mu_destroy(&r->mu); grpc_ares_ev_driver_destroy_locked(r->ev_driver);
grpc_ares_ev_driver_destroy(r->ev_driver);
gpr_free(r); gpr_free(r);
} }
} }
static grpc_ares_hostbyname_request* create_hostbyname_request( static grpc_ares_hostbyname_request* create_hostbyname_request_locked(
grpc_ares_request* parent_request, char* host, uint16_t port, grpc_ares_request* parent_request, char* host, uint16_t port,
bool is_balancer) { bool is_balancer) {
grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>( grpc_ares_hostbyname_request* hr = static_cast<grpc_ares_hostbyname_request*>(
@ -173,22 +171,22 @@ static grpc_ares_hostbyname_request* create_hostbyname_request(
hr->host = gpr_strdup(host); hr->host = gpr_strdup(host);
hr->port = port; hr->port = port;
hr->is_balancer = is_balancer; hr->is_balancer = is_balancer;
grpc_ares_request_ref(parent_request); grpc_ares_request_ref_locked(parent_request);
return hr; return hr;
} }
static void destroy_hostbyname_request(grpc_ares_hostbyname_request* hr) { static void destroy_hostbyname_request_locked(
grpc_ares_request_unref(hr->parent_request); grpc_ares_hostbyname_request* hr) {
grpc_ares_request_unref_locked(hr->parent_request);
gpr_free(hr->host); gpr_free(hr->host);
gpr_free(hr); gpr_free(hr);
} }
static void on_hostbyname_done_cb(void* arg, int status, int timeouts, static void on_hostbyname_done_locked(void* arg, int status, int timeouts,
struct hostent* hostent) { struct hostent* hostent) {
grpc_ares_hostbyname_request* hr = grpc_ares_hostbyname_request* hr =
static_cast<grpc_ares_hostbyname_request*>(arg); static_cast<grpc_ares_hostbyname_request*>(arg);
grpc_ares_request* r = hr->parent_request; grpc_ares_request* r = hr->parent_request;
gpr_mu_lock(&r->mu);
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
GRPC_ERROR_UNREF(r->error); GRPC_ERROR_UNREF(r->error);
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
@ -263,33 +261,33 @@ static void on_hostbyname_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
} }
gpr_mu_unlock(&r->mu); destroy_hostbyname_request_locked(hr);
destroy_hostbyname_request(hr);
} }
static void on_srv_query_done_cb(void* arg, int status, int timeouts, static void on_srv_query_done_locked(void* arg, int status, int timeouts,
unsigned char* abuf, int alen) { unsigned char* abuf, int alen) {
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
gpr_log(GPR_DEBUG, "on_query_srv_done_cb"); gpr_log(GPR_DEBUG, "on_query_srv_done_locked");
if (status == ARES_SUCCESS) { if (status == ARES_SUCCESS) {
gpr_log(GPR_DEBUG, "on_query_srv_done_cb ARES_SUCCESS"); gpr_log(GPR_DEBUG, "on_query_srv_done_locked ARES_SUCCESS");
struct ares_srv_reply* reply; struct ares_srv_reply* reply;
const int parse_status = ares_parse_srv_reply(abuf, alen, &reply); const int parse_status = ares_parse_srv_reply(abuf, alen, &reply);
if (parse_status == ARES_SUCCESS) { if (parse_status == ARES_SUCCESS) {
ares_channel* channel = grpc_ares_ev_driver_get_channel(r->ev_driver); ares_channel* channel =
grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr; for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
srv_it = srv_it->next) { srv_it = srv_it->next) {
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
grpc_ares_hostbyname_request* hr = create_hostbyname_request( grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */); r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, ares_gethostbyname(*channel, hr->host, AF_INET6,
on_hostbyname_done_cb, hr); on_hostbyname_done_locked, hr);
} }
grpc_ares_hostbyname_request* hr = create_hostbyname_request( grpc_ares_hostbyname_request* hr = create_hostbyname_request_locked(
r, srv_it->host, htons(srv_it->port), true /* is_balancer */); r, srv_it->host, htons(srv_it->port), true /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, ares_gethostbyname(*channel, hr->host, AF_INET,
hr); on_hostbyname_done_locked, hr);
grpc_ares_ev_driver_start(r->ev_driver); grpc_ares_ev_driver_start_locked(r->ev_driver);
} }
} }
if (reply != nullptr) { if (reply != nullptr) {
@ -307,21 +305,20 @@ static void on_srv_query_done_cb(void* arg, int status, int timeouts,
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
} }
grpc_ares_request_unref(r); grpc_ares_request_unref_locked(r);
} }
static const char g_service_config_attribute_prefix[] = "grpc_config="; static const char g_service_config_attribute_prefix[] = "grpc_config=";
static void on_txt_done_cb(void* arg, int status, int timeouts, static void on_txt_done_locked(void* arg, int status, int timeouts,
unsigned char* buf, int len) { unsigned char* buf, int len) {
gpr_log(GPR_DEBUG, "on_txt_done_cb"); gpr_log(GPR_DEBUG, "on_txt_done_locked");
char* error_msg; char* error_msg;
grpc_ares_request* r = static_cast<grpc_ares_request*>(arg); grpc_ares_request* r = static_cast<grpc_ares_request*>(arg);
const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1; const size_t prefix_len = sizeof(g_service_config_attribute_prefix) - 1;
struct ares_txt_ext* result = nullptr; struct ares_txt_ext* result = nullptr;
struct ares_txt_ext* reply = nullptr; struct ares_txt_ext* reply = nullptr;
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
gpr_mu_lock(&r->mu);
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
status = ares_parse_txt_reply_ext(buf, len, &reply); status = ares_parse_txt_reply_ext(buf, len, &reply);
if (status != ARES_SUCCESS) goto fail; if (status != ARES_SUCCESS) goto fail;
@ -366,14 +363,14 @@ fail:
r->error = grpc_error_add_child(error, r->error); r->error = grpc_error_add_child(error, r->error);
} }
done: done:
gpr_mu_unlock(&r->mu); grpc_ares_request_unref_locked(r);
grpc_ares_request_unref(r);
} }
static grpc_ares_request* grpc_dns_lookup_ares_impl( static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr; grpc_ares_hostbyname_request* hr = nullptr;
grpc_ares_request* r = nullptr; grpc_ares_request* r = nullptr;
@ -402,20 +399,19 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
} }
port = gpr_strdup(default_port); port = gpr_strdup(default_port);
} }
grpc_ares_ev_driver* ev_driver; grpc_ares_ev_driver* ev_driver;
error = grpc_ares_ev_driver_create(&ev_driver, interested_parties); error = grpc_ares_ev_driver_create_locked(&ev_driver, interested_parties,
combiner);
if (error != GRPC_ERROR_NONE) goto error_cleanup; if (error != GRPC_ERROR_NONE) goto error_cleanup;
r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request))); r = static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
gpr_mu_init(&r->mu);
r->ev_driver = ev_driver; r->ev_driver = ev_driver;
r->on_done = on_done; r->on_done = on_done;
r->lb_addrs_out = addrs; r->lb_addrs_out = addrs;
r->service_config_json_out = service_config_json; r->service_config_json_out = service_config_json;
r->success = false; r->success = false;
r->error = GRPC_ERROR_NONE; r->error = GRPC_ERROR_NONE;
channel = grpc_ares_ev_driver_get_channel(r->ev_driver); channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
// If dns_server is specified, use it. // If dns_server is specified, use it.
if (dns_server != nullptr) { if (dns_server != nullptr) {
@ -457,32 +453,34 @@ static grpc_ares_request* grpc_dns_lookup_ares_impl(
} }
gpr_ref_init(&r->pending_queries, 1); gpr_ref_init(&r->pending_queries, 1);
if (grpc_ipv6_loopback_available()) { if (grpc_ipv6_loopback_available()) {
hr = create_hostbyname_request(r, host, strhtons(port), hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */); false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_locked,
hr);
} }
hr = create_hostbyname_request(r, host, strhtons(port), hr = create_hostbyname_request_locked(r, host, strhtons(port),
false /* is_balancer */); false /* is_balancer */);
ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_locked,
hr);
if (check_grpclb) { if (check_grpclb) {
/* Query the SRV record */ /* Query the SRV record */
grpc_ares_request_ref(r); grpc_ares_request_ref_locked(r);
char* service_name; char* service_name;
gpr_asprintf(&service_name, "_grpclb._tcp.%s", host); gpr_asprintf(&service_name, "_grpclb._tcp.%s", host);
ares_query(*channel, service_name, ns_c_in, ns_t_srv, on_srv_query_done_cb, ares_query(*channel, service_name, ns_c_in, ns_t_srv,
r); on_srv_query_done_locked, r);
gpr_free(service_name); gpr_free(service_name);
} }
if (service_config_json != nullptr) { if (service_config_json != nullptr) {
grpc_ares_request_ref(r); grpc_ares_request_ref_locked(r);
char* config_name; char* config_name;
gpr_asprintf(&config_name, "_grpc_config.%s", host); gpr_asprintf(&config_name, "_grpc_config.%s", host);
ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_cb, r); ares_search(*channel, config_name, ns_c_in, ns_t_txt, on_txt_done_locked,
r);
gpr_free(config_name); gpr_free(config_name);
} }
/* TODO(zyc): Handle CNAME records here. */ grpc_ares_ev_driver_start_locked(r->ev_driver);
grpc_ares_ev_driver_start(r->ev_driver); grpc_ares_request_unref_locked(r);
grpc_ares_request_unref(r);
gpr_free(host); gpr_free(host);
gpr_free(port); gpr_free(port);
return r; return r;
@ -494,15 +492,15 @@ error_cleanup:
return nullptr; return nullptr;
} }
grpc_ares_request* (*grpc_dns_lookup_ares)( grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) = grpc_dns_lookup_ares_impl; grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) { void grpc_cancel_ares_request(grpc_ares_request* r) {
if (grpc_dns_lookup_ares == grpc_dns_lookup_ares_impl) { if (grpc_dns_lookup_ares_locked == grpc_dns_lookup_ares_locked_impl) {
grpc_ares_ev_driver_shutdown(r->ev_driver); grpc_ares_ev_driver_shutdown_locked(r->ev_driver);
} }
} }
@ -534,6 +532,8 @@ void grpc_ares_cleanup(void) {
*/ */
typedef struct grpc_resolve_address_ares_request { typedef struct grpc_resolve_address_ares_request {
/* combiner that queries and related callbacks run under */
grpc_combiner* combiner;
/** the pointer to receive the resolved addresses */ /** the pointer to receive the resolved addresses */
grpc_resolved_addresses** addrs_out; grpc_resolved_addresses** addrs_out;
/** currently resolving lb addresses */ /** currently resolving lb addresses */
@ -541,8 +541,14 @@ typedef struct grpc_resolve_address_ares_request {
/** closure to call when the resolve_address_ares request completes */ /** closure to call when the resolve_address_ares request completes */
grpc_closure* on_resolve_address_done; grpc_closure* on_resolve_address_done;
/** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the /** a closure wrapping on_dns_lookup_done_cb, which should be invoked when the
grpc_dns_lookup_ares operation is done. */ grpc_dns_lookup_ares_locked operation is done. */
grpc_closure on_dns_lookup_done; grpc_closure on_dns_lookup_done;
/* target name */
const char* name;
/* default port to use if none is specified */
const char* default_port;
/* pollset_set to be driven by */
grpc_pollset_set* interested_parties;
} grpc_resolve_address_ares_request; } grpc_resolve_address_ares_request;
static void on_dns_lookup_done_cb(void* arg, grpc_error* error) { static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
@ -566,9 +572,20 @@ static void on_dns_lookup_done_cb(void* arg, grpc_error* error) {
} }
GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error)); GRPC_CLOSURE_SCHED(r->on_resolve_address_done, GRPC_ERROR_REF(error));
if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs); if (r->lb_addrs != nullptr) grpc_lb_addresses_destroy(r->lb_addrs);
GRPC_COMBINER_UNREF(r->combiner, "on_dns_lookup_done_cb");
gpr_free(r); gpr_free(r);
} }
static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
void* arg, grpc_error* unused_error) {
grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>(arg);
grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
&r->on_dns_lookup_done, &r->lb_addrs, false /* check_grpclb */,
nullptr /* service_config_json */, r->combiner);
}
static void grpc_resolve_address_ares_impl(const char* name, static void grpc_resolve_address_ares_impl(const char* name,
const char* default_port, const char* default_port,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties,
@ -577,14 +594,18 @@ static void grpc_resolve_address_ares_impl(const char* name,
grpc_resolve_address_ares_request* r = grpc_resolve_address_ares_request* r =
static_cast<grpc_resolve_address_ares_request*>( static_cast<grpc_resolve_address_ares_request*>(
gpr_zalloc(sizeof(grpc_resolve_address_ares_request))); gpr_zalloc(sizeof(grpc_resolve_address_ares_request)));
r->combiner = grpc_combiner_create();
r->addrs_out = addrs; r->addrs_out = addrs;
r->on_resolve_address_done = on_done; r->on_resolve_address_done = on_done;
GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r, GRPC_CLOSURE_INIT(&r->on_dns_lookup_done, on_dns_lookup_done_cb, r,
grpc_schedule_on_exec_ctx); grpc_schedule_on_exec_ctx);
grpc_dns_lookup_ares(nullptr /* dns_server */, name, default_port, r->name = name;
interested_parties, &r->on_dns_lookup_done, &r->lb_addrs, r->default_port = default_port;
false /* check_grpclb */, r->interested_parties = interested_parties;
nullptr /* service_config_json */); GRPC_CLOSURE_SCHED(
GRPC_CLOSURE_CREATE(grpc_resolve_address_invoke_dns_lookup_ares_locked, r,
grpc_combiner_scheduler(r->combiner)),
GRPC_ERROR_NONE);
} }
void (*grpc_resolve_address_ares)( void (*grpc_resolve_address_ares)(

@ -48,11 +48,11 @@ extern void (*grpc_resolve_address_ares)(const char* name,
function. \a on_done may be called directly in this function without being function. \a on_done may be called directly in this function without being
scheduled with \a exec_ctx, so it must not try to acquire locks that are scheduled with \a exec_ctx, so it must not try to acquire locks that are
being held by the caller. */ being held by the caller. */
extern grpc_ares_request* (*grpc_dns_lookup_ares)( extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb, grpc_lb_addresses** addresses, bool check_grpclb,
char** service_config_json); char** service_config_json, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */ /* Cancel the pending grpc_ares_request \a request */
void grpc_cancel_ares_request(grpc_ares_request* request); void grpc_cancel_ares_request(grpc_ares_request* request);

@ -26,18 +26,19 @@ struct grpc_ares_request {
char val; char val;
}; };
static grpc_ares_request* grpc_dns_lookup_ares_impl( static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner) {
return NULL; return NULL;
} }
grpc_ares_request* (*grpc_dns_lookup_ares)( grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) = grpc_dns_lookup_ares_impl; grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
void grpc_cancel_ares_request(grpc_ares_request* r) {} void grpc_cancel_ares_request(grpc_ares_request* r) {}

@ -223,18 +223,23 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr,
mgr->index == mgr->count) { mgr->index == mgr->count) {
if (error == GRPC_ERROR_NONE && mgr->shutdown) { if (error == GRPC_ERROR_NONE && mgr->shutdown) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown"); error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown");
// TODO(roth): It is currently necessary to shutdown endpoints // It is possible that the endpoint has already been destroyed by
// before destroying then, even when we know that there are no // a shutdown call while this callback was sitting on the ExecCtx
// pending read/write callbacks. This should be fixed, at which // with no error.
// point this can be removed. if (mgr->args.endpoint != nullptr) {
grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error)); // TODO(roth): It is currently necessary to shutdown endpoints
grpc_endpoint_destroy(mgr->args.endpoint); // before destroying then, even when we know that there are no
mgr->args.endpoint = nullptr; // pending read/write callbacks. This should be fixed, at which
grpc_channel_args_destroy(mgr->args.args); // point this can be removed.
mgr->args.args = nullptr; grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error));
grpc_slice_buffer_destroy_internal(mgr->args.read_buffer); grpc_endpoint_destroy(mgr->args.endpoint);
gpr_free(mgr->args.read_buffer); mgr->args.endpoint = nullptr;
mgr->args.read_buffer = nullptr; grpc_channel_args_destroy(mgr->args.args);
mgr->args.args = nullptr;
grpc_slice_buffer_destroy_internal(mgr->args.read_buffer);
gpr_free(mgr->args.read_buffer);
mgr->args.read_buffer = nullptr;
}
} }
if (grpc_handshaker_trace.enabled()) { if (grpc_handshaker_trace.enabled()) {
gpr_log(GPR_INFO, gpr_log(GPR_INFO,

@ -33,6 +33,7 @@
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <string.h> #include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <unistd.h> #include <unistd.h>
@ -63,7 +64,7 @@
// a keepalive ping timeout issue. We may want to revert https://github // a keepalive ping timeout issue. We may want to revert https://github
// .com/grpc/grpc/pull/14943 once we figure out the root cause. // .com/grpc/grpc/pull/14943 once we figure out the root cause.
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
#define MAX_PROBE_EPOLL_FDS 32 #define MAX_FDS_IN_CACHE 32
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount"); "pollable_refcount");
@ -77,8 +78,14 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable; typedef struct pollable pollable;
typedef struct cached_fd { typedef struct cached_fd {
// Set to the grpc_fd's salt value. See 'salt' variable' in grpc_fd for more
// details
intptr_t salt; intptr_t salt;
// The underlying fd
int fd; int fd;
// A recency time counter that helps to determine the LRU fd in the cache
uint64_t last_used; uint64_t last_used;
} cached_fd; } cached_fd;
@ -111,10 +118,32 @@ struct pollable {
int event_count; int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS]; struct epoll_event events[MAX_EPOLL_EVENTS];
// Maintain a LRU-eviction cache of fds in this pollable // We may be calling pollable_add_fd() on the same (pollable, fd) multiple
cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; // times. To prevent pollable_add_fd() from making multiple sys calls to
// epoll_ctl() to add the fd, we maintain a cache of what fds are already
// present in the underlying epoll-set.
//
// Since this is not a correctness issue, we do not need to maintain all the
// fds in the cache. Hence we just use an LRU cache of size 'MAX_FDS_IN_CACHE'
//
// NOTE: An ideal implementation of this should do the following:
// 1) Add fds to the cache in pollable_add_fd() function (i.e whenever the fd
// is added to the pollable's epoll set)
// 2) Remove the fd from the cache whenever the fd is removed from the
// underlying epoll set (i.e whenever fd_orphan() is called).
//
// Implementing (2) above (i.e removing fds from cache on fd_orphan) adds a
// lot of complexity since an fd can be present in multiple pollalbles. So our
// implementation ONLY DOES (1) and NOT (2).
//
// The cache_fd.salt variable helps here to maintain correctness (it serves as
// an epoch that differentiates one grpc_fd from the other even though both of
// them may have the same fd number)
//
// The following implements LRU-eviction cache of fds in this pollable
cached_fd fd_cache[MAX_FDS_IN_CACHE];
int fd_cache_size; int fd_cache_size;
uint64_t fd_cache_counter; uint64_t fd_cache_counter; // Recency timer tick counter
}; };
static const char* pollable_type_string(pollable_type t) { static const char* pollable_type_string(pollable_type t) {
@ -157,15 +186,24 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations * Fd Declarations
*/ */
// Monotonically increasing Epoch counter that is assinged to each grpc_fd. See
// the description of 'salt' variable in 'grpc_fd' for more details
// TODO: (sreek/kpayson) gpr_atm is intptr_t which may not be wide-enough on
// 32-bit systems. Change this to int_64 - atleast on 32-bit systems
static gpr_atm g_fd_salt; static gpr_atm g_fd_salt;
struct grpc_fd { struct grpc_fd {
int fd; int fd;
// Since fd numbers can be reused (after old fds are closed), this serves as
// an epoch that uniquely identifies this fd (i.e the pair (salt, fd) is
// unique (until the salt counter (i.e g_fd_salt) overflows)
intptr_t salt; intptr_t salt;
/* refst format:
bit 0 : 1=Active / 0=Orphaned // refst format:
bits 1-n : refcount // bit 0 : 1=Active / 0=Orphaned
Ref/Unref by two to avoid altering the orphaned bit */ // bits 1-n : refcount
// Ref/Unref by two to avoid altering the orphaned bit
gpr_atm refst; gpr_atm refst;
gpr_mu orphan_mu; gpr_mu orphan_mu;
@ -180,13 +218,13 @@ struct grpc_fd {
struct grpc_fd* freelist_next; struct grpc_fd* freelist_next;
grpc_closure* on_done_closure; grpc_closure* on_done_closure;
/* The pollset that last noticed that the fd is readable. The actual type // The pollset that last noticed that the fd is readable. The actual type
* stored in this is (grpc_pollset *) */ // stored in this is (grpc_pollset *)
gpr_atm read_notifier_pollset; gpr_atm read_notifier_pollset;
grpc_iomgr_object iomgr_object; grpc_iomgr_object iomgr_object;
/* Do we need to track EPOLLERR events separately? */ // Do we need to track EPOLLERR events separately?
bool track_err; bool track_err;
}; };
@ -562,6 +600,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
const int epfd = p->epfd; const int epfd = p->epfd;
gpr_mu_lock(&p->mu); gpr_mu_lock(&p->mu);
p->fd_cache_counter++; p->fd_cache_counter++;
// Handle the case of overflow for our cache counter by // Handle the case of overflow for our cache counter by
// reseting the recency-counter on all cache objects // reseting the recency-counter on all cache objects
if (p->fd_cache_counter == 0) { if (p->fd_cache_counter == 0) {
@ -581,8 +620,9 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
lru_idx = i; lru_idx = i;
} }
} }
// Add to cache // Add to cache
if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { if (p->fd_cache_size < MAX_FDS_IN_CACHE) {
lru_idx = p->fd_cache_size; lru_idx = p->fd_cache_size;
p->fd_cache_size++; p->fd_cache_size++;
} }
@ -590,6 +630,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
p->fd_cache[lru_idx].salt = fd->salt; p->fd_cache[lru_idx].salt = fd->salt;
p->fd_cache[lru_idx].last_used = p->fd_cache_counter; p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
gpr_mu_unlock(&p->mu); gpr_mu_unlock(&p->mu);
if (grpc_polling_trace.enabled()) { if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
} }

@ -27,6 +27,7 @@
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include <errno.h> #include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#include <unistd.h> #include <unistd.h>

@ -260,14 +260,13 @@ static tsi_result ssl_get_x509_common_name(X509* cert, unsigned char** utf8,
X509_NAME* subject_name = X509_get_subject_name(cert); X509_NAME* subject_name = X509_get_subject_name(cert);
int utf8_returned_size = 0; int utf8_returned_size = 0;
if (subject_name == nullptr) { if (subject_name == nullptr) {
gpr_log(GPR_ERROR, "Could not get subject name from certificate."); gpr_log(GPR_INFO, "Could not get subject name from certificate.");
return TSI_NOT_FOUND; return TSI_NOT_FOUND;
} }
common_name_index = common_name_index =
X509_NAME_get_index_by_NID(subject_name, NID_commonName, -1); X509_NAME_get_index_by_NID(subject_name, NID_commonName, -1);
if (common_name_index == -1) { if (common_name_index == -1) {
gpr_log(GPR_ERROR, gpr_log(GPR_INFO, "Could not get common name of subject from certificate.");
"Could not get common name of subject from certificate.");
return TSI_NOT_FOUND; return TSI_NOT_FOUND;
} }
common_name_entry = X509_NAME_get_entry(subject_name, common_name_index); common_name_entry = X509_NAME_get_entry(subject_name, common_name_index);

@ -9,7 +9,8 @@ gRPC PHP installation instructions for Google Cloud Platform is in
## Environment ## Environment
###Prerequisite: ### Prerequisite:
* `php` 5.5 or above, 7.0 or above * `php` 5.5 or above, 7.0 or above
* `pecl` * `pecl`
* `composer` * `composer`

@ -119,6 +119,8 @@ class TestGevent(setuptools.Command):
# I have no idea why this doesn't work in gevent, but it shouldn't even be # I have no idea why this doesn't work in gevent, but it shouldn't even be
# using the c-core # using the c-core
'testing._client_test.ClientTest.test_infinite_request_stream_real_time', 'testing._client_test.ClientTest.test_infinite_request_stream_real_time',
# TODO(https://github.com/grpc/grpc/issues/15743) enable this test
'unit._session_cache_test.SSLSessionCacheTest.testSSLSessionCacheLRU',
# TODO(https://github.com/grpc/grpc/issues/14789) enable this test # TODO(https://github.com/grpc/grpc/issues/14789) enable this test
'unit._server_ssl_cert_config_test', 'unit._server_ssl_cert_config_test',
# TODO(https://github.com/grpc/grpc/issues/14901) enable this test # TODO(https://github.com/grpc/grpc/issues/14901) enable this test

@ -60,11 +60,11 @@ static void my_resolve_address(const char* addr, const char* default_port,
static grpc_address_resolver_vtable test_resolver = {my_resolve_address, static grpc_address_resolver_vtable test_resolver = {my_resolve_address,
nullptr}; nullptr};
static grpc_ares_request* my_dns_lookup_ares( static grpc_ares_request* my_dns_lookup_ares_locked(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** lb_addrs, bool check_grpclb, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) { grpc_combiner* combiner) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
GPR_ASSERT(0 == strcmp("test", addr)); GPR_ASSERT(0 == strcmp("test", addr));
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
@ -147,7 +147,7 @@ int main(int argc, char** argv) {
gpr_mu_init(&g_mu); gpr_mu_init(&g_mu);
g_combiner = grpc_combiner_create(); g_combiner = grpc_combiner_create();
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
grpc_channel_args* result = (grpc_channel_args*)1; grpc_channel_args* result = (grpc_channel_args*)1;
{ {

@ -33,10 +33,11 @@ static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner; static grpc_combiner* g_combiner;
grpc_ares_request* (*g_default_dns_lookup_ares)( grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json); grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_combiner* combiner);
// Counter incremented by test_resolve_address_impl indicating the number of // Counter incremented by test_resolve_address_impl indicating the number of
// times a system-level resolution has happened. // times a system-level resolution has happened.
@ -72,13 +73,14 @@ static grpc_error* test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = { static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl}; test_resolve_address_impl, test_blocking_resolve_address_impl};
grpc_ares_request* test_dns_lookup_ares( grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port, const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json) { grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
grpc_ares_request* result = g_default_dns_lookup_ares( grpc_combiner* combiner) {
grpc_ares_request* result = g_default_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs, dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs,
check_grpclb, service_config_json); check_grpclb, service_config_json, combiner);
++g_resolution_count; ++g_resolution_count;
return result; return result;
} }
@ -308,8 +310,8 @@ int main(int argc, char** argv) {
g_combiner = grpc_combiner_create(); g_combiner = grpc_combiner_create();
g_default_dns_lookup_ares = grpc_dns_lookup_ares; g_default_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares = test_dns_lookup_ares; grpc_dns_lookup_ares_locked = test_dns_lookup_ares_locked;
default_resolve_address = grpc_resolve_address_impl; default_resolve_address = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);

@ -374,13 +374,11 @@ void my_resolve_address(const char* addr, const char* default_port,
static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address, static grpc_address_resolver_vtable fuzzer_resolver = {my_resolve_address,
nullptr}; nullptr};
grpc_ares_request* my_dns_lookup_ares(const char* dns_server, const char* addr, grpc_ares_request* my_dns_lookup_ares_locked(
const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_closure* on_done, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
grpc_lb_addresses** lb_addrs, grpc_combiner* combiner) {
bool check_grpclb,
char** service_config_json) {
addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r))); addr_req* r = static_cast<addr_req*>(gpr_malloc(sizeof(*r)));
r->addr = gpr_strdup(addr); r->addr = gpr_strdup(addr);
r->on_done = on_done; r->on_done = on_done;
@ -706,7 +704,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_executor_set_threading(false); grpc_executor_set_threading(false);
} }
grpc_set_resolver_impl(&fuzzer_resolver); grpc_set_resolver_impl(&fuzzer_resolver);
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
GPR_ASSERT(g_channel == nullptr); GPR_ASSERT(g_channel == nullptr);
GPR_ASSERT(g_server == nullptr); GPR_ASSERT(g_server == nullptr);

@ -44,11 +44,11 @@ static void* tag(intptr_t i) { return (void*)i; }
static gpr_mu g_mu; static gpr_mu g_mu;
static int g_resolve_port = -1; static int g_resolve_port = -1;
static grpc_ares_request* (*iomgr_dns_lookup_ares)( static grpc_ares_request* (*iomgr_dns_lookup_ares_locked)(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb, grpc_lb_addresses** addresses, bool check_grpclb,
char** service_config_json); char** service_config_json, grpc_combiner* combiner);
static void set_resolve_port(int port) { static void set_resolve_port(int port) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
@ -98,15 +98,15 @@ static grpc_error* my_blocking_resolve_address(
static grpc_address_resolver_vtable test_resolver = { static grpc_address_resolver_vtable test_resolver = {
my_resolve_address, my_blocking_resolve_address}; my_resolve_address, my_blocking_resolve_address};
static grpc_ares_request* my_dns_lookup_ares( static grpc_ares_request* my_dns_lookup_ares_locked(
const char* dns_server, const char* addr, const char* default_port, const char* dns_server, const char* addr, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done, grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** lb_addrs, bool check_grpclb, grpc_lb_addresses** lb_addrs, bool check_grpclb, char** service_config_json,
char** service_config_json) { grpc_combiner* combiner) {
if (0 != strcmp(addr, "test")) { if (0 != strcmp(addr, "test")) {
return iomgr_dns_lookup_ares(dns_server, addr, default_port, return iomgr_dns_lookup_ares_locked(
interested_parties, on_done, lb_addrs, dns_server, addr, default_port, interested_parties, on_done, lb_addrs,
check_grpclb, service_config_json); check_grpclb, service_config_json, combiner);
} }
grpc_error* error = GRPC_ERROR_NONE; grpc_error* error = GRPC_ERROR_NONE;
@ -142,8 +142,8 @@ int main(int argc, char** argv) {
grpc_init(); grpc_init();
default_resolver = grpc_resolve_address_impl; default_resolver = grpc_resolve_address_impl;
grpc_set_resolver_impl(&test_resolver); grpc_set_resolver_impl(&test_resolver);
iomgr_dns_lookup_ares = grpc_dns_lookup_ares; iomgr_dns_lookup_ares_locked = grpc_dns_lookup_ares_locked;
grpc_dns_lookup_ares = my_dns_lookup_ares; grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked;
int was_cancelled1; int was_cancelled1;
int was_cancelled2; int was_cancelled2;

@ -6,4 +6,5 @@ exports_files([
"config_freebsd/ares_config.h", "config_freebsd/ares_config.h",
"config_linux/ares_config.h", "config_linux/ares_config.h",
"config_openbsd/ares_config.h", "config_openbsd/ares_config.h",
"config_windows/ares_config.h",
]) ])

@ -3,6 +3,11 @@ config_setting(
values = {"cpu": "darwin"}, values = {"cpu": "darwin"},
) )
config_setting(
name = "windows",
values = {"cpu": "x64_windows"},
)
# Android is not officially supported through C++. # Android is not officially supported through C++.
# This just helps with the build for now. # This just helps with the build for now.
config_setting( config_setting(
@ -49,6 +54,7 @@ genrule(
":ios_armv7s": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"], ":ios_armv7s": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"],
":ios_arm64": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"], ":ios_arm64": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"],
":darwin": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"], ":darwin": ["@com_github_grpc_grpc//third_party/cares:config_darwin/ares_config.h"],
":windows": ["@com_github_grpc_grpc//third_party/cares:config_windows/ares_config.h"],
":android": ["@com_github_grpc_grpc//third_party/cares:config_android/ares_config.h"], ":android": ["@com_github_grpc_grpc//third_party/cares:config_android/ares_config.h"],
"//conditions:default": ["@com_github_grpc_grpc//third_party/cares:config_linux/ares_config.h"], "//conditions:default": ["@com_github_grpc_grpc//third_party/cares:config_linux/ares_config.h"],
}), }),
@ -138,10 +144,22 @@ cc_library(
copts = [ copts = [
"-D_GNU_SOURCE", "-D_GNU_SOURCE",
"-D_HAS_EXCEPTIONS=0", "-D_HAS_EXCEPTIONS=0",
"-DNOMINMAX",
"-DHAVE_CONFIG_H", "-DHAVE_CONFIG_H",
], ] + select({
":windows": [
"-DNOMINMAX",
"-D_CRT_SECURE_NO_DEPRECATE",
"-D_CRT_NONSTDC_NO_DEPRECATE",
"-D_WIN32_WINNT=0x0600",
],
"//conditions:default": [],
}),
defines = ["CARES_STATICLIB"],
includes = ["."], includes = ["."],
linkopts = select({
":windows": ["-defaultlib:ws2_32.lib"],
"//conditions:default": [],
}),
linkstatic = 1, linkstatic = 1,
visibility = [ visibility = [
"//visibility:public", "//visibility:public",

@ -0,0 +1,423 @@
/* Generated from ares_config.h.cmake*/
/* Define if building universal (internal helper macro) */
#undef AC_APPLE_UNIVERSAL_BUILD
/* define this if ares is built for a big endian system */
#undef ARES_BIG_ENDIAN
/* when building as static part of libcurl */
#undef BUILDING_LIBCURL
/* Defined for build that exposes internal static functions for testing. */
#undef CARES_EXPOSE_STATICS
/* Defined for build with symbol hiding. */
#undef CARES_SYMBOL_HIDING
/* Definition to make a library symbol externally visible. */
#undef CARES_SYMBOL_SCOPE_EXTERN
/* Use resolver library to configure cares */
/* #undef CARES_USE_LIBRESOLV */
/* if a /etc/inet dir is being used */
#undef ETC_INET
/* Define to the type of arg 2 for gethostname. */
#define GETHOSTNAME_TYPE_ARG2 int
/* Define to the type qualifier of arg 1 for getnameinfo. */
#define GETNAMEINFO_QUAL_ARG1
/* Define to the type of arg 1 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG1 struct sockaddr *
/* Define to the type of arg 2 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG2 socklen_t
/* Define to the type of args 4 and 6 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG46 socklen_t
/* Define to the type of arg 7 for getnameinfo. */
#define GETNAMEINFO_TYPE_ARG7 int
/* Specifies the number of arguments to getservbyport_r */
#define GETSERVBYPORT_R_ARGS
/* Define to 1 if you have AF_INET6. */
#define HAVE_AF_INET6
/* Define to 1 if you have the <arpa/inet.h> header file. */
/* #undef HAVE_ARPA_INET_H */
/* Define to 1 if you have the <arpa/nameser_compat.h> header file. */
/* #undef HAVE_ARPA_NAMESER_COMPAT_H */
/* Define to 1 if you have the <arpa/nameser.h> header file. */
/* #undef HAVE_ARPA_NAMESER_H */
/* Define to 1 if you have the <assert.h> header file. */
#define HAVE_ASSERT_H
/* Define to 1 if you have the `bitncmp' function. */
/* #undef HAVE_BITNCMP */
/* Define to 1 if bool is an available type. */
#define HAVE_BOOL_T
/* Define to 1 if you have the clock_gettime function and monotonic timer. */
/* #undef HAVE_CLOCK_GETTIME_MONOTONIC */
/* Define to 1 if you have the closesocket function. */
#define HAVE_CLOSESOCKET
/* Define to 1 if you have the CloseSocket camel case function. */
/* #undef HAVE_CLOSESOCKET_CAMEL */
/* Define to 1 if you have the connect function. */
#define HAVE_CONNECT
/* define if the compiler supports basic C++11 syntax */
/* #undef HAVE_CXX11 */
/* Define to 1 if you have the <dlfcn.h> header file. */
/* #undef HAVE_DLFCN_H */
/* Define to 1 if you have the <errno.h> header file. */
#define HAVE_ERRNO_H
/* Define to 1 if you have the fcntl function. */
/* #undef HAVE_FCNTL */
/* Define to 1 if you have the <fcntl.h> header file. */
#define HAVE_FCNTL_H
/* Define to 1 if you have a working fcntl O_NONBLOCK function. */
/* #undef HAVE_FCNTL_O_NONBLOCK */
/* Define to 1 if you have the freeaddrinfo function. */
#define HAVE_FREEADDRINFO
/* Define to 1 if you have a working getaddrinfo function. */
#define HAVE_GETADDRINFO
/* Define to 1 if the getaddrinfo function is threadsafe. */
#define HAVE_GETADDRINFO_THREADSAFE
/* Define to 1 if you have the getenv function. */
#define HAVE_GETENV
/* Define to 1 if you have the gethostbyaddr function. */
#define HAVE_GETHOSTBYADDR
/* Define to 1 if you have the gethostbyname function. */
#define HAVE_GETHOSTBYNAME
/* Define to 1 if you have the gethostname function. */
#define HAVE_GETHOSTNAME
/* Define to 1 if you have the getnameinfo function. */
#define HAVE_GETNAMEINFO
/* Define to 1 if you have the getservbyport_r function. */
/* #undef HAVE_GETSERVBYPORT_R */
/* Define to 1 if you have the `gettimeofday' function. */
/* #undef HAVE_GETTIMEOFDAY */
/* Define to 1 if you have the `if_indextoname' function. */
/* #undef HAVE_IF_INDEXTONAME */
/* Define to 1 if you have a IPv6 capable working inet_net_pton function. */
/* #undef HAVE_INET_NET_PTON */
/* Define to 1 if you have a IPv6 capable working inet_ntop function. */
/* #undef HAVE_INET_NTOP */
/* Define to 1 if you have a IPv6 capable working inet_pton function. */
/* #undef HAVE_INET_PTON */
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H
/* Define to 1 if you have the ioctl function. */
/* #undef HAVE_IOCTL */
/* Define to 1 if you have the ioctlsocket function. */
#define HAVE_IOCTLSOCKET
/* Define to 1 if you have the IoctlSocket camel case function. */
/* #undef HAVE_IOCTLSOCKET_CAMEL */
/* Define to 1 if you have a working IoctlSocket camel case FIONBIO function.
*/
/* #undef HAVE_IOCTLSOCKET_CAMEL_FIONBIO */
/* Define to 1 if you have a working ioctlsocket FIONBIO function. */
#define HAVE_IOCTLSOCKET_FIONBIO
/* Define to 1 if you have a working ioctl FIONBIO function. */
/* #undef HAVE_IOCTL_FIONBIO */
/* Define to 1 if you have a working ioctl SIOCGIFADDR function. */
/* #undef HAVE_IOCTL_SIOCGIFADDR */
/* Define to 1 if you have the `resolve' library (-lresolve). */
/* #undef HAVE_LIBRESOLV */
/* Define to 1 if you have the <limits.h> header file. */
#define HAVE_LIMITS_H
/* if your compiler supports LL */
#define HAVE_LL
/* Define to 1 if the compiler supports the 'long long' data type. */
#define HAVE_LONGLONG
/* Define to 1 if you have the malloc.h header file. */
#define HAVE_MALLOC_H
/* Define to 1 if you have the memory.h header file. */
#define HAVE_MEMORY_H
/* Define to 1 if you have the MSG_NOSIGNAL flag. */
/* #undef HAVE_MSG_NOSIGNAL */
/* Define to 1 if you have the <netdb.h> header file. */
/* #undef HAVE_NETDB_H */
/* Define to 1 if you have the <netinet/in.h> header file. */
/* #undef HAVE_NETINET_IN_H */
/* Define to 1 if you have the <netinet/tcp.h> header file. */
/* #undef HAVE_NETINET_TCP_H */
/* Define to 1 if you have the <net/if.h> header file. */
/* #undef HAVE_NET_IF_H */
/* Define to 1 if you have PF_INET6. */
#define HAVE_PF_INET6
/* Define to 1 if you have the recv function. */
#define HAVE_RECV
/* Define to 1 if you have the recvfrom function. */
#define HAVE_RECVFROM
/* Define to 1 if you have the send function. */
#define HAVE_SEND
/* Define to 1 if you have the setsockopt function. */
#define HAVE_SETSOCKOPT
/* Define to 1 if you have a working setsockopt SO_NONBLOCK function. */
/* #undef HAVE_SETSOCKOPT_SO_NONBLOCK */
/* Define to 1 if you have the <signal.h> header file. */
#define HAVE_SIGNAL_H
/* Define to 1 if sig_atomic_t is an available typedef. */
#define HAVE_SIG_ATOMIC_T
/* Define to 1 if sig_atomic_t is already defined as volatile. */
/* #undef HAVE_SIG_ATOMIC_T_VOLATILE */
/* Define to 1 if your struct sockaddr_in6 has sin6_scope_id. */
#define HAVE_SOCKADDR_IN6_SIN6_SCOPE_ID
/* Define to 1 if you have the socket function. */
#define HAVE_SOCKET
/* Define to 1 if you have the <socket.h> header file. */
/* #undef HAVE_SOCKET_H */
/* Define to 1 if you have the <stdbool.h> header file. */
#define HAVE_STDBOOL_H
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H
/* Define to 1 if you have the strcasecmp function. */
/* #undef HAVE_STRCASECMP */
/* Define to 1 if you have the strcmpi function. */
#define HAVE_STRCMPI
/* Define to 1 if you have the strdup function. */
#define HAVE_STRDUP
/* Define to 1 if you have the stricmp function. */
#define HAVE_STRICMP
/* Define to 1 if you have the <strings.h> header file. */
/* #undef HAVE_STRINGS_H */
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H
/* Define to 1 if you have the strncasecmp function. */
/* #undef HAVE_STRNCASECMP */
/* Define to 1 if you have the strncmpi function. */
/* #undef HAVE_STRNCMPI */
/* Define to 1 if you have the strnicmp function. */
#define HAVE_STRNICMP
/* Define to 1 if you have the <stropts.h> header file. */
/* #undef HAVE_STROPTS_H */
/* Define to 1 if you have struct addrinfo. */
#define HAVE_STRUCT_ADDRINFO
/* Define to 1 if you have struct in6_addr. */
#define HAVE_STRUCT_IN6_ADDR
/* Define to 1 if you have struct sockaddr_in6. */
#define HAVE_STRUCT_SOCKADDR_IN6
/* if struct sockaddr_storage is defined */
#define HAVE_STRUCT_SOCKADDR_STORAGE
/* Define to 1 if you have the timeval struct. */
#define HAVE_STRUCT_TIMEVAL
/* Define to 1 if you have the <sys/ioctl.h> header file. */
/* #undef HAVE_SYS_IOCTL_H */
/* Define to 1 if you have the <sys/param.h> header file. */
/* #undef HAVE_SYS_PARAM_H */
/* Define to 1 if you have the <sys/select.h> header file. */
/* #undef HAVE_SYS_SELECT_H */
/* Define to 1 if you have the <sys/socket.h> header file. */
/* #undef HAVE_SYS_SOCKET_H */
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H
/* Define to 1 if you have the <sys/time.h> header file. */
/* #undef HAVE_SYS_TIME_H */
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H
/* Define to 1 if you have the <sys/uio.h> header file. */
/* #undef HAVE_SYS_UIO_H */
/* Define to 1 if you have the <time.h> header file. */
#define HAVE_TIME_H
/* Define to 1 if you have the <unistd.h> header file. */
/* #undef HAVE_UNISTD_H */
/* Define to 1 if you have the windows.h header file. */
#define HAVE_WINDOWS_H
/* Define to 1 if you have the winsock2.h header file. */
#define HAVE_WINSOCK2_H
/* Define to 1 if you have the winsock.h header file. */
#define HAVE_WINSOCK_H
/* Define to 1 if you have the writev function. */
/* #undef HAVE_WRITEV */
/* Define to 1 if you have the ws2tcpip.h header file. */
#define HAVE_WS2TCPIP_H
/* Define to 1 if you need the malloc.h header file even with stdlib.h */
/* #undef NEED_MALLOC_H */
/* Define to 1 if you need the memory.h header file even with stdlib.h */
/* #undef NEED_MEMORY_H */
/* a suitable file/device to read random data from */
/* #undef RANDOM_FILE */
/* Define to the type qualifier pointed by arg 5 for recvfrom. */
#define RECVFROM_QUAL_ARG5
/* Define to the type of arg 1 for recvfrom. */
#define RECVFROM_TYPE_ARG1 SOCKET
/* Define to the type pointed by arg 2 for recvfrom. */
#define RECVFROM_TYPE_ARG2 void *
/* Define to 1 if the type pointed by arg 2 for recvfrom is void. */
#define RECVFROM_TYPE_ARG2_IS_VOID 0
/* Define to the type of arg 3 for recvfrom. */
#define RECVFROM_TYPE_ARG3 int
/* Define to the type of arg 4 for recvfrom. */
#define RECVFROM_TYPE_ARG4 int
/* Define to the type pointed by arg 5 for recvfrom. */
#define RECVFROM_TYPE_ARG5 struct sockaddr *
/* Define to 1 if the type pointed by arg 5 for recvfrom is void. */
#define RECVFROM_TYPE_ARG5_IS_VOID 0
/* Define to the type pointed by arg 6 for recvfrom. */
#define RECVFROM_TYPE_ARG6 socklen_t *
/* Define to 1 if the type pointed by arg 6 for recvfrom is void. */
#define RECVFROM_TYPE_ARG6_IS_VOID 0
/* Define to the function return type for recvfrom. */
#define RECVFROM_TYPE_RETV int
/* Define to the type of arg 1 for recv. */
#define RECV_TYPE_ARG1 SOCKET
/* Define to the type of arg 2 for recv. */
#define RECV_TYPE_ARG2 void *
/* Define to the type of arg 3 for recv. */
#define RECV_TYPE_ARG3 int
/* Define to the type of arg 4 for recv. */
#define RECV_TYPE_ARG4 int
/* Define to the function return type for recv. */
#define RECV_TYPE_RETV int
/* Define as the return type of signal handlers (`int' or `void'). */
#define RETSIGTYPE
/* Define to the type qualifier of arg 2 for send. */
#define SEND_QUAL_ARG2
/* Define to the type of arg 1 for send. */
#define SEND_TYPE_ARG1 SOCKET
/* Define to the type of arg 2 for send. */
#define SEND_TYPE_ARG2 void *
/* Define to the type of arg 3 for send. */
#define SEND_TYPE_ARG3 int
/* Define to the type of arg 4 for send. */
#define SEND_TYPE_ARG4 int
/* Define to the function return type for send. */
#define SEND_TYPE_RETV int
/* Define to 1 if you can safely include both <sys/time.h> and <time.h>. */
/* #undef TIME_WITH_SYS_TIME */
/* Define to disable non-blocking sockets. */
#undef USE_BLOCKING_SOCKETS
/* Define to avoid automatic inclusion of winsock.h */
#undef WIN32_LEAN_AND_MEAN
/* Type to use in place of in_addr_t when system does not provide it. */
#undef in_addr_t

@ -82,7 +82,7 @@
"posix", "posix",
"windows" "windows"
], ],
"cpu_cost": 1.0, "cpu_cost": 10,
"exclude_configs": [], "exclude_configs": [],
"exclude_iomgrs": [], "exclude_iomgrs": [],
"flaky": false, "flaky": false,

@ -86,6 +86,26 @@ def _get_build_metadata(test_results):
test_results['job_name'] = job_name test_results['job_name'] = job_name
def _insert_rows_with_retries(bq, bq_table, bq_rows):
"""Insert rows to bq table. Retry on error."""
# BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
for i in range((len(bq_rows) / 1000) + 1):
max_retries = 3
for attempt in range(max_retries):
if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
bq_table,
bq_rows[i * 1000:(i + 1) * 1000]):
break
else:
if attempt < max_retries - 1:
print('Error uploading result to bigquery, will retry.')
else:
print(
'Error uploading result to bigquery, all attempts failed.'
)
sys.exit(1)
def upload_results_to_bq(resultset, bq_table, args, platform): def upload_results_to_bq(resultset, bq_table, args, platform):
"""Upload test results to a BQ table. """Upload test results to a BQ table.
@ -106,6 +126,7 @@ def upload_results_to_bq(resultset, bq_table, args, platform):
partition_type=_PARTITION_TYPE, partition_type=_PARTITION_TYPE,
expiration_ms=_EXPIRATION_MS) expiration_ms=_EXPIRATION_MS)
bq_rows = []
for shortname, results in six.iteritems(resultset): for shortname, results in six.iteritems(resultset):
for result in results: for result in results:
test_results = {} test_results = {}
@ -124,23 +145,9 @@ def upload_results_to_bq(resultset, bq_table, args, platform):
test_results['return_code'] = result.returncode test_results['return_code'] = result.returncode
test_results['test_name'] = shortname test_results['test_name'] = shortname
test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
row = big_query_utils.make_row(str(uuid.uuid4()), test_results) row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
bq_rows.append(row)
# TODO(jtattermusch): rows are inserted one by one, very inefficient _insert_rows_with_retries(bq, bq_table, bq_rows)
max_retries = 3
for attempt in range(max_retries):
if big_query_utils.insert_rows(bq, _PROJECT_ID, _DATASET_ID,
bq_table, [row]):
break
else:
if attempt < max_retries - 1:
print('Error uploading result to bigquery, will retry.')
else:
print(
'Error uploading result to bigquery, all attempts failed.'
)
sys.exit(1)
def upload_interop_results_to_bq(resultset, bq_table, args): def upload_interop_results_to_bq(resultset, bq_table, args):
@ -162,8 +169,8 @@ def upload_interop_results_to_bq(resultset, bq_table, args):
partition_type=_PARTITION_TYPE, partition_type=_PARTITION_TYPE,
expiration_ms=_EXPIRATION_MS) expiration_ms=_EXPIRATION_MS)
bq_rows = []
for shortname, results in six.iteritems(resultset): for shortname, results in six.iteritems(resultset):
bq_rows = []
for result in results: for result in results:
test_results = {} test_results = {}
_get_build_metadata(test_results) _get_build_metadata(test_results)
@ -177,20 +184,4 @@ def upload_interop_results_to_bq(resultset, bq_table, args):
test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S') test_results['timestamp'] = time.strftime('%Y-%m-%d %H:%M:%S')
row = big_query_utils.make_row(str(uuid.uuid4()), test_results) row = big_query_utils.make_row(str(uuid.uuid4()), test_results)
bq_rows.append(row) bq_rows.append(row)
_insert_rows_with_retries(bq, bq_table, bq_rows)
# BigQuery sometimes fails with large uploads, so batch 1,000 rows at a time.
for i in range((len(bq_rows) / 1000) + 1):
max_retries = 3
for attempt in range(max_retries):
if big_query_utils.insert_rows(
bq, _PROJECT_ID, _DATASET_ID, bq_table,
bq_rows[i * 1000:(i + 1) * 1000]):
break
else:
if attempt < max_retries - 1:
print('Error uploading result to bigquery, will retry.')
else:
print(
'Error uploading result to bigquery, all attempts failed.'
)
sys.exit(1)

Loading…
Cancel
Save