From 0d6304a38cc298c7f0f1138b617e38c550435322 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 17 Aug 2017 15:46:19 -0700 Subject: [PATCH 01/27] Fix uds handshake --- src/core/ext/transport/chttp2/client/chttp2_connector.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 202bcd47f5c..731595d348e 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -136,6 +136,8 @@ static void start_handshake_locked(grpc_exec_ctx *exec_ctx, c->handshake_mgr = grpc_handshake_manager_create(); grpc_handshakers_add(exec_ctx, HANDSHAKER_CLIENT, c->args.channel_args, c->handshake_mgr); + grpc_endpoint_add_to_pollset_set(exec_ctx, c->endpoint, + c->args.interested_parties); grpc_handshake_manager_do_handshake( exec_ctx, c->handshake_mgr, c->endpoint, c->args.channel_args, c->args.deadline, NULL /* acceptor */, on_handshake_done, c); From ab74a8c86e51262f96a39c5698b520ffc92bc6cc Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 2 Oct 2017 14:56:11 -0700 Subject: [PATCH 02/27] Add grpc_endpoint_delete_from_pollset_set --- src/core/lib/iomgr/endpoint.cc | 6 ++++++ src/core/lib/iomgr/endpoint.h | 11 ++++++++-- src/core/lib/iomgr/tcp_posix.cc | 21 +++++++++++++++---- .../lib/security/transport/secure_endpoint.cc | 8 +++++++ test/core/util/mock_endpoint.c | 15 +++++++++++-- test/core/util/passthru_endpoint.c | 15 +++++++++++-- test/core/util/trickle_endpoint.c | 21 +++++++++++++++---- 7 files changed, 83 insertions(+), 14 deletions(-) diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 37cce335cab..5eab1d31588 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -39,6 +39,12 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set); } +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + ep->vtable->delete_from_pollset_set(exec_ctx, ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_error* why) { ep->vtable->shutdown(exec_ctx, ep, why); diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 16ff0ab733e..c91198a393e 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -45,6 +45,8 @@ struct grpc_endpoint_vtable { grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset); + void (*delete_from_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); @@ -85,14 +87,19 @@ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); -/* Add an endpoint to a pollset, so that when the pollset is polled, events from - this endpoint are considered */ +/* Add an endpoint to a pollset or pollset_set, so that when the pollset is + polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); +/* Delete an endpoint from a pollset_set */ +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set); + grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint); struct grpc_endpoint { diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 7e271294fd6..00077d95eaa 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -706,6 +706,13 @@ static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd); } +static void tcp_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd); +} + static char *tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); @@ -721,10 +728,16 @@ static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_delete_from_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_resource_user, + tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index ae5633b82c4..859d04ae5a6 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -379,6 +379,13 @@ static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } +static void endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *secure_ep, + grpc_pollset_set *pollset_set) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); +} + static char *endpoint_get_peer(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; return grpc_endpoint_get_peer(ep->wrapped_ep); @@ -399,6 +406,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, endpoint_add_to_pollset_set, + endpoint_delete_from_pollset_set, endpoint_shutdown, endpoint_destroy, endpoint_get_resource_user, diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index bd386b21488..7cae5c045ea 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -69,6 +69,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} +static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; @@ -103,8 +107,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) { static int me_get_fd(grpc_endpoint *ep) { return -1; } static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_resource_user, me_get_peer, + me_read, + me_write, + me_add_to_pollset, + me_add_to_pollset_set, + me_delete_from_pollset_set, + me_shutdown, + me_destroy, + me_get_resource_user, + me_get_peer, me_get_fd, }; diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 38a47584d57..1bf28885039 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -107,6 +107,10 @@ static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, static void me_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} +static void me_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { half *m = (half *)ep; @@ -160,8 +164,15 @@ static grpc_resource_user *me_get_resource_user(grpc_endpoint *ep) { } static const grpc_endpoint_vtable vtable = { - me_read, me_write, me_add_to_pollset, me_add_to_pollset_set, - me_shutdown, me_destroy, me_get_resource_user, me_get_peer, + me_read, + me_write, + me_add_to_pollset, + me_add_to_pollset_set, + me_delete_from_pollset_set, + me_shutdown, + me_destroy, + me_get_resource_user, + me_get_peer, me_get_fd, }; diff --git a/test/core/util/trickle_endpoint.c b/test/core/util/trickle_endpoint.c index fc066f9d808..d761f722977 100644 --- a/test/core/util/trickle_endpoint.c +++ b/test/core/util/trickle_endpoint.c @@ -89,6 +89,13 @@ static void te_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_endpoint_add_to_pollset_set(exec_ctx, te->wrapped, pollset_set); } +static void te_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { + trickle_endpoint *te = (trickle_endpoint *)ep; + grpc_endpoint_delete_from_pollset_set(exec_ctx, te->wrapped, pollset_set); +} + static void te_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { trickle_endpoint *te = (trickle_endpoint *)ep; @@ -135,10 +142,16 @@ static void te_finish_write(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&te->mu); } -static const grpc_endpoint_vtable vtable = { - te_read, te_write, te_add_to_pollset, te_add_to_pollset_set, - te_shutdown, te_destroy, te_get_resource_user, te_get_peer, - te_get_fd}; +static const grpc_endpoint_vtable vtable = {te_read, + te_write, + te_add_to_pollset, + te_add_to_pollset_set, + te_delete_from_pollset_set, + te_shutdown, + te_destroy, + te_get_resource_user, + te_get_peer, + te_get_fd}; grpc_endpoint *grpc_trickle_endpoint_create(grpc_endpoint *wrap, double bytes_per_second) { From 59e16b1a7d59866cfdf663701721c9016e92f0ad Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 2 Oct 2017 14:59:52 -0700 Subject: [PATCH 03/27] Remove endpoint from connector's interested_parties after handshake is done --- src/core/ext/transport/chttp2/client/chttp2_connector.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index 731595d348e..74839f21560 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -115,6 +115,8 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } memset(c->result, 0, sizeof(*c->result)); } else { + grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint, + c->args.interested_parties); c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1); GPR_ASSERT(c->result->transport); From 01432b7cf3364077d711663fac8c1ef6d4b1d3f0 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 2 Oct 2017 16:14:06 -0700 Subject: [PATCH 04/27] Fix windows and uv build --- src/core/lib/iomgr/tcp_uv.cc | 23 +++++++++++++++++++---- src/core/lib/iomgr/tcp_windows.cc | 18 ++++++++++++++---- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index e311964dbcf..99b9f1ea2d3 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -304,6 +304,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, (void)pollset; } +static void uv_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; +} + static void shutdown_callback(uv_shutdown_t *req, int status) {} static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -340,10 +349,16 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { static int uv_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, - uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, - uv_get_resource_user, uv_get_peer, uv_get_fd}; +static grpc_endpoint_vtable vtable = {uv_endpoint_read, + uv_endpoint_write, + uv_add_to_pollset, + uv_add_to_pollset_set, + uv_delete_from_pollset_set, + uv_endpoint_shutdown, + uv_destroy, + uv_get_resource_user, + uv_get_peer, + uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index dc84e564a9b..5aba5078a88 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -371,6 +371,10 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_iocp_add_socket(tcp->socket); } +static void win_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pss) {} + /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks for the potential read and write operations. It is up to the caller to guarantee this isn't called in parallel to a read or write request, so @@ -412,10 +416,16 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { static int win_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_resource_user, win_get_peer, - win_get_fd}; +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_add_to_pollset, + win_add_to_pollset_set, + win_delete_from_pollset_set, + win_shutdown, + win_destroy, + win_get_resource_user, + win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, From 8ba4bf473a57123279c10ee062a6a4f21a9abd2b Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 5 Oct 2017 13:16:33 -0700 Subject: [PATCH 05/27] Return LB picks with an error upon shutdown. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 10 +++--- .../lb_policy/pick_first/pick_first.cc | 31 +++++++++---------- .../lb_policy/round_robin/round_robin.cc | 31 +++++++++---------- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index d8e314d1f9a..0d2241d9914 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1033,15 +1033,17 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); pp = next; } while (pping != NULL) { pending_ping *next = pping->next; - GRPC_CLOSURE_SCHED(exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); pping = next; } } diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index b07fc3b720b..56c261ba341 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -102,12 +102,23 @@ static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { + pending_pick *pp; + while ((pp = p->pending_picks) != NULL) { + p->pending_picks = pp->next; + *pp->target = NULL; + GRPC_CLOSURE_SCHED( + exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pp); + } +} + static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - pending_pick *pp; p->shutdown = true; - pp = p->pending_picks; - p->pending_picks = NULL; + fail_pending_picks_for_shutdown(exec_ctx, p); grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"), "shutdown"); @@ -120,13 +131,6 @@ static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); } - while (pp != NULL) { - pending_pick *next = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - pp = next; - } } static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -637,12 +641,7 @@ static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick first exhausted channels", &error, 1), "no_more_channels"); - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } + fail_pending_picks_for_shutdown(exec_ctx, p); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); } else { diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc index 6812bb50cd9..de163f63002 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc @@ -315,15 +315,10 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { - round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", - (void *)pol, (void *)pol); - } - p->shutdown = true; +static void fail_pending_picks_for_shutdown(grpc_exec_ctx *exec_ctx, + round_robin_lb_policy *p) { pending_pick *pp; - while ((pp = p->pending_picks)) { + while ((pp = p->pending_picks) != NULL) { p->pending_picks = pp->next; *pp->target = NULL; GRPC_CLOSURE_SCHED( @@ -331,6 +326,16 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); gpr_free(pp); } +} + +static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + round_robin_lb_policy *p = (round_robin_lb_policy *)pol; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", + (void *)pol, (void *)pol); + } + p->shutdown = true; + fail_pending_picks_for_shutdown(exec_ctx, p); grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); @@ -621,14 +626,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, sd->user_data = NULL; } if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - // the policy is shutting down. Flush all the pending picks... - pending_pick *pp; - while ((pp = p->pending_picks)) { - p->pending_picks = pp->next; - *pp->target = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); - gpr_free(pp); - } + // The policy is shutting down. Fail all of the pending picks. + fail_pending_picks_for_shutdown(exec_ctx, p); } rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown+started_picking"); From 7a2db96ed70a56ab21d0bb0302d5da0a91fdda3f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 6 Oct 2017 15:06:12 -0700 Subject: [PATCH 06/27] Fix memory leak on grpclb shutdown. --- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 0d2241d9914..d417bd143d1 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1036,6 +1036,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_CLOSURE_SCHED( exec_ctx, &pp->wrapped_on_complete_arg.wrapper_closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pp); pp = next; } @@ -1044,6 +1045,7 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_CLOSURE_SCHED( exec_ctx, &pping->wrapped_notify_arg.wrapper_closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown")); + gpr_free(pping); pping = next; } } From bf6185c3b1f019886650578b3837ab13cf1e3cdf Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 9 Oct 2017 17:04:46 +1100 Subject: [PATCH 07/27] make ruby proxy worker accept script location This is done so that we can test other PHP clients. --- src/ruby/qps/proxy-worker.rb | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/ruby/qps/proxy-worker.rb b/src/ruby/qps/proxy-worker.rb index ae7006e7d60..478370c5bf2 100755 --- a/src/ruby/qps/proxy-worker.rb +++ b/src/ruby/qps/proxy-worker.rb @@ -31,9 +31,10 @@ require 'src/proto/grpc/testing/services_services_pb' require 'src/proto/grpc/testing/proxy-service_services_pb' class ProxyBenchmarkClientServiceImpl < Grpc::Testing::ProxyClientService::Service - def initialize(port, c_ext) + def initialize(port, c_ext, php_client_bin) @mytarget = "localhost:" + port.to_s @use_c_ext = c_ext + @php_client_bin = php_client_bin end def setup(config) @config = config @@ -44,11 +45,11 @@ class ProxyBenchmarkClientServiceImpl < Grpc::Testing::ProxyClientService::Servi # TODO(vjpai): Support multiple client channels by spawning off a PHP client per channel if @use_c_ext puts "Use protobuf c extension" - command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/vendor/google/protobuf/php/ext/google/protobuf/modules/protobuf.so " + "-d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget + command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/vendor/google/protobuf/php/ext/google/protobuf/modules/protobuf.so " + "-d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/" + @php_client_bin + " " + @mytarget else puts "Use protobuf php extension" - command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/../../php/tests/qps/client.php " + @mytarget - end + command = "php -d extension=" + File.expand_path(File.dirname(__FILE__)) + "/../../php/ext/grpc/modules/grpc.so " + File.expand_path(File.dirname(__FILE__)) + "/" + @php_client_bin + " " + @mytarget + end puts "Starting command: " + command @php_pid = spawn(command) end @@ -128,7 +129,8 @@ end def proxymain options = { - 'driver_port' => 0 + 'driver_port' => 0, + 'php_client_bin' => '../../php/tests/qps/client.php' } OptionParser.new do |opts| opts.banner = 'Usage: [--driver_port ]' @@ -138,6 +140,10 @@ def proxymain opts.on("-c", "--[no-]c_proto_ext", "Use protobuf C-extention") do |c| options[:c_ext] = c end + opts.on("-b" "--php_client_bin [FILE]", + "PHP client to execute; path relative to this script") do |c| + options['php_client_bin'] = c + end end.parse! # Configure any errors with client or server child threads to surface @@ -146,7 +152,7 @@ def proxymain s = GRPC::RpcServer.new port = s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) - bmc = ProxyBenchmarkClientServiceImpl.new(port, options[:c_ext]) + bmc = ProxyBenchmarkClientServiceImpl.new(port, options[:c_ext], options['php_client_bin']) s.handle(bmc) s.handle(ProxyWorkerServiceImpl.new(s, bmc)) s.run From 410ba6dada8ba698415e73d1cb30d9dbdddec8d9 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 13 Oct 2017 11:08:43 +1100 Subject: [PATCH 08/27] format --- src/ruby/qps/proxy-worker.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ruby/qps/proxy-worker.rb b/src/ruby/qps/proxy-worker.rb index fdea42d1db5..4c7c510fdbd 100755 --- a/src/ruby/qps/proxy-worker.rb +++ b/src/ruby/qps/proxy-worker.rb @@ -157,8 +157,8 @@ def proxymain opts.on("-c", "--[no-]use_protobuf_c_extension", "Use protobuf C-extention") do |c| options[:c_ext] = c end - opts.on("-b" "--php_client_bin [FILE]", - "PHP client to execute; path relative to this script") do |c| + opts.on("-b", "--php_client_bin [FILE]", + "PHP client to execute; path relative to this script") do |c| options['php_client_bin'] = c end end.parse! From 8db6f9b16ba954e15b232ead2670dd92fe0b3c01 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 20 Oct 2017 13:21:52 -0700 Subject: [PATCH 09/27] Removed wrong assert from grpclb --- src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index ffd58129c6e..71d5bfec98a 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1805,7 +1805,6 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, case GRPC_CHANNEL_IDLE: // lb channel inactive (probably shutdown prior to update). Restart lb // call to kick the lb channel into gear. - GPR_ASSERT(glb_policy->lb_call == NULL); /* fallthrough */ case GRPC_CHANNEL_READY: if (glb_policy->lb_call != NULL) { From 2b217d41c4c8cb4269b569591141b0d88ff9a9a2 Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Fri, 20 Oct 2017 15:56:30 -0700 Subject: [PATCH 10/27] clang-format --- .../ext/filters/client_channel/lb_policy/grpclb/grpclb.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 71d5bfec98a..e159188ad74 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -1803,8 +1803,8 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, break; } case GRPC_CHANNEL_IDLE: - // lb channel inactive (probably shutdown prior to update). Restart lb - // call to kick the lb channel into gear. + // lb channel inactive (probably shutdown prior to update). Restart lb + // call to kick the lb channel into gear. /* fallthrough */ case GRPC_CHANNEL_READY: if (glb_policy->lb_call != NULL) { From 0c4e14f0d440c4ec29e5f33014c45df59e369337 Mon Sep 17 00:00:00 2001 From: Adele Zhou Date: Fri, 20 Oct 2017 16:39:18 -0700 Subject: [PATCH 11/27] Do not reuse job_specs now that we incorporate release info in the shortname --- tools/interop_matrix/run_interop_matrix_tests.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py index d037e139218..bb7a8647c7c 100755 --- a/tools/interop_matrix/run_interop_matrix_tests.py +++ b/tools/interop_matrix/run_interop_matrix_tests.py @@ -122,15 +122,13 @@ def find_all_images_for_lang(lang): return images # caches test cases (list of JobSpec) loaded from file. Keyed by lang and runtime. -_loaded_testcases = {} def find_test_cases(lang, release, suite_name): """Returns the list of test cases from testcase files per lang/release.""" file_tmpl = os.path.join(os.path.dirname(__file__), 'testcases/%s__%s') + testcase_release = release if not os.path.exists(file_tmpl % (lang, release)): - release = 'master' - testcases = file_tmpl % (lang, release) - if lang in _loaded_testcases.keys() and release in _loaded_testcases[lang].keys(): - return _loaded_testcases[lang][release] + testcase_release = 'master' + testcases = file_tmpl % (lang, testcase_release) job_spec_list=[] try: @@ -155,9 +153,6 @@ def find_test_cases(lang, release, suite_name): do_newline=True) except IOError as err: jobset.message('FAILED', err, do_newline=True) - if lang not in _loaded_testcases.keys(): - _loaded_testcases[lang] = {} - _loaded_testcases[lang][release]=job_spec_list return job_spec_list _xml_report_tree = report_utils.new_junit_xml_tree() From ae6bca4420cac3dddbe7794a8725f7b11c0d7bc8 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Thu, 19 Oct 2017 18:14:02 -0700 Subject: [PATCH 12/27] Make fix objc build problem --- gRPC-Core.podspec | 1 + src/core/ext/transport/chttp2/transport/flow_control.cc | 2 +- templates/gRPC-Core.podspec.template | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index a9b451e3dea..df9ec5a7c57 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -89,6 +89,7 @@ Pod::Spec.new do |s| s.default_subspecs = 'Interface', 'Implementation' s.compiler_flags = '-DGRPC_ARES=0' + s.libraries = 'c++' # Like many other C libraries, gRPC-Core has its public headers under `include//` and its # sources and private headers in other directories outside `include/`. Cocoapods' linter doesn't diff --git a/src/core/ext/transport/chttp2/transport/flow_control.cc b/src/core/ext/transport/chttp2/transport/flow_control.cc index dd80036530c..40545bc74b4 100644 --- a/src/core/ext/transport/chttp2/transport/flow_control.cc +++ b/src/core/ext/transport/chttp2/transport/flow_control.cc @@ -165,7 +165,7 @@ TransportFlowControl::TransportFlowControl(grpc_exec_ctx* exec_ctx, uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) { FlowControlTrace trace("t updt sent", this, nullptr); - const uint32_t target_announced_window = target_window(); + const uint32_t target_announced_window = (const uint32_t)target_window(); if ((writing_anyway || announced_window_ <= target_announced_window / 2) && announced_window_ != target_announced_window) { const uint32_t announce = (uint32_t)GPR_CLAMP( diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template index 5657df85211..2836ab1f75b 100644 --- a/templates/gRPC-Core.podspec.template +++ b/templates/gRPC-Core.podspec.template @@ -116,6 +116,7 @@ s.default_subspecs = 'Interface', 'Implementation' s.compiler_flags = '-DGRPC_ARES=0' + s.libraries = 'c++' # Like many other C libraries, gRPC-Core has its public headers under `include//` and its # sources and private headers in other directories outside `include/`. Cocoapods' linter doesn't From 48be9dedc97448797b51bd3c1d3e8f34e6012389 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 23 Oct 2017 12:27:37 -0700 Subject: [PATCH 13/27] Restore logic from before #12297. --- .../filters/client_channel/client_channel.cc | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index d9695bbf8a0..ea5e076c3be 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -1205,6 +1205,9 @@ static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, "Pick cancelled", &error, 1)); } +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem); + static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1228,7 +1231,7 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, chand, calld); } async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); - } else { + } else if (chand->lb_policy != NULL) { if (GRPC_TRACER_ON(grpc_client_channel_trace)) { gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", chand, calld); @@ -1242,6 +1245,30 @@ static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, async_pick_done_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } + // TODO(roth): It should be impossible for chand->lb_policy to be NULL + // here, so the rest of this code should never actually be executed. + // However, we have reports of a crash on iOS that triggers this case, + // so we are temporarily adding this to restore branches that were + // removed in https://github.com/grpc/grpc/pull/12297. Need to figure + // out what is actually causing this to occur and then figure out the + // right way to deal with it. + else if (chand->resolver != NULL) { + // No LB policy, so try again. + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: resolver returned but no LB policy, " + "trying again", + chand, calld); + } + pick_after_resolver_result_start_locked(exec_ctx, elem); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver disconnected", chand, + calld); + } + async_pick_done_locked( + exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + } } static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, From cbdb5174caebe057913d27e0898f8d9e72da0742 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Mon, 23 Oct 2017 14:11:42 -0700 Subject: [PATCH 14/27] Remove duplicate message member in test --- test/cpp/end2end/grpclb_end2end_test.cc | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc index f73a9c17917..c370302c499 100644 --- a/test/cpp/end2end/grpclb_end2end_test.cc +++ b/test/cpp/end2end/grpclb_end2end_test.cc @@ -332,8 +332,7 @@ class GrpclbEnd2endTest : public ::testing::Test { num_backends_(num_backends), num_balancers_(num_balancers), client_load_reporting_interval_seconds_( - client_load_reporting_interval_seconds), - kRequestMessage_("Live long and prosper.") {} + client_load_reporting_interval_seconds) {} void SetUp() override { response_generator_ = grpc_fake_resolver_response_generator_create(); @@ -559,7 +558,6 @@ class GrpclbEnd2endTest : public ::testing::Test { std::unique_ptr thread_; }; - const grpc::string kMessage_ = "Live long and prosper."; const grpc::string server_host_; const size_t num_backends_; const size_t num_balancers_; @@ -571,7 +569,7 @@ class GrpclbEnd2endTest : public ::testing::Test { std::vector> backend_servers_; std::vector> balancer_servers_; grpc_fake_resolver_response_generator* response_generator_; - const grpc::string kRequestMessage_; + const grpc::string kRequestMessage_ = "Live long and prosper."; }; class SingleBalancerTest : public GrpclbEnd2endTest { @@ -1086,7 +1084,7 @@ TEST_F(SingleBalancerTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); @@ -1210,7 +1208,7 @@ TEST_F(SingleBalancerWithClientLoadReportingTest, Drop) { } else { EXPECT_TRUE(status.ok()) << "code=" << status.error_code() << " message=" << status.error_message(); - EXPECT_EQ(response.message(), kMessage_); + EXPECT_EQ(response.message(), kRequestMessage_); } } EXPECT_EQ(kNumRpcsPerAddress * num_of_drop_addresses, num_drops); From 9b9a44e4dd1087a1cbdde3a3884cc7beea031d58 Mon Sep 17 00:00:00 2001 From: Justin Burke Date: Tue, 3 Oct 2017 18:09:41 -0700 Subject: [PATCH 15/27] Expose conversion of grpc to tsi cert pairs --- CMakeLists.txt | 31 +++++++++ Makefile | 36 ++++++++++ build.yaml | 10 +++ .../credentials/ssl/ssl_credentials.cc | 60 +++++++++-------- .../credentials/ssl/ssl_credentials.h | 15 +++++ .../security/transport/security_connector.cc | 7 +- .../security/transport/security_connector.h | 2 +- test/core/security/BUILD | 12 ++++ test/core/security/ssl_credentials_test.c | 66 +++++++++++++++++++ .../generated/sources_and_headers.json | 17 +++++ tools/run_tests/generated/tests.json | 24 +++++++ 11 files changed, 250 insertions(+), 30 deletions(-) create mode 100644 test/core/security/ssl_credentials_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 579621b0908..3ac6c9b63d9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -459,6 +459,7 @@ add_dependencies(buildtests_c grpc_json_token_test) endif() add_dependencies(buildtests_c grpc_jwt_verifier_test) add_dependencies(buildtests_c grpc_security_connector_test) +add_dependencies(buildtests_c grpc_ssl_credentials_test) if(_gRPC_PLATFORM_LINUX) add_dependencies(buildtests_c handshake_client) endif() @@ -7290,6 +7291,36 @@ target_link_libraries(grpc_security_connector_test gpr ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) + +add_executable(grpc_ssl_credentials_test + test/core/security/ssl_credentials_test.c +) + + +target_include_directories(grpc_ssl_credentials_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${BORINGSSL_ROOT_DIR}/include + PRIVATE ${PROTOBUF_ROOT_DIR}/src + PRIVATE ${BENCHMARK_ROOT_DIR}/include + PRIVATE ${ZLIB_ROOT_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib + PRIVATE ${CARES_INCLUDE_DIR} + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares + PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/third_party/abseil-cpp +) + +target_link_libraries(grpc_ssl_credentials_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr_test_util + gpr +) + endif (gRPC_BUILD_TESTS) add_executable(grpc_verify_jwt diff --git a/Makefile b/Makefile index bb02c9bdf07..325c9123d99 100644 --- a/Makefile +++ b/Makefile @@ -1019,6 +1019,7 @@ grpc_json_token_test: $(BINDIR)/$(CONFIG)/grpc_json_token_test grpc_jwt_verifier_test: $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test grpc_print_google_default_creds_token: $(BINDIR)/$(CONFIG)/grpc_print_google_default_creds_token grpc_security_connector_test: $(BINDIR)/$(CONFIG)/grpc_security_connector_test +grpc_ssl_credentials_test: $(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test grpc_verify_jwt: $(BINDIR)/$(CONFIG)/grpc_verify_jwt handshake_client: $(BINDIR)/$(CONFIG)/handshake_client handshake_server: $(BINDIR)/$(CONFIG)/handshake_server @@ -1413,6 +1414,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/grpc_json_token_test \ $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test \ $(BINDIR)/$(CONFIG)/grpc_security_connector_test \ + $(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test \ $(BINDIR)/$(CONFIG)/handshake_client \ $(BINDIR)/$(CONFIG)/handshake_server \ $(BINDIR)/$(CONFIG)/hpack_parser_test \ @@ -1886,6 +1888,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test || ( echo test grpc_jwt_verifier_test failed ; exit 1 ) $(E) "[RUN] Testing grpc_security_connector_test" $(Q) $(BINDIR)/$(CONFIG)/grpc_security_connector_test || ( echo test grpc_security_connector_test failed ; exit 1 ) + $(E) "[RUN] Testing grpc_ssl_credentials_test" + $(Q) $(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test || ( echo test grpc_ssl_credentials_test failed ; exit 1 ) $(E) "[RUN] Testing handshake_client" $(Q) $(BINDIR)/$(CONFIG)/handshake_client || ( echo test handshake_client failed ; exit 1 ) $(E) "[RUN] Testing handshake_server" @@ -11119,6 +11123,38 @@ endif endif +GRPC_SSL_CREDENTIALS_TEST_SRC = \ + test/core/security/ssl_credentials_test.c \ + +GRPC_SSL_CREDENTIALS_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPC_SSL_CREDENTIALS_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test: $(GRPC_SSL_CREDENTIALS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(GRPC_SSL_CREDENTIALS_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/grpc_ssl_credentials_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/security/ssl_credentials_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_grpc_ssl_credentials_test: $(GRPC_SSL_CREDENTIALS_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GRPC_SSL_CREDENTIALS_TEST_OBJS:.o=.dep) +endif +endif + + GRPC_VERIFY_JWT_SRC = \ test/core/security/verify_jwt.c \ diff --git a/build.yaml b/build.yaml index e2faa438abe..d43d36f4809 100644 --- a/build.yaml +++ b/build.yaml @@ -2540,6 +2540,16 @@ targets: - grpc - gpr_test_util - gpr +- name: grpc_ssl_credentials_test + build: test + language: c + src: + - test/core/security/ssl_credentials_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr - name: grpc_verify_jwt build: tool language: c diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 290336adc05..8e47aebedb3 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -31,18 +31,21 @@ // SSL Channel Credentials. // -static void ssl_config_pem_key_cert_pair_destroy( - tsi_ssl_pem_key_cert_pair *kp) { +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs) { if (kp == NULL) return; - gpr_free((void *)kp->private_key); - gpr_free((void *)kp->cert_chain); + for (size_t i = 0; i < num_key_cert_pairs; i++) { + gpr_free((void *)kp[i].private_key); + gpr_free((void *)kp[i].cert_chain); + } + gpr_free(kp); } static void ssl_destruct(grpc_exec_ctx *exec_ctx, grpc_channel_credentials *creds) { grpc_ssl_credentials *c = (grpc_ssl_credentials *)creds; gpr_free(c->config.pem_root_certs); - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pair); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pair, 1); } static grpc_security_status ssl_create_security_connector( @@ -85,9 +88,11 @@ static void ssl_build_config(const char *pem_root_certs, if (pem_key_cert_pair != NULL) { GPR_ASSERT(pem_key_cert_pair->private_key != NULL); GPR_ASSERT(pem_key_cert_pair->cert_chain != NULL); - config->pem_key_cert_pair.cert_chain = + config->pem_key_cert_pair = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + sizeof(tsi_ssl_pem_key_cert_pair)); + config->pem_key_cert_pair->cert_chain = gpr_strdup(pem_key_cert_pair->cert_chain); - config->pem_key_cert_pair.private_key = + config->pem_key_cert_pair->private_key = gpr_strdup(pem_key_cert_pair->private_key); } } @@ -117,11 +122,8 @@ grpc_channel_credentials *grpc_ssl_credentials_create( static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - size_t i; - for (i = 0; i < c->config.num_key_cert_pairs; i++) { - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pairs[i]); - } - gpr_free(c->config.pem_key_cert_pairs); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pairs, + c->config.num_key_cert_pairs); gpr_free(c->config.pem_root_certs); } @@ -136,30 +138,36 @@ static grpc_security_status ssl_server_create_security_connector( static grpc_server_credentials_vtable ssl_server_vtable = { ssl_server_destruct, ssl_server_create_security_connector}; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs) { + tsi_ssl_pem_key_cert_pair *tsi_pairs = NULL; + if (num_key_cert_pairs > 0) { + GPR_ASSERT(pem_key_cert_pairs != NULL); + tsi_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); + } + for (size_t i = 0; i < num_key_cert_pairs; i++) { + GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); + GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); + tsi_pairs[i].cert_chain = gpr_strdup(pem_key_cert_pairs[i].cert_chain); + tsi_pairs[i].private_key = gpr_strdup(pem_key_cert_pairs[i].private_key); + } + return tsi_pairs; +} + static void ssl_build_server_config( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_config *config) { - size_t i; config->client_certificate_request = client_certificate_request; if (pem_root_certs != NULL) { config->pem_root_certs = gpr_strdup(pem_root_certs); } - if (num_key_cert_pairs > 0) { - GPR_ASSERT(pem_key_cert_pairs != NULL); - config->pem_key_cert_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( - num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); - } + config->pem_key_cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs( + pem_key_cert_pairs, num_key_cert_pairs); config->num_key_cert_pairs = num_key_cert_pairs; - for (i = 0; i < num_key_cert_pairs; i++) { - GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); - GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); - config->pem_key_cert_pairs[i].cert_chain = - gpr_strdup(pem_key_cert_pairs[i].cert_chain); - config->pem_key_cert_pairs[i].private_key = - gpr_strdup(pem_key_cert_pairs[i].private_key); - } } grpc_server_credentials *grpc_ssl_server_credentials_create( diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h index b43c656cd79..42e425d9f15 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.h +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h @@ -20,6 +20,10 @@ #include "src/core/lib/security/credentials/credentials.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { grpc_channel_credentials base; grpc_ssl_config config; @@ -30,4 +34,15 @@ typedef struct { grpc_ssl_server_config config; } grpc_ssl_server_credentials; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs); + +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs); + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_SSL_SSL_CREDENTIALS_H */ diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 80d9a7b77f6..b050be2129c 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -942,10 +942,11 @@ grpc_security_status grpc_ssl_channel_security_connector_create( c->overridden_target_name = gpr_strdup(overridden_target_name); } - has_key_cert_pair = config->pem_key_cert_pair.private_key != NULL && - config->pem_key_cert_pair.cert_chain != NULL; + has_key_cert_pair = config->pem_key_cert_pair != NULL && + config->pem_key_cert_pair->private_key != NULL && + config->pem_key_cert_pair->cert_chain != NULL; result = tsi_create_ssl_client_handshaker_factory( - has_key_cert_pair ? &config->pem_key_cert_pair : NULL, pem_root_certs, + has_key_cert_pair ? config->pem_key_cert_pair : NULL, pem_root_certs, ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, &c->client_handshaker_factory); if (result != TSI_OK) { diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 216bb35e811..8287151f44a 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -204,7 +204,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( /* Config for ssl clients. */ typedef struct { - tsi_ssl_pem_key_cert_pair pem_key_cert_pair; + tsi_ssl_pem_key_cert_pair *pem_key_cert_pair; char *pem_root_certs; } grpc_ssl_config; diff --git a/test/core/security/BUILD b/test/core/security/BUILD index dc417599222..83b1747648f 100644 --- a/test/core/security/BUILD +++ b/test/core/security/BUILD @@ -91,6 +91,18 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "ssl_credentials_test", + srcs = ["ssl_credentials_test.c"], + language = "C", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ] +) + grpc_cc_binary( name = "create_jwt", srcs = ["create_jwt.c"], diff --git a/test/core/security/ssl_credentials_test.c b/test/core/security/ssl_credentials_test.c new file mode 100644 index 00000000000..3c838faa607 --- /dev/null +++ b/test/core/security/ssl_credentials_test.c @@ -0,0 +1,66 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +#include +#include +#include + +#include "src/core/lib/security/credentials/ssl/ssl_credentials.h" +#include "src/core/tsi/ssl_transport_security.h" +#include "test/core/util/test_config.h" + +static void test_convert_grpc_to_tsi_cert_pairs() { + grpc_ssl_pem_key_cert_pair grpc_pairs[] = {{"private_key1", "cert_chain1"}, + {"private_key2", "cert_chain2"}, + {"private_key3", "cert_chain3"}}; + const size_t num_pairs = 3; + + { + tsi_ssl_pem_key_cert_pair *tsi_pairs = + grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, 0); + GPR_ASSERT(tsi_pairs == NULL); + } + + { + tsi_ssl_pem_key_cert_pair *tsi_pairs = + grpc_convert_grpc_to_tsi_cert_pairs(grpc_pairs, num_pairs); + + GPR_ASSERT(tsi_pairs != NULL); + for (size_t i = 0; i < num_pairs; i++) { + GPR_ASSERT(strncmp(grpc_pairs[i].private_key, tsi_pairs[i].private_key, + strlen(grpc_pairs[i].private_key)) == 0); + GPR_ASSERT(strncmp(grpc_pairs[i].cert_chain, tsi_pairs[i].cert_chain, + strlen(grpc_pairs[i].cert_chain)) == 0); + } + + grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_pairs, num_pairs); + } +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + + test_convert_grpc_to_tsi_cert_pairs(); + + grpc_shutdown(); + return 0; +} diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 7fa3d28834b..46f4cb65329 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -1219,6 +1219,23 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "grpc_ssl_credentials_test", + "src": [ + "test/core/security/ssl_credentials_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index da763725e62..15d38eb05b7 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1535,6 +1535,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "grpc_ssl_credentials_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, From ca31256597ced522371eed101d756adb4a860e2c Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Mon, 23 Oct 2017 18:26:21 -0700 Subject: [PATCH 16/27] Remove redundant WaitForConnected --- test/cpp/qps/client.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index 48c8995666e..a5049e5502b 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -433,9 +433,6 @@ class ClientImpl : public Client { !config.security_params().use_test_ca(), std::shared_ptr(), args); gpr_log(GPR_INFO, "Connecting to %s", target.c_str()); - GPR_ASSERT(channel_->WaitForConnected( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(300, GPR_TIMESPAN)))); is_inproc_ = false; } else { grpc::string tgt = target; From 6a0874b4054fec02e41957f573509213ab0d785b Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Thu, 5 Oct 2017 23:47:36 -0700 Subject: [PATCH 17/27] Fix bm_chttp2_transport --- .../cpp/microbenchmarks/bm_chttp2_transport.cc | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 070034fe336..639fd5fb154 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -46,10 +46,16 @@ auto &force_library_initialization = Library::get(); class DummyEndpoint : public grpc_endpoint { public: DummyEndpoint() { - static const grpc_endpoint_vtable my_vtable = { - read, write, add_to_pollset, add_to_pollset_set, - shutdown, destroy, get_resource_user, get_peer, - get_fd}; + static const grpc_endpoint_vtable my_vtable = {read, + write, + add_to_pollset, + add_to_pollset_set, + delete_from_pollset_set, + shutdown, + destroy, + get_resource_user, + get_peer, + get_fd}; grpc_endpoint::vtable = &my_vtable; ru_ = grpc_resource_user_create(Library::get().rq(), "dummy_endpoint"); } @@ -104,6 +110,10 @@ class DummyEndpoint : public grpc_endpoint { static void add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset) {} + static void delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) {} + static void shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_resource_user_shutdown(exec_ctx, From 6bf4bcef04ed216ec3f7a1a2dabe12f52329cb5c Mon Sep 17 00:00:00 2001 From: ncteisen Date: Mon, 23 Oct 2017 22:19:01 -0700 Subject: [PATCH 18/27] Fix bm_diff --- test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 46 ++++++++++----------- tools/profiling/microbenchmarks/bm_json.py | 2 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index f813bb7b646..0bf5daafa0b 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -443,7 +443,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data, } template + void (*OnHeader)(grpc_exec_ctx *, void *, grpc_mdelem)> static void BM_HpackParserParseHeader(benchmark::State &state) { TrackCounters track_counters; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -836,32 +836,32 @@ class SameDeadline { } }; -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeClientInitialMetadata); + RepresentativeClientInitialMetadata, UnrefHeader ); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - MoreRepresentativeClientInitialMetadata); + MoreRepresentativeClientInitialMetadata, UnrefHeader ); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerInitialMetadata); + RepresentativeServerInitialMetadata, UnrefHeader ); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerTrailingMetadata); + RepresentativeServerTrailingMetadata, UnrefHeader ); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderOld); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew); diff --git a/tools/profiling/microbenchmarks/bm_json.py b/tools/profiling/microbenchmarks/bm_json.py index f6082fe7b4a..bdd317405f3 100644 --- a/tools/profiling/microbenchmarks/bm_json.py +++ b/tools/profiling/microbenchmarks/bm_json.py @@ -76,7 +76,7 @@ _BM_SPECS = { 'dyn': ['end_of_stream', 'request_size'], }, 'BM_HpackParserParseHeader': { - 'tpl': ['fixture'], + 'tpl': ['fixture', 'on_header'], 'dyn': [], }, 'BM_CallCreateDestroy': { From 1b1e8d61493e98aa96600d8639f52e0996e8c740 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 24 Oct 2017 11:15:17 +0200 Subject: [PATCH 19/27] unref resource quota on windows --- src/core/lib/iomgr/tcp_windows.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index dc84e564a9b..ad939d3b508 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -442,6 +442,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } From 9e3eedb6afdd13cdb8f774e2270294ef57f9046b Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 24 Oct 2017 08:33:00 -0700 Subject: [PATCH 20/27] Remove old header benchmark --- test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 29 --------------------- 1 file changed, 29 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 0bf5daafa0b..2d712882daa 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -794,34 +794,6 @@ static void OnHeaderNew(grpc_exec_ctx *exec_ctx, void *user_data, } } -// Current implementation. -static void OnHeaderOld(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_mdelem md) { - if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_GRPC_TIMEOUT)) { - grpc_millis *cached_timeout = - static_cast(grpc_mdelem_get_user_data(md, free_timeout)); - grpc_millis timeout; - if (cached_timeout == NULL) { - /* not already parsed: parse it now, and store the result away */ - cached_timeout = (grpc_millis *)gpr_malloc(sizeof(grpc_millis)); - if (!grpc_http2_decode_timeout(GRPC_MDVALUE(md), cached_timeout)) { - char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); - gpr_log(GPR_ERROR, "Ignoring bad timeout value '%s'", val); - gpr_free(val); - *cached_timeout = GRPC_MILLIS_INF_FUTURE; - } - timeout = *cached_timeout; - grpc_mdelem_set_user_data(md, free_timeout, cached_timeout); - } else { - timeout = *cached_timeout; - } - benchmark::DoNotOptimize(timeout); - GRPC_MDELEM_UNREF(exec_ctx, md); - } else { - GPR_ASSERT(0); - } -} - // Send the same deadline repeatedly class SameDeadline { public: @@ -863,7 +835,6 @@ BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, RepresentativeServerTrailingMetadata, UnrefHeader ); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderOld); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew); } // namespace hpack_parser_fixtures From c296e82e11bfd6cb8aa4c796f2f7276595259be9 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 24 Oct 2017 08:58:14 -0700 Subject: [PATCH 21/27] clang fmt --- test/cpp/microbenchmarks/bm_chttp2_hpack.cc | 59 +++++++++++++-------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc index 2d712882daa..bc2157b9f13 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_hpack.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_hpack.cc @@ -442,8 +442,7 @@ static void UnrefHeader(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_MDELEM_UNREF(exec_ctx, md); } -template +template static void BM_HpackParserParseHeader(benchmark::State &state) { TrackCounters track_counters; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -809,31 +808,47 @@ class SameDeadline { }; BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, EmptyBatch, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleStaticElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, IndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, AddIndexedSingleInternedElem, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, KeyIndexedSingleInternedElem, + UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedElem, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>, UnrefHeader); -BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>, UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, false>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<1, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<3, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<10, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<31, true>, + UnrefHeader); +BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, NonIndexedBinaryElem<100, true>, + UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeClientInitialMetadata, UnrefHeader ); + RepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - MoreRepresentativeClientInitialMetadata, UnrefHeader ); + MoreRepresentativeClientInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerInitialMetadata, UnrefHeader ); + RepresentativeServerInitialMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, - RepresentativeServerTrailingMetadata, UnrefHeader ); + RepresentativeServerTrailingMetadata, UnrefHeader); BENCHMARK_TEMPLATE(BM_HpackParserParseHeader, SameDeadline, OnHeaderNew); From 4c9fa854f691726e73efeca20e7141c0f8dc10c6 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Tue, 24 Oct 2017 08:59:05 -0700 Subject: [PATCH 22/27] Add debugging prints --- tools/profiling/microbenchmarks/bm_json.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/profiling/microbenchmarks/bm_json.py b/tools/profiling/microbenchmarks/bm_json.py index bdd317405f3..eb450ee6ada 100644 --- a/tools/profiling/microbenchmarks/bm_json.py +++ b/tools/profiling/microbenchmarks/bm_json.py @@ -157,6 +157,9 @@ def parse_name(name): rest = s[0] dyn_args = s[1:] name = rest + print (name) + print (dyn_args, _BM_SPECS[name]['dyn']) + print (tpl_args, _BM_SPECS[name]['tpl']) assert name in _BM_SPECS, '_BM_SPECS needs to be expanded for %s' % name assert len(dyn_args) == len(_BM_SPECS[name]['dyn']) assert len(tpl_args) == len(_BM_SPECS[name]['tpl']) From 40b8cbec062be571f6a5644251ef13a3c9be1c9a Mon Sep 17 00:00:00 2001 From: David Garcia Quintas Date: Tue, 24 Oct 2017 10:05:51 -0700 Subject: [PATCH 23/27] Disable epollex for LB tests while failures are investigated --- build.yaml | 2 ++ tools/run_tests/generated/tests.json | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/build.yaml b/build.yaml index e2faa438abe..437883d4388 100644 --- a/build.yaml +++ b/build.yaml @@ -3948,6 +3948,7 @@ targets: excluded_poll_engines: - poll - poll-cv + - epollex - name: codegen_test_full gtest: true build: test @@ -4256,6 +4257,7 @@ targets: excluded_poll_engines: - poll - poll-cv + - epollex - name: h2_ssl_cert_test gtest: true build: test diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index da763725e62..89ba8188768 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3523,7 +3523,8 @@ "exclude_iomgrs": [], "excluded_poll_engines": [ "poll", - "poll-cv" + "poll-cv", + "epollex" ], "flaky": false, "gtest": true, @@ -3917,7 +3918,8 @@ "exclude_iomgrs": [], "excluded_poll_engines": [ "poll", - "poll-cv" + "poll-cv", + "epollex" ], "flaky": false, "gtest": false, From 44d820ee2c6d7498cff42910bda5eaafc9411f14 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 24 Oct 2017 13:27:39 -0700 Subject: [PATCH 24/27] One more fix in gRPC-Core.podspec that makes Xcode compile --- gRPC-Core.podspec | 2 +- templates/gRPC-Core.podspec.template | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index df9ec5a7c57..0e3b50c4aad 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1008,7 +1008,7 @@ Pod::Spec.new do |s| 'test/core/end2end/tests/*.{c,h}', 'test/core/end2end/fixtures/*.h', 'test/core/end2end/data/*.{c,h}', - 'test/core/util/debugger_macros.{c,h}', + 'test/core/util/debugger_macros.{cc,h}', 'test/core/util/test_config.{c,h}', 'test/core/util/port.h', 'test/core/util/port.c', diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template index 2836ab1f75b..c329d2dedc2 100644 --- a/templates/gRPC-Core.podspec.template +++ b/templates/gRPC-Core.podspec.template @@ -175,7 +175,7 @@ 'test/core/end2end/tests/*.{c,h}', 'test/core/end2end/fixtures/*.h', 'test/core/end2end/data/*.{c,h}', - 'test/core/util/debugger_macros.{c,h}', + 'test/core/util/debugger_macros.{cc,h}', 'test/core/util/test_config.{c,h}', 'test/core/util/port.h', 'test/core/util/port.c', From 42bd87e376913939850bfa78a3c7f96ce83af11e Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Fri, 20 Oct 2017 10:32:30 -0700 Subject: [PATCH 25/27] Adds gRPC Experimental CQ DoThenAsyncNext lambda API --- grpc.def | 2 + .../grpc++/impl/codegen/completion_queue.h | 39 ++++++ include/grpc/grpc.h | 17 +++ src/core/lib/surface/completion_queue.cc | 116 +++++++++++++----- src/core/lib/surface/completion_queue.h | 3 + src/core/lib/surface/init.cc | 1 + src/cpp/common/completion_queue_cc.cc | 25 ++++ src/ruby/ext/grpc/rb_grpc_imports.generated.c | 4 + src/ruby/ext/grpc/rb_grpc_imports.generated.h | 6 + test/core/surface/completion_queue_test.c | 76 ++++++++++++ test/cpp/end2end/async_end2end_test.cc | 111 ++++++++++++++++- test/cpp/qps/client.h | 44 +++---- test/cpp/qps/client_async.cc | 48 +++++--- test/cpp/qps/client_sync.cc | 39 ++++-- test/cpp/qps/server_async.cc | 31 +++-- 15 files changed, 466 insertions(+), 96 deletions(-) diff --git a/grpc.def b/grpc.def index 558be60c3c1..e4281f3ab67 100644 --- a/grpc.def +++ b/grpc.def @@ -54,6 +54,8 @@ EXPORTS grpc_completion_queue_pluck grpc_completion_queue_shutdown grpc_completion_queue_destroy + grpc_completion_queue_thread_local_cache_init + grpc_completion_queue_thread_local_cache_flush grpc_alarm_create grpc_alarm_set grpc_alarm_cancel diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index ca757e2a9c5..e2c0c29dca4 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -109,6 +109,30 @@ class CompletionQueue : private GrpcLibraryCodegen { TIMEOUT ///< deadline was reached. }; + /// EXPERIMENTAL + /// First executes \a F, then reads from the queue, blocking up to + /// \a deadline (or the queue's shutdown). + /// Both \a tag and \a ok are updated upon success (if an event is available + /// within the \a deadline). A \a tag points to an arbitrary location usually + /// employed to uniquely identify an event. + /// + /// \param F[in] Function to execute before calling AsyncNext on this queue. + /// \param tag[out] Upon sucess, updated to point to the event's tag. + /// \param ok[out] Upon sucess, true if read a regular event, false otherwise. + /// \param deadline[in] How long to block in wait for an event. + /// + /// \return The type of event read. + template + NextStatus DoThenAsyncNext(F&& f, void** tag, bool* ok, const T& deadline) { + CompletionQueueTLSCache cache = CompletionQueueTLSCache(this); + f(); + if (cache.Flush(tag, ok)) { + return GOT_EVENT; + } else { + return AsyncNext(tag, ok, deadline); + } + } + /// Read from the queue, blocking up to \a deadline (or the queue's shutdown). /// Both \a tag and \a ok are updated upon success (if an event is available /// within the \a deadline). A \a tag points to an arbitrary location usually @@ -213,6 +237,21 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// EXPERIMENTAL + /// Creates a Thread Local cache to store the first event + /// On this completion queue queued from this thread. Once + /// initialized, it must be flushed on the same thread. + class CompletionQueueTLSCache { + public: + CompletionQueueTLSCache(CompletionQueue* cq); + ~CompletionQueueTLSCache(); + bool Flush(void** tag, bool* ok); + + private: + CompletionQueue* cq_; + bool flushed_; + }; + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 1de289fba45..6df3b8086e8 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -143,6 +143,23 @@ GRPCAPI void grpc_completion_queue_shutdown(grpc_completion_queue *cq); drained and no threads are executing grpc_completion_queue_next */ GRPCAPI void grpc_completion_queue_destroy(grpc_completion_queue *cq); +/*********** EXPERIMENTAL API ************/ +/** Initializes a thread local cache for \a cq. + * grpc_flush_cq_tls_cache() MUST be called on the same thread, + * with the same cq. + */ +GRPCAPI void grpc_completion_queue_thread_local_cache_init( + grpc_completion_queue *cq); + +/*********** EXPERIMENTAL API ************/ +/** Flushes the thread local cache for \a cq. + * Returns 1 if there was contents in the cache. If there was an event + * in \a cq tls cache, its tag is placed in tag, and ok is set to the + * event success. + */ +GRPCAPI int grpc_completion_queue_thread_local_cache_flush( + grpc_completion_queue *cq, void **tag, int *ok); + /** Create a completion queue alarm instance */ GRPCAPI grpc_alarm *grpc_alarm_create(void *reserved); diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 21664f03c8b..5009f786e68 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" @@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false, "cq_refcount"); #endif +// Specifies a cq thread local cache. +// The first event that occurs on a thread +// with a cq cache will go into that cache, and +// will only be returned on the thread that initialized the cache. +// NOTE: Only one event will ever be cached. +GPR_TLS_DECL(g_cached_event); +GPR_TLS_DECL(g_cached_cq); + typedef struct { grpc_pollset_worker **worker; void *tag; @@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); +void grpc_cq_global_init() { + gpr_tls_init(&g_cached_event); + gpr_tls_init(&g_cached_cq); +} + +void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) { + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)cq); + } +} + +int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, + void **tag, int *ok) { + grpc_cq_completion *storage = + (grpc_cq_completion *)gpr_tls_get(&g_cached_event); + int ret = 0; + if (storage != NULL && + (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) { + *tag = storage->tag; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + storage->done(&exec_ctx, storage->done_arg, storage); + *ok = (storage->next & (uintptr_t)(1)) == 1; + ret = 1; + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(&exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); + } + grpc_exec_ctx_finish(&exec_ctx); + } + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)0); + + return ret; +} + static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); q->queue_lock = GPR_SPINLOCK_INITIALIZER; @@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); @@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, cq_check_tag(cq, tag, true); /* Used in debug builds only */ - /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - /* Since we do not hold the cq lock here, it is important to do an 'acquire' - load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next - */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { - /* Only kick if this is the first item queued */ - if (is_first) { - gpr_mu_lock(cq->mu); - grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cq->mu); + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq && + (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)storage); + } else { + /* Add the completion to the queue */ + bool is_first = cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release + store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - GRPC_ERROR_UNREF(kick_error); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } } - } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } + } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } - } else { - GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); - gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 69d144bd95c..c02bc5da071 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif +/* Initializes global variables used by completion queues */ +void grpc_cq_global_init(); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index b089da2c54f..058e88f8048 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -64,6 +64,7 @@ static void do_basic_init(void) { gpr_log_verbosity_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); + grpc_cq_global_init(); g_initializations = 0; } diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc index f34b0f3d583..4a2e2be6880 100644 --- a/src/cpp/common/completion_queue_cc.cc +++ b/src/cpp/common/completion_queue_cc.cc @@ -71,4 +71,29 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal( } } +CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache( + CompletionQueue* cq) + : cq_(cq), flushed_(false) { + grpc_completion_queue_thread_local_cache_init(cq_->cq_); +} + +CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() { + GPR_ASSERT(flushed_); +} + +bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) { + int res = 0; + void* res_tag; + flushed_ = true; + if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag, + &res)) { + auto cq_tag = static_cast(res_tag); + *ok = res == 1; + if (cq_tag->FinalizeResult(tag, ok)) { + return true; + } + } + return false; +} + } // namespace grpc diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 70831494fad..cd1bd98abcd 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -77,6 +77,8 @@ grpc_completion_queue_next_type grpc_completion_queue_next_import; grpc_completion_queue_pluck_type grpc_completion_queue_pluck_import; grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import; grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; +grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import; +grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import; grpc_alarm_create_type grpc_alarm_create_import; grpc_alarm_set_type grpc_alarm_set_import; grpc_alarm_cancel_type grpc_alarm_cancel_import; @@ -385,6 +387,8 @@ void grpc_rb_load_imports(HMODULE library) { grpc_completion_queue_pluck_import = (grpc_completion_queue_pluck_type) GetProcAddress(library, "grpc_completion_queue_pluck"); grpc_completion_queue_shutdown_import = (grpc_completion_queue_shutdown_type) GetProcAddress(library, "grpc_completion_queue_shutdown"); grpc_completion_queue_destroy_import = (grpc_completion_queue_destroy_type) GetProcAddress(library, "grpc_completion_queue_destroy"); + grpc_completion_queue_thread_local_cache_init_import = (grpc_completion_queue_thread_local_cache_init_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_init"); + grpc_completion_queue_thread_local_cache_flush_import = (grpc_completion_queue_thread_local_cache_flush_type) GetProcAddress(library, "grpc_completion_queue_thread_local_cache_flush"); grpc_alarm_create_import = (grpc_alarm_create_type) GetProcAddress(library, "grpc_alarm_create"); grpc_alarm_set_import = (grpc_alarm_set_type) GetProcAddress(library, "grpc_alarm_set"); grpc_alarm_cancel_import = (grpc_alarm_cancel_type) GetProcAddress(library, "grpc_alarm_cancel"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index 868772cfc85..c7e78b70dcb 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -212,6 +212,12 @@ extern grpc_completion_queue_shutdown_type grpc_completion_queue_shutdown_import typedef void(*grpc_completion_queue_destroy_type)(grpc_completion_queue *cq); extern grpc_completion_queue_destroy_type grpc_completion_queue_destroy_import; #define grpc_completion_queue_destroy grpc_completion_queue_destroy_import +typedef void(*grpc_completion_queue_thread_local_cache_init_type)(grpc_completion_queue *cq); +extern grpc_completion_queue_thread_local_cache_init_type grpc_completion_queue_thread_local_cache_init_import; +#define grpc_completion_queue_thread_local_cache_init grpc_completion_queue_thread_local_cache_init_import +typedef int(*grpc_completion_queue_thread_local_cache_flush_type)(grpc_completion_queue *cq, void **tag, int *ok); +extern grpc_completion_queue_thread_local_cache_flush_type grpc_completion_queue_thread_local_cache_flush_import; +#define grpc_completion_queue_thread_local_cache_flush grpc_completion_queue_thread_local_cache_flush_import typedef grpc_alarm *(*grpc_alarm_create_type)(void *reserved); extern grpc_alarm_create_type grpc_alarm_create_import; #define grpc_alarm_create grpc_alarm_create_import diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index e6372a379ca..e4e4c9f1b2d 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -158,6 +158,80 @@ static void test_cq_end_op(void) { } } +static void test_cq_tls_cache_full(void) { + grpc_event ev; + grpc_completion_queue *cc; + grpc_cq_completion completion; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *tag = create_test_tag(); + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_full"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT(grpc_cq_begin_op(cc, tag)); + grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completion); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1); + GPR_ASSERT(res_tag == tag); + GPR_ASSERT(ok); + + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT); + + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +static void test_cq_tls_cache_empty(void) { + grpc_completion_queue *cc; + grpc_cq_polling_type polling_types[] = { + GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; + grpc_completion_queue_attributes attr; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; + void *res_tag; + int ok; + + LOG_TEST("test_cq_tls_cache_empty"); + + attr.version = 1; + attr.cq_completion_type = GRPC_CQ_NEXT; + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + attr.cq_polling_type = polling_types[i]; + cc = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attr), &attr, NULL); + + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + grpc_completion_queue_thread_local_cache_init(cc); + GPR_ASSERT( + grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0); + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } +} + static void test_shutdown_then_next_polling(void) { grpc_cq_polling_type polling_types[] = { GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING}; @@ -300,6 +374,8 @@ int main(int argc, char **argv) { test_cq_end_op(); test_pluck(); test_pluck_after_shutdown(); + test_cq_tls_cache_full(); + test_cq_tls_cache_empty(); grpc_shutdown(); return 0; } diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index 2a33e8ae115..b7634d04381 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -99,7 +99,7 @@ class PollingOverrider { class Verifier { public: - explicit Verifier(bool spin) : spin_(spin) {} + explicit Verifier(bool spin) : spin_(spin), lambda_run_(false) {} // Expect sets the expected ok value for a specific tag Verifier& Expect(int i, bool expect_ok) { return ExpectUnless(i, expect_ok, false); @@ -142,6 +142,18 @@ class Verifier { return detag(got_tag); } + template + CompletionQueue::NextStatus DoOnceThenAsyncNext( + CompletionQueue* cq, void** got_tag, bool* ok, T deadline, + std::function lambda) { + if (lambda_run_) { + return cq->AsyncNext(got_tag, ok, deadline); + } else { + lambda_run_ = true; + return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline); + } + } + // Verify keeps calling Next until all currently set // expected tags are complete void Verify(CompletionQueue* cq) { Verify(cq, false); } @@ -154,6 +166,7 @@ class Verifier { Next(cq, ignore_ok); } } + // This version of Verify stops after a certain deadline void Verify(CompletionQueue* cq, std::chrono::system_clock::time_point deadline) { @@ -193,6 +206,47 @@ class Verifier { } } + // This version of Verify stops after a certain deadline, and uses the + // DoThenAsyncNext API + // to call the lambda + void Verify(CompletionQueue* cq, + std::chrono::system_clock::time_point deadline, + std::function lambda) { + if (expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + while (std::chrono::system_clock::now() < deadline) { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::TIMEOUT); + } + } else { + while (!expectations_.empty()) { + bool ok; + void* got_tag; + if (spin_) { + for (;;) { + GPR_ASSERT(std::chrono::system_clock::now() < deadline); + auto r = DoOnceThenAsyncNext( + cq, &got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME), lambda); + if (r == CompletionQueue::TIMEOUT) continue; + if (r == CompletionQueue::GOT_EVENT) break; + gpr_log(GPR_ERROR, "unexpected result from AsyncNext"); + abort(); + } + } else { + EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda), + CompletionQueue::GOT_EVENT); + } + GotTag(got_tag, ok, false); + } + } + } + private: void GotTag(void* got_tag, bool ok, bool ignore_ok) { auto it = expectations_.find(got_tag); @@ -226,6 +280,7 @@ class Verifier { std::map expectations_; std::map maybe_expectations_; bool spin_; + bool lambda_run_; }; bool plugin_has_sync_methods(std::unique_ptr& plugin) { @@ -490,6 +545,60 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) { EXPECT_TRUE(recv_status.ok()); } +// Test a simple RPC using the async version of Next +TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) { + ResetStub(); + + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter response_writer(&srv_ctx); + + send_request.set_message(GetParam().message_content); + std::unique_ptr> response_reader( + stub_->AsyncEcho(&cli_ctx, send_request, cq_.get())); + + std::chrono::system_clock::time_point time_now( + std::chrono::system_clock::now()); + std::chrono::system_clock::time_point time_limit( + std::chrono::system_clock::now() + std::chrono::seconds(10)); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now); + + auto resp_writer_ptr = &response_writer; + auto lambda_2 = [&, this, resp_writer_ptr]() { + gpr_log(GPR_ERROR, "CALLED"); + service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(), + cq_.get(), tag(2)); + }; + + Verifier(GetParam().disable_blocking) + .Expect(2, true) + .Verify(cq_.get(), time_limit, lambda_2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + auto recv_resp_ptr = &recv_response; + auto status_ptr = &recv_status; + send_response.set_message(recv_request.message()); + auto lambda_3 = [&, this, resp_writer_ptr, send_response]() { + resp_writer_ptr->Finish(send_response, Status::OK, tag(3)); + }; + response_reader->Finish(recv_resp_ptr, status_ptr, tag(4)); + Verifier(GetParam().disable_blocking) + .Expect(3, true) + .Expect(4, true) + .Verify(cq_.get(), std::chrono::system_clock::time_point::max(), + lambda_3); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); +} + // Two pings and a final pong. TEST_P(AsyncEnd2endTest, SimpleClientStreaming) { ResetStub(); diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h index abf755b3935..9888c762f2c 100644 --- a/test/cpp/qps/client.h +++ b/test/cpp/qps/client.h @@ -226,8 +226,6 @@ class Client { } virtual void DestroyMultithreading() = 0; - virtual void InitThreadFunc(size_t thread_idx) = 0; - virtual bool ThreadFunc(HistogramEntry* histogram, size_t thread_idx) = 0; void SetupLoadTest(const ClientConfig& config, size_t num_threads) { // Set up the load distribution based on the number of threads @@ -275,7 +273,6 @@ class Client { : std::bind(&Client::NextIssueTime, this, thread_idx); } - private: class Thread { public: Thread(Client* client, size_t idx) @@ -295,6 +292,16 @@ class Client { MergeStatusHistogram(statuses_, s); } + void UpdateHistogram(HistogramEntry* entry) { + std::lock_guard g(mu_); + if (entry->value_used()) { + histogram_.Add(entry->value()); + } + if (entry->status_used()) { + statuses_[entry->status()]++; + } + } + private: Thread(const Thread&); Thread& operator=(const Thread&); @@ -310,29 +317,8 @@ class Client { wait_loop++; } - client_->InitThreadFunc(idx_); - - for (;;) { - // run the loop body - HistogramEntry entry; - const bool thread_still_ok = client_->ThreadFunc(&entry, idx_); - // lock, update histogram if needed and see if we're done - std::lock_guard g(mu_); - if (entry.value_used()) { - histogram_.Add(entry.value()); - } - if (entry.status_used()) { - statuses_[entry.status()]++; - } - if (!thread_still_ok) { - gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); - } - if (!thread_still_ok || - static_cast(gpr_atm_acq_load(&client_->thread_pool_done_))) { - client_->CompleteThread(); - return; - } - } + client_->ThreadFunc(idx_, this); + client_->CompleteThread(); } std::mutex mu_; @@ -343,6 +329,12 @@ class Client { std::thread impl_; }; + bool ThreadCompleted() { + return static_cast(gpr_atm_acq_load(&thread_pool_done_)); + } + + virtual void ThreadFunc(size_t thread_idx, Client::Thread* t) = 0; + std::vector> threads_; std::unique_ptr timer_; diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc index 9ed4e0b3552..b5c7208664c 100644 --- a/test/cpp/qps/client_async.cc +++ b/test/cpp/qps/client_async.cc @@ -236,33 +236,47 @@ class AsyncClient : public ClientImpl { this->EndThreads(); // this needed for resolution } - void InitThreadFunc(size_t thread_idx) override final {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override final { + void ThreadFunc(size_t thread_idx, Client::Thread* t) override final { void* got_tag; bool ok; - if (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + HistogramEntry entry; + HistogramEntry* entry_ptr = &entry; + if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ClientRpcContext* ctx; + std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex; + do { + t->UpdateHistogram(entry_ptr); // Got a regular event, so process it - ClientRpcContext* ctx = ClientRpcContext::detag(got_tag); + ctx = ClientRpcContext::detag(got_tag); // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard l(shutdown_state_[thread_idx]->mutex); + shutdown_mu->lock(); if (shutdown_state_[thread_idx]->shutdown) { ctx->TryCancel(); delete ctx; - return true; - } - if (!ctx->RunNextState(ok, entry)) { - // The RPC and callback are done, so clone the ctx - // and kickstart the new one - ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); - delete ctx; + while (cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + ctx = ClientRpcContext::detag(got_tag); + ctx->TryCancel(); + delete ctx; + } + shutdown_mu->unlock(); + return; } - return true; - } else { - // queue is shutting down, so we must be done - return true; - } + } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, entry_ptr, shutdown_mu]() { + bool next_ok = ok; + if (!ctx->RunNextState(next_ok, entry_ptr)) { + // The RPC and callback are done, so clone the ctx + // and kickstart the new one + ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get()); + delete ctx; + } + shutdown_mu->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } std::vector> cli_cqs_; diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 94554a46b20..9f20b148eb3 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -62,6 +62,25 @@ class SynchronousClient virtual ~SynchronousClient(){}; + virtual void InitThreadFuncImpl(size_t thread_idx) = 0; + virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0; + + void ThreadFunc(size_t thread_idx, Thread* t) override { + InitThreadFuncImpl(thread_idx); + for (;;) { + // run the loop body + HistogramEntry entry; + const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx); + t->UpdateHistogram(&entry); + if (!thread_still_ok) { + gpr_log(GPR_ERROR, "Finishing client thread due to RPC error"); + } + if (!thread_still_ok || ThreadCompleted()) { + return; + } + } + } + protected: // WaitToIssue returns false if we realize that we need to break out bool WaitToIssue(int thread_idx) { @@ -103,9 +122,9 @@ class SynchronousUnaryClient final : public SynchronousClient { } ~SynchronousUnaryClient() {} - void InitThreadFunc(size_t thread_idx) override {} + void InitThreadFuncImpl(size_t thread_idx) override {} - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -192,13 +211,13 @@ class SynchronousStreamingPingPongClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]); messages_issued_[thread_idx] = 0; } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { if (!WaitToIssue(thread_idx)) { return true; } @@ -246,14 +265,14 @@ class SynchronousStreamingFromClientClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx], &responses_[thread_idx]); last_issue_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // Figure out how to make histogram sensible if this is rate-paced if (!WaitToIssue(thread_idx)) { return true; @@ -282,13 +301,13 @@ class SynchronousStreamingFromServerClient final public: SynchronousStreamingFromServerClient(const ClientConfig& config) : SynchronousStreamingClient(config), last_recv_(num_threads_) {} - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingFromServer(&context_[thread_idx], request_); last_recv_[thread_idx] = UsageTimer::Now(); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0); if (stream_[thread_idx]->Read(&responses_[thread_idx])) { double now = UsageTimer::Now(); @@ -328,11 +347,11 @@ class SynchronousStreamingBothWaysClient final } } - void InitThreadFunc(size_t thread_idx) override { + void InitThreadFuncImpl(size_t thread_idx) override { auto* stub = channels_[thread_idx % channels_.size()].get_stub(); stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]); } - bool ThreadFunc(HistogramEntry* entry, size_t thread_idx) override { + bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override { // TODO (vjpai): Do this return true; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 4a82f981991..c1097cb8eeb 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -194,23 +194,32 @@ class AsyncQpsServerTest final : public grpc::testing::Server { // Wait until work is available or we are shutting down bool ok; void *got_tag; - while (srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { - ServerRpcContext *ctx = detag(got_tag); + if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) { + return; + } + ServerRpcContext *ctx; + std::mutex *mu_ptr; + do { + ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke // Proceed while holding a lock to make sure that // this thread isn't supposed to shut down - std::lock_guard l(shutdown_state_[thread_idx]->mutex); + mu_ptr = &shutdown_state_[thread_idx]->mutex; + mu_ptr->lock(); if (shutdown_state_[thread_idx]->shutdown) { + mu_ptr->unlock(); return; } - std::lock_guard l2(*ctx); - const bool still_going = ctx->RunNextState(ok); - // if this RPC context is done, refresh it - if (!still_going) { - ctx->Reset(); - } - } - return; + } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext( + [&, ctx, ok, mu_ptr]() { + ctx->lock(); + if (!ctx->RunNextState(ok)) { + ctx->Reset(); + } + ctx->unlock(); + mu_ptr->unlock(); + }, + &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))); } class ServerRpcContext { From 903981adb6509789f5d9b62c2b1dc7768f6df14d Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 25 Oct 2017 10:46:35 -0700 Subject: [PATCH 26/27] Catch out of order error code --- src/cpp/util/error_details.cc | 3 ++- test/cpp/util/error_details_test.cc | 20 +++++++++++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/cpp/util/error_details.cc b/src/cpp/util/error_details.cc index 44bc4d16485..f06b475683a 100644 --- a/src/cpp/util/error_details.cc +++ b/src/cpp/util/error_details.cc @@ -37,7 +37,8 @@ Status SetErrorDetails(const ::google::rpc::Status& from, Status* to) { return Status(StatusCode::FAILED_PRECONDITION, ""); } StatusCode code = StatusCode::UNKNOWN; - if (from.code() >= StatusCode::OK && from.code() <= StatusCode::DATA_LOSS) { + if (from.code() >= StatusCode::OK && + from.code() <= StatusCode::UNAUTHENTICATED) { code = static_cast(from.code()); } *to = Status(code, from.message(), from.SerializeAsString()); diff --git a/test/cpp/util/error_details_test.cc b/test/cpp/util/error_details_test.cc index 69a6876a3f9..16a00fb201c 100644 --- a/test/cpp/util/error_details_test.cc +++ b/test/cpp/util/error_details_test.cc @@ -82,7 +82,7 @@ TEST(SetTest, NullInput) { TEST(SetTest, OutOfScopeErrorCode) { google::rpc::Status expected; - expected.set_code(20); // Out of scope (DATA_LOSS is 15). + expected.set_code(17); // Out of scope (UNAUTHENTICATED is 16). expected.set_message("I am an error message"); testing::EchoRequest expected_details; expected_details.set_message(grpc::string(100, '\0')); @@ -96,6 +96,24 @@ TEST(SetTest, OutOfScopeErrorCode) { EXPECT_EQ(expected.SerializeAsString(), to.error_details()); } +TEST(SetTest, ValidScopeErrorCode) { + for (int c = StatusCode::OK; c <= StatusCode::UNAUTHENTICATED; c++) { + google::rpc::Status expected; + expected.set_code(c); + expected.set_message("I am an error message"); + testing::EchoRequest expected_details; + expected_details.set_message(grpc::string(100, '\0')); + expected.add_details()->PackFrom(expected_details); + + Status to; + Status s = SetErrorDetails(expected, &to); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(c, to.error_code()); + EXPECT_EQ(expected.message(), to.error_message()); + EXPECT_EQ(expected.SerializeAsString(), to.error_details()); + } +} + } // namespace } // namespace grpc From 01f0733618170bd3a375a51c7ce3d10897898838 Mon Sep 17 00:00:00 2001 From: Matt Kwong Date: Wed, 25 Oct 2017 13:18:42 -0700 Subject: [PATCH 27/27] Increase sanity build timeout --- tools/internal_ci/linux/grpc_sanity.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/internal_ci/linux/grpc_sanity.cfg b/tools/internal_ci/linux/grpc_sanity.cfg index 24e7984f3a5..e06a2f42410 100644 --- a/tools/internal_ci/linux/grpc_sanity.cfg +++ b/tools/internal_ci/linux/grpc_sanity.cfg @@ -16,7 +16,7 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/linux/grpc_run_tests_matrix.sh" -timeout_mins: 20 +timeout_mins: 40 action { define_artifacts { regex: "**/*sponge_log.xml"