diff --git a/BUILD b/BUILD index f049e3c4056..42a35a310d6 100644 --- a/BUILD +++ b/BUILD @@ -467,6 +467,7 @@ cc_library( "src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c", "src/core/ext/transport/chttp2/client/insecure/channel_create.c", "src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c", + "src/core/ext/lb_policy/grpclb/grpclb.c", "src/core/ext/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", "src/core/ext/lb_policy/pick_first/pick_first.c", @@ -1141,6 +1142,7 @@ cc_library( "src/core/ext/resolver/sockaddr/sockaddr_resolver.c", "src/core/ext/load_reporting/load_reporting.c", "src/core/ext/load_reporting/load_reporting_filter.c", + "src/core/ext/lb_policy/grpclb/grpclb.c", "src/core/ext/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", "src/core/ext/lb_policy/pick_first/pick_first.c", @@ -1943,6 +1945,7 @@ objc_library( "src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c", "src/core/ext/transport/chttp2/client/insecure/channel_create.c", "src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c", + "src/core/ext/lb_policy/grpclb/grpclb.c", "src/core/ext/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", "src/core/ext/lb_policy/pick_first/pick_first.c", diff --git a/Makefile b/Makefile index 6eccd06952c..e316d4b1533 100644 --- a/Makefile +++ b/Makefile @@ -946,6 +946,7 @@ grpc_jwt_verifier_test: $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test grpc_print_google_default_creds_token: $(BINDIR)/$(CONFIG)/grpc_print_google_default_creds_token grpc_security_connector_test: $(BINDIR)/$(CONFIG)/grpc_security_connector_test grpc_verify_jwt: $(BINDIR)/$(CONFIG)/grpc_verify_jwt +grpclb_test: $(BINDIR)/$(CONFIG)/grpclb_test hpack_parser_fuzzer_test: $(BINDIR)/$(CONFIG)/hpack_parser_fuzzer_test hpack_parser_test: $(BINDIR)/$(CONFIG)/hpack_parser_test hpack_table_test: $(BINDIR)/$(CONFIG)/hpack_table_test @@ -1277,6 +1278,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/grpc_json_token_test \ $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test \ $(BINDIR)/$(CONFIG)/grpc_security_connector_test \ + $(BINDIR)/$(CONFIG)/grpclb_test \ $(BINDIR)/$(CONFIG)/hpack_parser_test \ $(BINDIR)/$(CONFIG)/hpack_table_test \ $(BINDIR)/$(CONFIG)/http_parser_test \ @@ -1579,6 +1581,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/grpc_jwt_verifier_test || ( echo test grpc_jwt_verifier_test failed ; exit 1 ) $(E) "[RUN] Testing grpc_security_connector_test" $(Q) $(BINDIR)/$(CONFIG)/grpc_security_connector_test || ( echo test grpc_security_connector_test failed ; exit 1 ) + $(E) "[RUN] Testing grpclb_test" + $(Q) $(BINDIR)/$(CONFIG)/grpclb_test || ( echo test grpclb_test failed ; exit 1 ) $(E) "[RUN] Testing hpack_parser_test" $(Q) $(BINDIR)/$(CONFIG)/hpack_parser_test || ( echo test hpack_parser_test failed ; exit 1 ) $(E) "[RUN] Testing hpack_table_test" @@ -2662,6 +2666,7 @@ LIBGRPC_SRC = \ src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \ src/core/ext/transport/chttp2/client/insecure/channel_create.c \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \ + src/core/ext/lb_policy/grpclb/grpclb.c \ src/core/ext/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ @@ -3240,6 +3245,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/ext/resolver/sockaddr/sockaddr_resolver.c \ src/core/ext/load_reporting/load_reporting.c \ src/core/ext/load_reporting/load_reporting_filter.c \ + src/core/ext/lb_policy/grpclb/grpclb.c \ src/core/ext/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ @@ -8470,6 +8476,38 @@ endif endif +GRPCLB_TEST_SRC = \ + test/core/client_config/grpclb_test.c \ + +GRPCLB_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(GRPCLB_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/grpclb_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/grpclb_test: $(GRPCLB_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(GRPCLB_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/grpclb_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/client_config/grpclb_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_grpclb_test: $(GRPCLB_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(GRPCLB_TEST_OBJS:.o=.dep) +endif +endif + + HPACK_PARSER_FUZZER_TEST_SRC = \ test/core/transport/chttp2/hpack_parser_fuzzer_test.c \ diff --git a/binding.gyp b/binding.gyp index fc61a9d01fc..f6cb59b4115 100644 --- a/binding.gyp +++ b/binding.gyp @@ -722,6 +722,7 @@ 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/lb_policy/grpclb/grpclb.c', 'src/core/ext/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/build.yaml b/build.yaml index a83ebc595ad..d5d1b816a68 100644 --- a/build.yaml +++ b/build.yaml @@ -377,8 +377,10 @@ filegroups: - src/core/ext/lb_policy/grpclb/load_balancer_api.h - src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h src: + - src/core/ext/lb_policy/grpclb/grpclb.c - src/core/ext/lb_policy/grpclb/load_balancer_api.c - src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c + plugin: grpc_lb_policy_grpclb uses: - grpc_base - grpc_client_config @@ -796,6 +798,7 @@ libs: - grpc_lb_policy_grpclb - grpc_lb_policy_pick_first - grpc_lb_policy_round_robin + - grpc_lb_policy_grpclb - grpc_resolver_dns_native - grpc_resolver_sockaddr - grpc_load_reporting @@ -893,6 +896,7 @@ libs: - grpc_lb_policy_grpclb - grpc_lb_policy_pick_first - grpc_lb_policy_round_robin + - grpc_lb_policy_grpclb - census generate_plugin_registry: true secure: false @@ -1815,6 +1819,16 @@ targets: - grpc - gpr_test_util - gpr +- name: grpclb_test + build: test + language: c + src: + - test/core/client_config/grpclb_test.c + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr - name: hpack_parser_fuzzer_test build: fuzzer language: c diff --git a/config.m4 b/config.m4 index bf5aa244ff2..17dc3c86ec9 100644 --- a/config.m4 +++ b/config.m4 @@ -241,6 +241,7 @@ if test "$PHP_GRPC" != "no"; then src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \ src/core/ext/transport/chttp2/client/insecure/channel_create.c \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \ + src/core/ext/lb_policy/grpclb/grpclb.c \ src/core/ext/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ diff --git a/gRPC.podspec b/gRPC.podspec index d3665d51744..ab04e494a31 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -504,6 +504,7 @@ Pod::Spec.new do |s| 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/lb_policy/grpclb/grpclb.c', 'src/core/ext/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/grpc.gemspec b/grpc.gemspec index 9f3ae048a70..d30d7ae30e9 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -483,6 +483,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c ) s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create.c ) s.files += %w( src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c ) + s.files += %w( src/core/ext/lb_policy/grpclb/grpclb.c ) s.files += %w( src/core/ext/lb_policy/grpclb/load_balancer_api.c ) s.files += %w( src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c ) s.files += %w( third_party/nanopb/pb_common.c ) diff --git a/package.xml b/package.xml index b25e42c1c01..2c718608647 100644 --- a/package.xml +++ b/package.xml @@ -490,6 +490,7 @@ + diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c index 20535398d69..dc1612428ec 100644 --- a/src/core/ext/client_config/lb_policy.c +++ b/src/core/ext/client_config/lb_policy.c @@ -60,8 +60,9 @@ static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta); #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY: %p % 12s 0x%08x -> 0x%08x [%s]", c, purpose, old_val, - old_val + delta, reason); + "LB_POLICY: 0x%" PRIxPTR " %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR + " [%s]", + (intptr_t)c, purpose, old_val, old_val + delta, reason); #endif return old_val; } diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index 56fa11198b7..b07824ae1ba 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -73,15 +73,17 @@ struct grpc_lb_policy_vtable { void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_closure *closure); - /** try to enter a READY connectivity state */ + /** Try to enter a READY connectivity state */ void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - /** check the current connectivity of the lb_policy */ + /** Check the current connectivity */ grpc_connectivity_state (*check_connectivity)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** call notify when the connectivity state of a channel changes from *state. - Updates *state with the new state of the policy */ + Updates *state with the new state of the policy. Calling with a NULL \a + state cancels the subscription. + */ void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, @@ -124,7 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting \a target. - Picking can be asynchronous. Any IO should be done under \a pollset. */ + Picking can be asynchronous. Any IO should be done under \a pollent. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_polling_entity *pollent, grpc_metadata_batch *initial_metadata, @@ -146,8 +148,11 @@ void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq); +/** Try to enter a READY connectivity state */ void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +/* Call notify when the connectivity state of a channel changes from \a *state. + * Updates \a *state with the new state of the policy */ void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_connectivity_state *state, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c new file mode 100644 index 00000000000..cfa9669e0a2 --- /dev/null +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -0,0 +1,836 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/lb_policy/grpclb/grpclb.h" +#include "src/core/ext/client_config/client_channel_factory.h" +#include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/parse_address.h" +#include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/surface/call.h" +#include "src/core/lib/surface/channel.h" + +#include + +#include +#include +#include +#include +#include + +int grpc_lb_glb_trace = 0; + +typedef struct wrapped_rr_closure_arg { + grpc_closure *wrapped_closure; + grpc_lb_policy *rr_policy; +} wrapped_rr_closure_arg; + +/* The \a on_complete closure passed as part of the pick requires keeping a + * reference to its associated round robin instance. We wrap this closure in + * order to unref the round robin instance upon its invocation */ +static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + wrapped_rr_closure_arg *wc = arg; + + if (wc->rr_policy != NULL) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR %p", wc->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc->rr_policy, "wrapped_rr_closure"); + } + + if (wc->wrapped_closure != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, wc->wrapped_closure, success, NULL); + } + gpr_free(wc); +} + +typedef struct pending_pick { + struct pending_pick *next; + grpc_polling_entity *pollent; + grpc_metadata_batch *initial_metadata; + uint32_t initial_metadata_flags; + grpc_connected_subchannel **target; + grpc_closure *wrapped_on_complete; + wrapped_rr_closure_arg *wrapped_on_complete_arg; +} pending_pick; + +typedef struct pending_ping { + struct pending_ping *next; + grpc_closure *wrapped_notify; + wrapped_rr_closure_arg *wrapped_notify_arg; +} pending_ping; + +typedef struct glb_lb_policy glb_lb_policy; + +#define MAX_LBCD_OPS_LEN 6 +typedef struct lb_client_data { + gpr_mu mu; + grpc_closure md_sent; + grpc_closure md_rcvd; + grpc_closure req_sent; + grpc_closure res_rcvd; + grpc_closure close_sent; + grpc_closure srv_status_rcvd; + + grpc_call *c; + gpr_timespec deadline; + + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + + grpc_byte_buffer *request_payload; + grpc_byte_buffer *response_payload; + + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + + glb_lb_policy *p; +} lb_client_data; + +/* Keeps track and reacts to changes in connectivity of the RR instance */ +typedef struct rr_connectivity_data { + grpc_closure on_change; + grpc_connectivity_state state; + glb_lb_policy *p; +} rr_connectivity_data; + +struct glb_lb_policy { + /** base policy: must be first */ + grpc_lb_policy base; + + /** mutex protecting remaining members */ + gpr_mu mu; + + grpc_client_channel_factory *cc_factory; + + /** for communicating with the LB server */ + grpc_channel *lb_server_channel; + + /** the RR policy to use of the backend servers returned by the LB server */ + grpc_lb_policy *rr_policy; + + bool started_picking; + + /** our connectivity state tracker */ + grpc_connectivity_state_tracker state_tracker; + + grpc_grpclb_serverlist *serverlist; + + /** list of picks that are waiting on connectivity */ + pending_pick *pending_picks; + + /** list of pings that are waiting on connectivity */ + pending_ping *pending_pings; + + /** data associated with the communication with the LB server */ + lb_client_data *lbcd; + + /** for tracking of the RR connectivity */ + rr_connectivity_data *rr_connectivity; +}; + +static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p); +static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + bool iomgr_success) { + rr_connectivity_data *rrcd = arg; + if (!iomgr_success) { + gpr_free(rrcd); + return; + } + glb_lb_policy *p = rrcd->p; + const grpc_connectivity_state new_state = p->rr_connectivity->state; + if (new_state == GRPC_CHANNEL_SHUTDOWN && p->serverlist != NULL) { + /* a RR policy is shutting down but there's a serverlist available -> + * perform a handover */ + rr_handover(exec_ctx, p); + } else { + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, new_state, + "rr_connectivity_changed"); + /* resubscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, + &p->rr_connectivity->state, + &p->rr_connectivity->on_change); + } +} + +static void add_pending_pick(pending_pick **root, grpc_polling_entity *pollent, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { + pending_pick *pp = gpr_malloc(sizeof(*pp)); + memset(pp, 0, sizeof(pending_pick)); + pp->wrapped_on_complete_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + memset(pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); + pp->next = *root; + pp->pollent = pollent; + pp->target = target; + pp->initial_metadata = initial_metadata; + pp->initial_metadata_flags = initial_metadata_flags; + pp->wrapped_on_complete = + grpc_closure_create(wrapped_rr_closure, pp->wrapped_on_complete_arg); + pp->wrapped_on_complete_arg->wrapped_closure = on_complete; + *root = pp; +} + +static void add_pending_ping(pending_ping **root, grpc_closure *notify) { + pending_ping *pping = gpr_malloc(sizeof(*pping)); + memset(pping, 0, sizeof(pending_ping)); + pping->wrapped_notify_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + memset(pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); + pping->next = *root; + pping->wrapped_notify = + grpc_closure_create(wrapped_rr_closure, pping->wrapped_notify_arg); + pping->wrapped_notify_arg->wrapped_closure = notify; + *root = pping; +} + +static void lb_client_data_destroy(lb_client_data *lbcd); + +static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + lb_client_data *lbcd = arg; + GPR_ASSERT(lbcd->c); + grpc_call_error error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lbcd->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch_and_execute(exec_ctx, lbcd->c, ops, + (size_t)(op - ops), &lbcd->md_rcvd); + GPR_ASSERT(GRPC_CALL_OK == error); +} + +static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + lb_client_data *lbcd = arg; + GPR_ASSERT(lbcd->c); + grpc_call_error error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = lbcd->request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->req_sent); + GPR_ASSERT(GRPC_CALL_OK == error); +} + +static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + lb_client_data *lbcd = arg; + grpc_call_error error; + + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &lbcd->response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->res_rcvd); + GPR_ASSERT(GRPC_CALL_OK == error); +} + +static void res_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + /* look inside lbcd->response_payload, ideally to send it back as the + * serverlist. */ + lb_client_data *lbcd = arg; + grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + if (lbcd->response_payload) { + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, lbcd->response_payload); + gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc_byte_buffer_destroy(lbcd->response_payload); + grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist) { + gpr_slice_unref(response_slice); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Serverlist with %zu servers received", + serverlist->num_servers); + } + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(lbcd->p->serverlist, serverlist)) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } else { + if (lbcd->p->serverlist != NULL) { + grpc_grpclb_destroy_serverlist(lbcd->p->serverlist); + } + lbcd->p->serverlist = serverlist; + } + } + if (lbcd->p->rr_policy == NULL) { + /* initial "handover", in this case from a null RR policy, meaning it'll + * just create the first one */ + rr_handover(exec_ctx, lbcd->p); + } else { + /* unref the RR policy, eventually leading to its substitution with a + * new one constructed from the received serverlist (see + * rr_connectivity_changed) */ + GRPC_LB_POLICY_UNREF(exec_ctx, lbcd->p->rr_policy, + "serverlist_received"); + } + + /* listen for a potential serverlist update */ + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &lbcd->response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + const grpc_call_error error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), + &lbcd->res_rcvd); /* loop */ + GPR_ASSERT(GRPC_CALL_OK == error); + return; + } else { + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", + gpr_dump_slice(response_slice, GPR_DUMP_ASCII)); + gpr_slice_unref(response_slice); + + /* Disconnect from server returning invalid response. */ + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + grpc_call_error error = grpc_call_start_batch_and_execute( + exec_ctx, lbcd->c, ops, (size_t)(op - ops), &lbcd->close_sent); + GPR_ASSERT(GRPC_CALL_OK == error); + } + } + /* empty payload: call cancelled by server. Cleanups happening in + * srv_status_rcvd_cb */ +} +static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Close from LB client sent. Waiting from server status now"); + } +} +static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + lb_client_data *lbcd = arg; + glb_lb_policy *p = lbcd->p; + if (grpc_lb_glb_trace) { + gpr_log( + GPR_INFO, + "status from lb server received. Status = %d, Details = '%s', Capaticy " + "= %zu", + lbcd->status, lbcd->status_details, lbcd->status_details_capacity); + } + + grpc_call_destroy(lbcd->c); + grpc_channel_destroy(lbcd->p->lb_server_channel); + lbcd->p->lb_server_channel = NULL; + lb_client_data_destroy(lbcd); + p->lbcd = NULL; +} + +static lb_client_data *lb_client_data_create(glb_lb_policy *p) { + lb_client_data *lbcd = gpr_malloc(sizeof(lb_client_data)); + memset(lbcd, 0, sizeof(lb_client_data)); + + gpr_mu_init(&lbcd->mu); + grpc_closure_init(&lbcd->md_sent, md_sent_cb, lbcd); + + grpc_closure_init(&lbcd->md_rcvd, md_recv_cb, lbcd); + grpc_closure_init(&lbcd->req_sent, req_sent_cb, lbcd); + grpc_closure_init(&lbcd->res_rcvd, res_rcvd_cb, lbcd); + grpc_closure_init(&lbcd->close_sent, close_sent_cb, lbcd); + grpc_closure_init(&lbcd->srv_status_rcvd, srv_status_rcvd_cb, lbcd); + + /* TODO(dgq): get the deadline from the client/user instead of fabricating + * one + * here. Make it a policy arg? */ + lbcd->deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(3, GPR_TIMESPAN)); + + lbcd->c = grpc_channel_create_pollset_set_call( + p->lb_server_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + p->base.interested_parties, "/BalanceLoad", + NULL, /* FIXME(dgq): which "host" value to use? */ + lbcd->deadline, NULL); + + grpc_metadata_array_init(&lbcd->initial_metadata_recv); + grpc_metadata_array_init(&lbcd->trailing_metadata_recv); + + grpc_grpclb_request *request = grpc_grpclb_request_create( + "load.balanced.service.name"); /* FIXME(dgq): get the name of the load + balanced service from above. */ + gpr_slice request_payload_slice = grpc_grpclb_request_encode(request); + lbcd->request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + gpr_slice_unref(request_payload_slice); + grpc_grpclb_request_destroy(request); + + lbcd->status_details = NULL; + lbcd->status_details_capacity = 0; + lbcd->p = p; + return lbcd; +} + +static void lb_client_data_destroy(lb_client_data *lbcd) { + grpc_metadata_array_destroy(&lbcd->initial_metadata_recv); + grpc_metadata_array_destroy(&lbcd->trailing_metadata_recv); + + grpc_byte_buffer_destroy(lbcd->request_payload); + + gpr_free(lbcd->status_details); + gpr_mu_destroy(&lbcd->mu); + gpr_free(lbcd); +} + +static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + GPR_ASSERT(p->pending_picks == NULL); + GPR_ASSERT(p->pending_pings == NULL); + grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); + if (p->serverlist != NULL) { + grpc_grpclb_destroy_serverlist(p->serverlist); + } + gpr_mu_destroy(&p->mu); + gpr_free(p); +} + +static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + pending_ping *pping = p->pending_pings; + p->pending_pings = NULL; + gpr_mu_unlock(&p->mu); + + while (pp != NULL) { + pending_pick *next = pp->next; + *pp->target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, true, NULL); + gpr_free(pp); + pp = next; + } + + while (pping != NULL) { + pending_ping *next = pping->next; + grpc_exec_ctx_enqueue(exec_ctx, pping->wrapped_notify, true, NULL); + pping = next; + } + + if (p->rr_policy) { + /* unsubscribe */ + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, NULL, + &p->rr_connectivity->on_change); + GRPC_LB_POLICY_UNREF(exec_ctx, p->rr_policy, "glb_shutdown"); + } + + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + GRPC_CHANNEL_SHUTDOWN, "glb_shutdown"); +} + +static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if (pp->target == target) { + grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, + p->base.interested_parties); + *target = NULL; + grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + +static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->lbcd != NULL) { + /* cancel the call to the load balancer service, if any */ + grpc_call_cancel(p->lbcd->c, NULL); + } + pending_pick *pp = p->pending_picks; + p->pending_picks = NULL; + while (pp != NULL) { + pending_pick *next = pp->next; + if ((pp->initial_metadata_flags & initial_metadata_flags_mask) == + initial_metadata_flags_eq) { + grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent, + p->base.interested_parties); + grpc_exec_ctx_enqueue(exec_ctx, pp->wrapped_on_complete, false, NULL); + gpr_free(pp); + } else { + pp->next = p->pending_picks; + p->pending_picks = pp; + } + pp = next; + } + gpr_mu_unlock(&p->mu); +} + +static void query_for_backends(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) { + GPR_ASSERT(p->lb_server_channel != NULL); + + p->lbcd = lb_client_data_create(p); + grpc_call_error error; + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch_and_execute( + exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->md_sent); + GPR_ASSERT(GRPC_CALL_OK == error); + + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &p->lbcd->trailing_metadata_recv; + op->data.recv_status_on_client.status = &p->lbcd->status; + op->data.recv_status_on_client.status_details = &p->lbcd->status_details; + op->data.recv_status_on_client.status_details_capacity = + &p->lbcd->status_details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch_and_execute( + exec_ctx, p->lbcd->c, ops, (size_t)(op - ops), &p->lbcd->srv_status_rcvd); + GPR_ASSERT(GRPC_CALL_OK == error); +} + +static grpc_lb_policy *create_rr(grpc_exec_ctx *exec_ctx, + const grpc_grpclb_serverlist *serverlist, + glb_lb_policy *p) { + /* TODO(dgq): support mixed ip version */ + GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); + char **host_ports = gpr_malloc(sizeof(char *) * serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + gpr_join_host_port(&host_ports[i], serverlist->servers[i]->ip_address, + serverlist->servers[i]->port); + } + + size_t uri_path_len; + char *concat_ipports = gpr_strjoin_sep( + (const char **)host_ports, serverlist->num_servers, ",", &uri_path_len); + + grpc_lb_policy_args args; + args.client_channel_factory = p->cc_factory; + args.addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + args.addresses->naddrs = serverlist->num_servers; + args.addresses->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * args.addresses->naddrs); + size_t out_addrs_idx = 0; + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_uri uri; + struct sockaddr_storage sa; + size_t sa_len; + uri.path = host_ports[i]; + if (parse_ipv4(&uri, &sa, &sa_len)) { /* TODO(dgq): add support for ipv6 */ + memcpy(args.addresses->addrs[out_addrs_idx].addr, &sa, sa_len); + args.addresses->addrs[out_addrs_idx].len = sa_len; + ++out_addrs_idx; + } else { + gpr_log(GPR_ERROR, "Invalid LB service address '%s', ignoring.", + host_ports[i]); + } + } + + grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); + + gpr_free(concat_ipports); + for (size_t i = 0; i < serverlist->num_servers; i++) { + gpr_free(host_ports[i]); + } + gpr_free(host_ports); + + gpr_free(args.addresses->addrs); + gpr_free(args.addresses); + + return rr; +} + +static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) { + p->started_picking = true; + query_for_backends(exec_ctx, p); +} + +static void rr_handover(grpc_exec_ctx *exec_ctx, glb_lb_policy *p) { + p->rr_policy = create_rr(exec_ctx, p->serverlist, p); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", + (intptr_t)p->rr_policy); + } + GPR_ASSERT(p->rr_policy != NULL); + p->rr_connectivity->state = + grpc_lb_policy_check_connectivity(exec_ctx, p->rr_policy); + grpc_lb_policy_notify_on_state_change(exec_ctx, p->rr_policy, + &p->rr_connectivity->state, + &p->rr_connectivity->on_change); + grpc_connectivity_state_set(exec_ctx, &p->state_tracker, + p->rr_connectivity->state, "rr_handover"); + grpc_lb_policy_exit_idle(exec_ctx, p->rr_policy); + + /* flush pending ops */ + pending_pick *pp; + while ((pp = p->pending_picks)) { + p->pending_picks = pp->next; + GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_pick"); + pp->wrapped_on_complete_arg->rr_policy = p->rr_policy; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%"PRIxPTR"", (intptr_t)p->rr_policy); + } + grpc_lb_policy_pick(exec_ctx, p->rr_policy, pp->pollent, + pp->initial_metadata, pp->initial_metadata_flags, + pp->target, pp->wrapped_on_complete); + gpr_free(pp); + } + + pending_ping *pping; + while ((pping = p->pending_pings)) { + p->pending_pings = pping->next; + GRPC_LB_POLICY_REF(p->rr_policy, "rr_handover_pending_ping"); + pping->wrapped_notify_arg->rr_policy = p->rr_policy; + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Pending ping about to PING from 0x%"PRIxPTR"", (intptr_t)p->rr_policy); + } + grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, pping->wrapped_notify); + gpr_free(pping); + } +} + +static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + gpr_mu_unlock(&p->mu); +} + +static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_polling_entity *pollent, + grpc_metadata_batch *initial_metadata, + uint32_t initial_metadata_flags, + grpc_connected_subchannel **target, + grpc_closure *on_complete) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + int r; + + if (p->rr_policy != NULL) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "about to PICK from 0x%"PRIxPTR"", (intptr_t)p->rr_policy); + } + GRPC_LB_POLICY_REF(p->rr_policy, "rr_pick"); + wrapped_rr_closure_arg *warg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); + warg->rr_policy = p->rr_policy; + warg->wrapped_closure = on_complete; + grpc_closure *wrapped_on_complete = + grpc_closure_create(wrapped_rr_closure, warg); + r = grpc_lb_policy_pick(exec_ctx, p->rr_policy, pollent, initial_metadata, + initial_metadata_flags, target, + wrapped_on_complete); + if (r != 0) { + /* the call to grpc_lb_policy_pick has been sychronous. Invoke a neutered + * wrapped closure */ + warg->wrapped_closure = NULL; + grpc_exec_ctx_enqueue(exec_ctx, wrapped_on_complete, false, NULL); + } + } else { + grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent, + p->base.interested_parties); + add_pending_pick(&p->pending_picks, pollent, initial_metadata, + initial_metadata_flags, target, on_complete); + + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + r = 0; + } + gpr_mu_unlock(&p->mu); + return r; +} + +static grpc_connectivity_state glb_check_connectivity(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol) { + glb_lb_policy *p = (glb_lb_policy *)pol; + grpc_connectivity_state st; + gpr_mu_lock(&p->mu); + st = grpc_connectivity_state_check(&p->state_tracker); + gpr_mu_unlock(&p->mu); + return st; +} + +static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + if (p->rr_policy) { + grpc_lb_policy_ping_one(exec_ctx, p->rr_policy, closure); + } else { + add_pending_ping(&p->pending_pings, closure); + if (!p->started_picking) { + start_picking(exec_ctx, p); + } + } + gpr_mu_unlock(&p->mu); +} + +static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { + glb_lb_policy *p = (glb_lb_policy *)pol; + gpr_mu_lock(&p->mu); + grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, + current, notify); + + gpr_mu_unlock(&p->mu); +} + +static const grpc_lb_policy_vtable glb_lb_policy_vtable = { + glb_destroy, glb_shutdown, glb_pick, + glb_cancel_pick, glb_cancel_picks, glb_ping_one, + glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change}; + +static void glb_factory_ref(grpc_lb_policy_factory *factory) {} + +static void glb_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, + grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { + glb_lb_policy *p = gpr_malloc(sizeof(*p)); + memset(p, 0, sizeof(*p)); + + /* all input addresses in args->addresses come from a resolver that claims + * they are LB services. + * + * Create a client channel over them to communicate with a LB service */ + p->cc_factory = args->client_channel_factory; + GPR_ASSERT(p->cc_factory != NULL); + if (args->addresses->naddrs == 0) { + return NULL; + } + + /* construct a target from the args->addresses, in the form + * ipvX://ip1:port1,ip2:port2,... + * TODO(dgq): support mixed ip version */ + char **addr_strs = gpr_malloc(sizeof(char *) * args->addresses->naddrs); + addr_strs[0] = + grpc_sockaddr_to_uri((const struct sockaddr *)&args->addresses->addrs[0]); + for (size_t i = 1; i < args->addresses->naddrs; i++) { + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[i], + (const struct sockaddr *)&args->addresses->addrs[i], + true) == 0); + } + size_t uri_path_len; + char *target_uri_str = gpr_strjoin_sep( + (const char **)addr_strs, args->addresses->naddrs, ",", &uri_path_len); + + /* will pick using pick_first */ + p->lb_server_channel = grpc_client_channel_factory_create_channel( + exec_ctx, p->cc_factory, target_uri_str, + GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, NULL); + + gpr_free(target_uri_str); + for (size_t i = 0; i < args->addresses->naddrs; i++) { + gpr_free(addr_strs[i]); + } + gpr_free(addr_strs); + + if (p->lb_server_channel == NULL) { + gpr_free(p); + return NULL; + } + + rr_connectivity_data *rr_connectivity = + gpr_malloc(sizeof(rr_connectivity_data)); + memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, rr_connectivity_changed, + rr_connectivity); + rr_connectivity->p = p; + p->rr_connectivity = rr_connectivity; + + grpc_lb_policy_init(&p->base, &glb_lb_policy_vtable); + gpr_mu_init(&p->mu); + grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + return &p->base; +} + +static const grpc_lb_policy_factory_vtable glb_factory_vtable = { + glb_factory_ref, glb_factory_unref, glb_create, "grpclb"}; + +static grpc_lb_policy_factory glb_lb_policy_factory = {&glb_factory_vtable}; + +grpc_lb_policy_factory *grpc_glb_lb_factory_create() { + return &glb_lb_policy_factory; +} + +/* Plugin registration */ + +void grpc_lb_policy_grpclb_init() { + grpc_register_lb_policy(grpc_glb_lb_factory_create()); + grpc_register_tracer("glb", &grpc_lb_glb_trace); +} + +void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/lb_policy/grpclb/grpclb.h b/src/core/ext/lb_policy/grpclb/grpclb.h new file mode 100644 index 00000000000..83552b4fa02 --- /dev/null +++ b/src/core/ext/lb_policy/grpclb/grpclb.h @@ -0,0 +1,44 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H +#define GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H + +#include "src/core/ext/client_config/lb_policy_factory.h" + +/** Returns a load balancing factory for the glb policy, which tries to connect + * to a load balancing server to decide the next successfully connected + * subchannel to pick. */ +grpc_lb_policy_factory *grpc_glb_lb_factory_create(); + +#endif /* GRPC_CORE_EXT_LB_POLICY_GRPCLB_GRPCLB_H */ diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 59b89997dd9..c3b3888e90b 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -56,6 +56,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, dec_arg->num_servers++; } else { /* second pass */ grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); + memset(server, 0, sizeof(grpc_grpclb_server)); GPR_ASSERT(dec_arg->num_servers > 0); if (dec_arg->i == 0) { /* first iteration of second pass */ dec_arg->servers = @@ -147,6 +148,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( } grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); + memset(sl, 0, sizeof(*sl)); sl->num_servers = arg.num_servers; sl->servers = arg.servers; if (res->server_list.has_expiration_interval) { @@ -167,6 +169,81 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { gpr_free(serverlist); } +grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( + const grpc_grpclb_serverlist *sl) { + grpc_grpclb_serverlist *copy = gpr_malloc(sizeof(grpc_grpclb_serverlist)); + memset(copy, 0, sizeof(grpc_grpclb_serverlist)); + copy->num_servers = sl->num_servers; + memcpy(©->expiration_interval, &sl->expiration_interval, + sizeof(grpc_grpclb_duration)); + copy->servers = gpr_malloc(sizeof(grpc_grpclb_server*) * sl->num_servers); + for (size_t i = 0; i < sl->num_servers; i++) { + copy->servers[i] = gpr_malloc(sizeof(grpc_grpclb_server)); + memcpy(copy->servers[i], sl->servers[i], sizeof(grpc_grpclb_server)); + } + return copy; +} + +bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, + const grpc_grpclb_serverlist *rhs) { + if ((lhs == NULL) || (rhs == NULL)) { + return false; + } + if (lhs->num_servers != rhs->num_servers) { + return false; + } + if (grpc_grpclb_duration_compare(&lhs->expiration_interval, + &rhs->expiration_interval) != 0) { + return false; + } + for (size_t i = 0; i < lhs->num_servers; i++) { + if (!grpc_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { + return false; + } + } + return true; +} + +bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs, + const grpc_grpclb_server *rhs) { + return memcmp(lhs, rhs, sizeof(grpc_grpclb_server)) == 0; +} + +int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, + const grpc_grpclb_duration *rhs) { + GPR_ASSERT(lhs && rhs); + if (lhs->has_seconds && rhs->has_seconds) { + if (lhs->seconds < rhs->seconds) { + return -1; + } + if (lhs->seconds > rhs->seconds) { + return 1; + } + goto compare_nanos; + } + if (lhs->has_seconds + rhs->has_seconds == 0) { + goto compare_nanos; + } + // only lhs or rhs have seconds + return rhs->has_seconds ? 1 : -1; + +compare_nanos: + if (lhs->has_nanos && rhs->has_nanos) { + if (lhs->nanos < rhs->nanos) { + return -1; + } + if (lhs->nanos > rhs->nanos) { + return 1; + } + return 0; + } + if (lhs->has_nanos + rhs->has_nanos == 0) { + return 0; + } + // only lhs or rhs have nanos + return rhs->has_nanos ? 1 : -1; +} + void grpc_grpclb_response_destroy(grpc_grpclb_response *response) { gpr_free(response); } diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 71b5616d0c8..cc5c4e93439 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -68,6 +68,11 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request); * grpc_grpclb_response */ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response); +/** Return a copy of \a sl. The caller is responsible for calling \a + * grpc_grpclb_destroy_serverlist on the returned copy. */ +grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( + const grpc_grpclb_serverlist *sl); + /** Destroy \a serverlist */ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); @@ -75,6 +80,17 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( gpr_slice encoded_response); +bool grpc_grpclb_serverlist_equals(const grpc_grpclb_serverlist *lhs, + const grpc_grpclb_serverlist *rhs); + +bool grpc_grpclb_server_equals(const grpc_grpclb_server *lhs, + const grpc_grpclb_server *rhs); + +/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal, + * < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */ +int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, + const grpc_grpclb_duration *rhs); + /** Destroy \a response */ void grpc_grpclb_response_destroy(grpc_grpclb_response *response); diff --git a/src/core/plugin_registry/grpc_plugin_registry.c b/src/core/plugin_registry/grpc_plugin_registry.c index 905cd59e23d..7a7a9ce477a 100644 --- a/src/core/plugin_registry/grpc_plugin_registry.c +++ b/src/core/plugin_registry/grpc_plugin_registry.c @@ -37,6 +37,8 @@ extern void grpc_chttp2_plugin_init(void); extern void grpc_chttp2_plugin_shutdown(void); extern void grpc_client_config_init(void); extern void grpc_client_config_shutdown(void); +extern void grpc_lb_policy_grpclb_init(void); +extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); extern void grpc_lb_policy_pick_first_shutdown(void); extern void grpc_lb_policy_round_robin_init(void); @@ -55,6 +57,8 @@ void grpc_register_built_in_plugins(void) { grpc_chttp2_plugin_shutdown); grpc_register_plugin(grpc_client_config_init, grpc_client_config_shutdown); + grpc_register_plugin(grpc_lb_policy_grpclb_init, + grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, grpc_lb_policy_pick_first_shutdown); grpc_register_plugin(grpc_lb_policy_round_robin_init, diff --git a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c index 79950787258..ad4ddf0ff48 100644 --- a/src/core/plugin_registry/grpc_unsecure_plugin_registry.c +++ b/src/core/plugin_registry/grpc_unsecure_plugin_registry.c @@ -43,6 +43,8 @@ extern void grpc_resolver_sockaddr_init(void); extern void grpc_resolver_sockaddr_shutdown(void); extern void grpc_load_reporting_plugin_init(void); extern void grpc_load_reporting_plugin_shutdown(void); +extern void grpc_lb_policy_grpclb_init(void); +extern void grpc_lb_policy_grpclb_shutdown(void); extern void grpc_lb_policy_pick_first_init(void); extern void grpc_lb_policy_pick_first_shutdown(void); extern void grpc_lb_policy_round_robin_init(void); @@ -61,6 +63,8 @@ void grpc_register_built_in_plugins(void) { grpc_resolver_sockaddr_shutdown); grpc_register_plugin(grpc_load_reporting_plugin_init, grpc_load_reporting_plugin_shutdown); + grpc_register_plugin(grpc_lb_policy_grpclb_init, + grpc_lb_policy_grpclb_shutdown); grpc_register_plugin(grpc_lb_policy_pick_first_init, grpc_lb_policy_pick_first_shutdown); grpc_register_plugin(grpc_lb_policy_round_robin_init, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2a5229434c9..5768403a040 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -235,6 +235,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c', + 'src/core/ext/lb_policy/grpclb/grpclb.c', 'src/core/ext/lb_policy/grpclb/load_balancer_api.c', 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c', 'third_party/nanopb/pb_common.c', diff --git a/test/core/client_config/grpclb_test.c b/test/core/client_config/grpclb_test.c new file mode 100644 index 00000000000..2080fefd1ae --- /dev/null +++ b/test/core/client_config/grpclb_test.c @@ -0,0 +1,670 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/core/ext/client_config/client_channel.h" +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/support/string.h" +#include "src/core/lib/support/tmpfile.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/server.h" +#include "test/core/end2end/cq_verifier.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +#define NUM_BACKENDS 4 + +typedef struct client_fixture { + grpc_channel *client; + char *server_uri; + grpc_completion_queue *cq; +} client_fixture; + +typedef struct server_fixture { + grpc_server *server; + grpc_call *server_call; + grpc_completion_queue *cq; + char *servers_hostport; + int port; + gpr_thd_id tid; + int num_calls_serviced; +} server_fixture; + +typedef struct test_fixture { + server_fixture lb_server; + server_fixture lb_backends[NUM_BACKENDS]; + client_fixture client; + int lb_server_update_delay_ms; +} test_fixture; + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static void *tag(intptr_t t) { return (void *)t; } + +static gpr_slice build_response_payload_slice(const char *host, int *ports, + size_t nports) { + /* + server_list { + servers { + ip_address: "127.0.0.1" + port: ... + load_balance_token: "token..." + } + ... + } */ + char **hostports_vec = gpr_malloc(sizeof(char *) * nports); + for (size_t i = 0; i < nports; i++) { + gpr_join_host_port(&hostports_vec[i], "127.0.0.1", ports[i]); + } + char *hostports_str = + gpr_strjoin_sep((const char **)hostports_vec, nports, " ", NULL); + gpr_log(GPR_INFO, "generating response for %s", hostports_str); + + char *output_fname; + FILE *tmpfd = gpr_tmpfile("grpclb_test", &output_fname); + fclose(tmpfd); + char *cmdline; + gpr_asprintf(&cmdline, + "./tools/codegen/core/gen_grpclb_test_response.py --lb_proto " + "src/proto/grpc/lb/v1/load_balancer.proto %s " + "--output %s --quiet", + hostports_str, output_fname); + GPR_ASSERT(system(cmdline) == 0); + FILE *f = fopen(output_fname, "rb"); + fseek(f, 0, SEEK_END); + const size_t fsize = (size_t)ftell(f); + rewind(f); + + char *serialized_response = gpr_malloc(fsize); + GPR_ASSERT(fread(serialized_response, fsize, 1, f) == 1); + fclose(f); + gpr_free(output_fname); + gpr_free(cmdline); + + for (size_t i = 0; i < nports; i++) { + gpr_free(hostports_vec[i]); + } + gpr_free(hostports_vec); + gpr_free(hostports_str); + + const gpr_slice response_slice = + gpr_slice_from_copied_buffer(serialized_response, fsize); + gpr_free(serialized_response); + return response_slice; +} + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void sleep_ms(int delay_ms) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay_ms, GPR_TIMESPAN))); +} + +static void start_lb_server(server_fixture *sf, int *ports, size_t nports, + int update_delay_ms) { + grpc_call *s; + cq_verifier *cqv = cq_verifier_create(sf->cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_call_error error; + int was_cancelled = 2; + grpc_byte_buffer *request_payload_recv; + grpc_byte_buffer *response_payload; + + memset(ops, 0, sizeof(ops)); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + error = grpc_server_request_call(sf->server, &s, &call_details, + &request_metadata_recv, sf->cq, sf->cq, + tag(200)); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "LB Server[%s] up", sf->servers_hostport); + cq_expect_completion(cqv, tag(200), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport); + + /* receive request for backends */ + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(202), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(202), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport); + // TODO(dgq): validate request. + grpc_byte_buffer_destroy(request_payload_recv); + gpr_slice response_payload_slice; + for (int i = 0; i < 2; i++) { + if (i == 0) { + // First half of the ports. + response_payload_slice = + build_response_payload_slice("127.0.0.1", ports, nports / 2); + } else { + // Second half of the ports. + sleep_ms(update_delay_ms); + response_payload_slice = build_response_payload_slice( + "127.0.0.1", ports + (nports / 2), (nports + 1) / 2 /* ceil */); + } + + response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(203), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + cq_expect_completion(cqv, tag(203), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after SEND_MESSAGE, iter %d", + sf->servers_hostport, i); + + grpc_byte_buffer_destroy(response_payload); + gpr_slice_unref(response_payload_slice); + } + gpr_log(GPR_INFO, "LB Server[%s] shutting down", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(204), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(201), 1); + cq_expect_completion(cqv, tag(204), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "LB Server[%s] after tag 204. All done. LB server out", + sf->servers_hostport); + + grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); +} + +static void start_backend_server(server_fixture *sf) { + grpc_call *s; + cq_verifier *cqv; + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_call_error error; + int was_cancelled; + grpc_byte_buffer *request_payload_recv; + grpc_byte_buffer *response_payload; + grpc_event ev; + + while (true) { + memset(ops, 0, sizeof(ops)); + cqv = cq_verifier_create(sf->cq); + was_cancelled = 2; + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + error = grpc_server_request_call(sf->server, &s, &call_details, + &request_metadata_recv, sf->cq, sf->cq, + tag(100)); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport); + ev = grpc_completion_queue_next(sf->cq, n_seconds_time(60), NULL); + if (!ev.success) { + gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport); + cq_verifier_destroy(cqv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + return; + } + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + gpr_log(GPR_INFO, "Server[%s] after tag 100", sf->servers_hostport); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(101), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "Server[%s] after tag 101", sf->servers_hostport); + + bool exit = false; + gpr_slice response_payload_slice = + gpr_slice_from_copied_string("hello you"); + while (!exit) { + op = ops; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL); + if (ev.type == GRPC_OP_COMPLETE && ev.success) { + GPR_ASSERT(ev.tag = tag(102)); + if (request_payload_recv == NULL) { + exit = true; + gpr_log(GPR_INFO, + "Server[%s] recv \"close\" from client, exiting. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + } + } else { + gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + exit = true; + } + gpr_log(GPR_INFO, "Server[%s] after tag 102. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + + if (!exit) { + response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + error = + grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL); + if (ev.type == GRPC_OP_COMPLETE && ev.success) { + GPR_ASSERT(ev.tag = tag(103)); + } else { + gpr_log(GPR_INFO, "Server[%s] forced to shutdown. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + exit = true; + } + gpr_log(GPR_INFO, "Server[%s] after tag 103. Call #%d", + sf->servers_hostport, sf->num_calls_serviced); + grpc_byte_buffer_destroy(response_payload); + } + + grpc_byte_buffer_destroy(request_payload_recv); + } + ++sf->num_calls_serviced; + + gpr_log(GPR_INFO, "Server[%s] OUT OF THE LOOP", sf->servers_hostport); + gpr_slice_unref(response_payload_slice); + + op = ops; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "Backend server out a-ok"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(104), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(101), 1); + cq_expect_completion(cqv, tag(104), 1); + cq_verify(cqv); + gpr_log(GPR_INFO, "Server[%s] DONE. After servicing %d calls", + sf->servers_hostport, sf->num_calls_serviced); + + grpc_call_destroy(s); + cq_verifier_destroy(cqv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + } +} + +static void perform_request(client_fixture *cf) { + grpc_call *c; + cq_verifier *cqv = cq_verifier_create(cf->cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + grpc_byte_buffer *request_payload; + grpc_byte_buffer *response_payload_recv; + int i; + + memset(ops, 0, sizeof(ops)); + gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world"); + + c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS, + cf->cq, "/foo", "foo.test.google.fr:1234", + n_seconds_time(1000), NULL); + gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c); + GPR_ASSERT(c); + char *peer; + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + for (i = 0; i < 4; i++) { + request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); + + op = ops; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + peer = grpc_call_get_peer(c); + cq_expect_completion(cqv, tag(2), 1); + cq_verify(cqv); + gpr_free(peer); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(response_payload_recv); + } + + gpr_slice_unref(request_payload_slice); + + op = ops; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(3), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + cq_expect_completion(cqv, tag(1), 1); + cq_expect_completion(cqv, tag(3), 1); + cq_verify(cqv); + peer = grpc_call_get_peer(c); + gpr_log(GPR_INFO, "Client DONE WITH SERVER %s ", peer); + gpr_free(peer); + + grpc_call_destroy(c); + + cq_verify_empty_timeout(cqv, 1); + cq_verifier_destroy(cqv); + + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + gpr_free(details); +} + +static void setup_client(const char *server_hostport, client_fixture *cf) { + cf->cq = grpc_completion_queue_create(NULL); + cf->server_uri = gpr_strdup(server_hostport); + cf->client = grpc_insecure_channel_create(cf->server_uri, NULL, NULL); +} + +static void teardown_client(client_fixture *cf) { + grpc_completion_queue_shutdown(cf->cq); + drain_cq(cf->cq); + grpc_completion_queue_destroy(cf->cq); + cf->cq = NULL; + grpc_channel_destroy(cf->client); + cf->client = NULL; + gpr_free(cf->server_uri); +} + +static void setup_server(const char *host, server_fixture *sf) { + int assigned_port; + + sf->cq = grpc_completion_queue_create(NULL); + char *colon_idx = strchr(host, ':'); + if (colon_idx) { + char *port_str = colon_idx + 1; + sf->port = atoi(port_str); + sf->servers_hostport = gpr_strdup(host); + } else { + sf->port = grpc_pick_unused_port_or_die(); + gpr_join_host_port(&sf->servers_hostport, host, sf->port); + } + + sf->server = grpc_server_create(NULL, NULL); + grpc_server_register_completion_queue(sf->server, sf->cq, NULL); + GPR_ASSERT((assigned_port = grpc_server_add_insecure_http2_port( + sf->server, sf->servers_hostport)) > 0); + GPR_ASSERT(sf->port == assigned_port); + grpc_server_start(sf->server); +} + +static void teardown_server(server_fixture *sf) { + if (!sf->server) return; + + gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); + grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck( + sf->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(sf->server); + gpr_thd_join(sf->tid); + + sf->server = NULL; + grpc_completion_queue_shutdown(sf->cq); + drain_cq(sf->cq); + grpc_completion_queue_destroy(sf->cq); + + gpr_log(GPR_INFO, "Server[%s] bye bye", sf->servers_hostport); + gpr_free(sf->servers_hostport); +} + +static void fork_backend_server(void *arg) { + server_fixture *sf = arg; + start_backend_server(sf); +} + +static void fork_lb_server(void *arg) { + test_fixture *tf = arg; + int ports[NUM_BACKENDS]; + for (int i = 0; i < NUM_BACKENDS; i++) { + ports[i] = tf->lb_backends[i].port; + } + start_lb_server(&tf->lb_server, ports, NUM_BACKENDS, + tf->lb_server_update_delay_ms); +} + +static void setup_test_fixture(test_fixture *tf, + int lb_server_update_delay_ms) { + tf->lb_server_update_delay_ms = lb_server_update_delay_ms; + + gpr_thd_options options = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&options); + + for (int i = 0; i < NUM_BACKENDS; ++i) { + setup_server("127.0.0.1", &tf->lb_backends[i]); + gpr_thd_new(&tf->lb_backends[i].tid, fork_backend_server, + &tf->lb_backends[i], &options); + } + + setup_server("127.0.0.1", &tf->lb_server); + gpr_thd_new(&tf->lb_server.tid, fork_lb_server, &tf->lb_server, &options); + + char *server_uri; + gpr_asprintf(&server_uri, "ipv4:%s?lb_policy=grpclb&lb_enabled=1", + tf->lb_server.servers_hostport); + setup_client(server_uri, &tf->client); + gpr_free(server_uri); +} + +static void teardown_test_fixture(test_fixture *tf) { + teardown_client(&tf->client); + for (int i = 0; i < NUM_BACKENDS; ++i) { + teardown_server(&tf->lb_backends[i]); + } + teardown_server(&tf->lb_server); +} + +// The LB server will send two updates: batch 1 and batch 2. Each batch contains +// two addresses, both of a valid and running backend server. Batch 1 is readily +// available and provided as soon as the client establishes the streaming call. +// Batch 2 is sent after a delay of \a lb_server_update_delay_ms milliseconds. +static test_fixture test_update(int lb_server_update_delay_ms) { + test_fixture tf; + memset(&tf, 0, sizeof(tf)); + setup_test_fixture(&tf, lb_server_update_delay_ms); + perform_request( + &tf.client); // "consumes" 1st backend server of 1st serverlist + perform_request( + &tf.client); // "consumes" 2nd backend server of 1st serverlist + + perform_request( + &tf.client); // "consumes" 1st backend server of 2nd serverlist + perform_request( + &tf.client); // "consumes" 2nd backend server of 2nd serverlist + + teardown_test_fixture(&tf); + return tf; +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + + test_fixture tf_result; + // Clients take a bit over one second to complete a call (the last part of the + // call sleeps for 1 second while verifying the client's completion queue is + // empty). Therefore: + // + // If the LB server waits 800ms before sending an update, it will arrive + // before the first client request is done, skipping the second server from + // batch 1 altogether: the 2nd client request will go to the 1st server of + // batch 2 (ie, the third one out of the four total servers). + tf_result = test_update(800); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 0); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 2); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); + + // If the LB server waits 1500ms, the update arrives after having picked the + // 2nd server from batch 1 but before the next pick for the first server of + // batch 2. All server are used. + tf_result = test_update(1500); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 1); + + // If the LB server waits >= 2000ms, the update arrives after the first two + // request are done and the third pick is performed, which returns, in RR + // fashion, the 1st server of the 1st update. Therefore, the second server of + // batch 1 is hit twice, whereas the first server of batch 2 is never hit. + tf_result = test_update(2000); + GPR_ASSERT(tf_result.lb_backends[0].num_calls_serviced == 2); + GPR_ASSERT(tf_result.lb_backends[1].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[2].num_calls_serviced == 1); + GPR_ASSERT(tf_result.lb_backends[3].num_calls_serviced == 0); + + grpc_shutdown(); + return 0; +} diff --git a/test/core/end2end/cq_verifier.c b/test/core/end2end/cq_verifier.c index 8e9fa70b0ec..060e166e6eb 100644 --- a/test/core/end2end/cq_verifier.c +++ b/test/core/end2end/cq_verifier.c @@ -258,9 +258,10 @@ void cq_verify(cq_verifier *v) { gpr_strvec_destroy(&have_tags); } -void cq_verify_empty(cq_verifier *v) { - gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(1, GPR_TIMESPAN)); +void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec) { + gpr_timespec deadline = + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_seconds(timeout_sec, GPR_TIMESPAN)); grpc_event ev; GPR_ASSERT(v->expect.next == &v->expect && "expectation queue must be empty"); @@ -274,6 +275,8 @@ void cq_verify_empty(cq_verifier *v) { } } +void cq_verify_empty(cq_verifier *v) { cq_verify_empty_timeout(v, 1); } + static expectation *add(cq_verifier *v, grpc_completion_type type, void *tag) { expectation *e = gpr_malloc(sizeof(expectation)); e->type = type; diff --git a/test/core/end2end/cq_verifier.h b/test/core/end2end/cq_verifier.h index 8c9a85c2187..bf82468c9a2 100644 --- a/test/core/end2end/cq_verifier.h +++ b/test/core/end2end/cq_verifier.h @@ -55,6 +55,9 @@ void cq_verify(cq_verifier *v); /* ensure that the completion queue is empty */ void cq_verify_empty(cq_verifier *v); +/* ensure that the completion queue is empty, waiting up to \a timeout secs. */ +void cq_verify_empty_timeout(cq_verifier *v, int timeout_sec); + /* Various expectation matchers Any functions taking ... expect a NULL terminated list of key/value pairs (each pair using two parameter slots) of metadata that MUST be present in diff --git a/tools/codegen/core/gen_grpclb_test_response.py b/tools/codegen/core/gen_grpclb_test_response.py new file mode 100755 index 00000000000..4535efc738c --- /dev/null +++ b/tools/codegen/core/gen_grpclb_test_response.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python2.7 + +# Copyright 2015-2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function +import argparse +import subprocess +import sys +import os.path +import sys +import tempfile +import importlib + +# Example: tools/codegen/core/gen_grpclb_test_response.py \ +# --lb_proto src/proto/grpc/lb/v1/load_balancer.proto \ +# 127.0.0.1:1234 10.0.0.1:4321 + +# 1) Compile src/proto/grpc/lb/v1/load_balancer.proto to a temp location +parser = argparse.ArgumentParser() +parser.add_argument('--lb_proto', required=True) +parser.add_argument('-e', '--expiration_interval_secs', type=int) +parser.add_argument('-o', '--output') +parser.add_argument('-q', '--quiet', default=False, action='store_true') +parser.add_argument('ipports', nargs='+') +args = parser.parse_args() + +if not os.path.isfile(args.lb_proto): + print("ERROR: file '{}' cannot be accessed (not found, no permissions, etc.)" + .format(args.lb_proto), file=sys.stderr) + sys.exit(1) + +proto_dirname = os.path.dirname(args.lb_proto) +output_dir = tempfile.mkdtemp() + +protoc_cmd = 'protoc -I{} --python_out={} {}'.format( + proto_dirname, output_dir, args.lb_proto) + +with tempfile.TemporaryFile() as stderr_tmpfile: + if subprocess.call(protoc_cmd, stderr=stderr_tmpfile, shell=True) != 0: + stderr_tmpfile.seek(0) + print("ERROR: while running '{}': {}". + format(protoc_cmd, stderr_tmpfile.read())) + sys.exit(2) + +# 2) import the output .py file. +module_name = os.path.splitext(os.path.basename(args.lb_proto))[0] + '_pb2' +sys.path.append(output_dir) +pb_module = importlib.import_module(module_name) + +# 3) Generate! +lb_response = pb_module.LoadBalanceResponse() +if args.expiration_interval_secs: + lb_response.server_list.expiration_interval.seconds = \ + args.expiration_interval_secs + +for ipport in args.ipports: + ip, port = ipport.split(':') + server = lb_response.server_list.servers.add() + server.ip_address = ip + server.port = int(port) + server.load_balance_token = b'token{}'.format(port) + +serialized_bytes = lb_response.SerializeToString() +serialized_hex = ''.join('\\x{:02x}'.format(ord(c)) for c in serialized_bytes) +if args.output: + with open(args.output, 'w') as f: + f.write(serialized_bytes) +if not args.quiet: + print(str(lb_response)) + print(serialized_hex) diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index aee866adf46..62ae97a9a3c 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1100,6 +1100,7 @@ src/core/ext/transport/chttp2/server/insecure/server_chttp2.c \ src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c \ src/core/ext/transport/chttp2/client/insecure/channel_create.c \ src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c \ +src/core/ext/lb_policy/grpclb/grpclb.c \ src/core/ext/lb_policy/grpclb/load_balancer_api.c \ src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c \ third_party/nanopb/pb_common.c \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 4aad52c69d0..b2b0d5bf997 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -944,6 +944,22 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "language": "c", + "name": "grpclb_test", + "src": [ + "test/core/client_config/grpclb_test.c" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -4152,6 +4168,7 @@ "gpr", "grpc_base", "grpc_lb_policy_grpclb", + "grpc_lb_policy_grpclb", "grpc_lb_policy_pick_first", "grpc_lb_policy_round_robin", "grpc_load_reporting", @@ -4246,6 +4263,7 @@ "gpr", "grpc_base", "grpc_lb_policy_grpclb", + "grpc_lb_policy_grpclb", "grpc_lb_policy_pick_first", "grpc_lb_policy_round_robin", "grpc_load_reporting", @@ -6048,6 +6066,7 @@ "language": "c", "name": "grpc_lb_policy_grpclb", "src": [ + "src/core/ext/lb_policy/grpclb/grpclb.c", "src/core/ext/lb_policy/grpclb/load_balancer_api.c", "src/core/ext/lb_policy/grpclb/load_balancer_api.h", "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 5a84a41b638..bf439b43e9c 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -1079,6 +1079,27 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": false, + "language": "c", + "name": "grpclb_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, { "args": [], "ci_platforms": [ diff --git a/vsprojects/buildtests_c.sln b/vsprojects/buildtests_c.sln index 3a214a4ad41..070f6559b7b 100644 --- a/vsprojects/buildtests_c.sln +++ b/vsprojects/buildtests_c.sln @@ -643,6 +643,17 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "grpc_verify_jwt", "vcxproj\ {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} EndProjectSection EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "grpclb_test", "vcxproj\test\grpclb_test\grpclb_test.vcxproj", "{9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}" + ProjectSection(myProperties) = preProject + lib = "False" + EndProjectSection + ProjectSection(ProjectDependencies) = postProject + {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} = {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} + {29D16885-7228-4C31-81ED-5F9187C7F2A9} = {29D16885-7228-4C31-81ED-5F9187C7F2A9} + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} = {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} = {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + EndProjectSection +EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "h2_census_nosec_test", "vcxproj\test/end2end/fixtures\h2_census_nosec_test\h2_census_nosec_test.vcxproj", "{A8039D43-910E-4248-2A22-74366E8C4DCD}" ProjectSection(myProperties) = preProject lib = "False" @@ -2455,6 +2466,22 @@ Global {02FAC25F-5FF6-34A0-00AE-B82BFBA851A9}.Release-DLL|Win32.Build.0 = Release|Win32 {02FAC25F-5FF6-34A0-00AE-B82BFBA851A9}.Release-DLL|x64.ActiveCfg = Release|x64 {02FAC25F-5FF6-34A0-00AE-B82BFBA851A9}.Release-DLL|x64.Build.0 = Release|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug|Win32.ActiveCfg = Debug|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug|x64.ActiveCfg = Debug|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release|Win32.ActiveCfg = Release|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release|x64.ActiveCfg = Release|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug|Win32.Build.0 = Debug|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug|x64.Build.0 = Debug|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release|Win32.Build.0 = Release|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release|x64.Build.0 = Release|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug-DLL|Win32.ActiveCfg = Debug|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug-DLL|Win32.Build.0 = Debug|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug-DLL|x64.ActiveCfg = Debug|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Debug-DLL|x64.Build.0 = Debug|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release-DLL|Win32.ActiveCfg = Release|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release-DLL|Win32.Build.0 = Release|Win32 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release-DLL|x64.ActiveCfg = Release|x64 + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB}.Release-DLL|x64.Build.0 = Release|x64 {A8039D43-910E-4248-2A22-74366E8C4DCD}.Debug|Win32.ActiveCfg = Debug|Win32 {A8039D43-910E-4248-2A22-74366E8C4DCD}.Debug|x64.ActiveCfg = Debug|x64 {A8039D43-910E-4248-2A22-74366E8C4DCD}.Release|Win32.ActiveCfg = Release|Win32 diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index ec0c984f2e7..2473525a97f 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -770,6 +770,8 @@ + + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 273f73b2a30..28c6483cf2a 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -478,6 +478,9 @@ src\core\ext\transport\chttp2\client\insecure + + src\core\ext\lb_policy\grpclb + src\core\ext\lb_policy\grpclb diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index bba099d803c..6e1b3ad05aa 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -688,6 +688,8 @@ + + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 78a8777f942..ce599a6e66f 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -406,6 +406,9 @@ src\core\ext\load_reporting + + src\core\ext\lb_policy\grpclb + src\core\ext\lb_policy\grpclb diff --git a/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj b/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj new file mode 100644 index 00000000000..7ea8e06d74a --- /dev/null +++ b/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj @@ -0,0 +1,199 @@ + + + + + + Debug + Win32 + + + Debug + x64 + + + Release + Win32 + + + Release + x64 + + + + {9D6FFE17-ABF0-A851-268E-9E3E8C573CBB} + true + $(SolutionDir)IntDir\$(MSBuildProjectName)\ + + + + v100 + + + v110 + + + v120 + + + v140 + + + Application + true + Unicode + + + Application + false + true + Unicode + + + + + + + + + + + + + + grpclb_test + static + Debug + static + Debug + + + grpclb_test + static + Release + static + Release + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + + + + + {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} + + + {29D16885-7228-4C31-81ED-5F9187C7F2A9} + + + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + + + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + + + + + + + + + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + + + + + diff --git a/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj.filters b/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj.filters new file mode 100644 index 00000000000..7a70c0b5e1c --- /dev/null +++ b/vsprojects/vcxproj/test/grpclb_test/grpclb_test.vcxproj.filters @@ -0,0 +1,21 @@ + + + + + test\core\client_config + + + + + + {e0ba55a2-37d9-5029-6f4e-64f097307340} + + + {6d1d02a2-d635-142d-16d4-45bd59f5c83a} + + + {356b8663-f1cf-f0df-39d0-d2de958699d0} + + + +