diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index af913d8a9df..1ef475d086e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -546,7 +546,6 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, *target = NULL; grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -576,7 +575,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, exec_ctx, pp->pollent, glb_policy->base.interested_parties); grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete, GRPC_ERROR_CANCELLED, NULL); - gpr_free(pp); } else { pp->next = glb_policy->pending_picks; glb_policy->pending_picks = pp; @@ -702,9 +700,6 @@ typedef struct lb_client_data { /* called once initial metadata's been sent */ grpc_closure md_sent; - /* called once initial metadata's been received */ - grpc_closure md_rcvd; - /* called once the LoadBalanceRequest has been sent to the LB server. See * src/proto/grpc/.../load_balancer.proto */ grpc_closure req_sent; @@ -741,7 +736,6 @@ typedef struct lb_client_data { } lb_client_data; static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, @@ -756,7 +750,6 @@ static lb_client_data *lb_client_data_create(glb_lb_policy *glb_policy) { gpr_mu_init(&lb_client->mu); grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - grpc_closure_init(&lb_client->md_rcvd, md_recv_cb, lb_client); grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); @@ -855,23 +848,6 @@ static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op ops[1]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->md_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message = lb_client->request_payload; @@ -886,11 +862,18 @@ static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_client_data *lb_client = arg; + GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; op->data.recv_message = &lb_client->response_payload; op->flags = 0; @@ -909,8 +892,7 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_op *op = ops; if (lb_client->response_payload != NULL) { /* Received data from the LB server. Look inside - * lb_client->response_payload, for - * a serverlist. */ + * lb_client->response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); gpr_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index b2fdce2963d..4720ddcdf38 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -39,6 +39,7 @@ extern "C" { #include +#include #include #include #include @@ -181,20 +182,9 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, cq_verify(cqv); gpr_log(GPR_INFO, "LB Server[%s] after tag 200", sf->servers_hostport); - op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = 0; - op->reserved = NULL; - op++; - op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; - op->data.recv_close_on_server.cancelled = &was_cancelled; - op->flags = 0; - op->reserved = NULL; - op++; - error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL); - GPR_ASSERT(GRPC_CALL_OK == error); - gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport); + // make sure we've received the initial metadata from the grpclb request. + GPR_ASSERT(request_metadata_recv.count > 0); + GPR_ASSERT(request_metadata_recv.metadata != NULL); // receive request for backends op = ops; @@ -208,9 +198,36 @@ static void start_lb_server(server_fixture *sf, int *ports, size_t nports, cq_expect_completion(cqv, tag(202), 1); cq_verify(cqv); gpr_log(GPR_INFO, "LB Server[%s] after RECV_MSG", sf->servers_hostport); - // TODO(dgq): validate request. + + // validate initial request. + grpc_byte_buffer_reader bbr; + grpc_byte_buffer_reader_init(&bbr, request_payload_recv); + gpr_slice request_payload_slice = grpc_byte_buffer_reader_readall(&bbr); + grpc::lb::v1::LoadBalanceRequest request; + request.ParseFromArray(GPR_SLICE_START_PTR(request_payload_slice), + GPR_SLICE_LENGTH(request_payload_slice)); + GPR_ASSERT(request.has_initial_request()); + GPR_ASSERT(request.initial_request().name() == "load.balanced.service.name"); + gpr_slice_unref(request_payload_slice); + grpc_byte_buffer_reader_destroy(&bbr); grpc_byte_buffer_destroy(request_payload_recv); + gpr_slice response_payload_slice; + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(201), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + gpr_log(GPR_INFO, "LB Server[%s] after tag 201", sf->servers_hostport); + for (int i = 0; i < 2; i++) { if (i == 0) { // First half of the ports.