diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index f1e9ecc558c..382aa287cd3 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -724,7 +724,6 @@ typedef struct lb_client_data { static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void req_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -823,6 +822,23 @@ static void query_for_backends(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + lb_client_data *lb_client = arg; + GPR_ASSERT(lb_client->lb_call); + grpc_op ops[1]; + memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), + &lb_client->md_rcvd); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + static void md_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { lb_client_data *lb_client = arg; GPR_ASSERT(lb_client->lb_call); diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 8b3f2ac3ef1..f4720a13455 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -109,11 +109,12 @@ void grpc_grpclb_request_destroy(grpc_grpclb_request *request) { gpr_free(request); } +typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response; grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( - gpr_slice encoded_response) { + gpr_slice encoded_grpc_grpclb_response) { pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), - GPR_SLICE_LENGTH(encoded_response)); + pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response), + GPR_SLICE_LENGTH(encoded_grpc_grpclb_response)); grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); if (!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res)) { @@ -128,12 +129,12 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( } grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_response) { + gpr_slice encoded_grpc_grpclb_response) { bool status; decode_serverlist_arg arg; pb_istream_t stream = - pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_response), - GPR_SLICE_LENGTH(encoded_response)); + pb_istream_from_buffer(GPR_SLICE_START_PTR(encoded_grpc_grpclb_response), + GPR_SLICE_LENGTH(encoded_grpc_grpclb_response)); pb_istream_t stream_at_start = stream; grpc_grpclb_response res; memset(&res, 0, sizeof(grpc_grpclb_response)); @@ -219,35 +220,25 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs) { GPR_ASSERT(lhs && rhs); if (lhs->has_seconds && rhs->has_seconds) { - if (lhs->seconds < rhs->seconds) { - return -1; - } - if (lhs->seconds > rhs->seconds) { - return 1; - } - goto compare_nanos; + if (lhs->seconds < rhs->seconds) return -1; + if (lhs->seconds > rhs->seconds) return 1; + } else if (lhs->has_seconds) { + return 1; + } else if (rhs->has_seconds) { + return -1; } - if (lhs->has_seconds + rhs->has_seconds == 0) { - goto compare_nanos; - } - // only lhs or rhs have seconds - return rhs->has_seconds ? 1 : -1; -compare_nanos: + GPR_ASSERT(lhs->seconds == rhs->seconds); if (lhs->has_nanos && rhs->has_nanos) { - if (lhs->nanos < rhs->nanos) { - return -1; - } - if (lhs->nanos > rhs->nanos) { - return 1; - } - return 0; - } - if (lhs->has_nanos + rhs->has_nanos == 0) { - return 0; + if (lhs->nanos < rhs->nanos) return -1; + if (lhs->nanos > rhs->nanos) return 1; + } else if (lhs->has_nanos) { + return 1; + } else if (rhs->has_nanos) { + return -1; } - // only lhs or rhs have nanos - return rhs->has_nanos ? 1 : -1; + + return 0; } void grpc_grpclb_initial_response_destroy( diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h index 5f1dff707eb..9726c87a37f 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h @@ -46,7 +46,6 @@ extern "C" { #define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128 typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request; -typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response; typedef grpc_lb_v1_InitialLoadBalanceResponse grpc_grpclb_initial_response; typedef grpc_lb_v1_Server grpc_grpclb_server; typedef grpc_lb_v1_Duration grpc_grpclb_duration; @@ -65,14 +64,14 @@ gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); /** Destroy \a request */ void grpc_grpclb_request_destroy(grpc_grpclb_request *request); -/** Parse (ie, decode) the bytes in \a encoded_response as a \a +/** Parse (ie, decode) the bytes in \a encoded_grpc_grpclb_response as a \a * grpc_grpclb_initial_response */ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( - gpr_slice encoded_response); + gpr_slice encoded_grpc_grpclb_response); /** Parse the list of servers from an encoded \a grpc_grpclb_response */ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( - gpr_slice encoded_response); + gpr_slice encoded_grpc_grpclb_response); /** Return a copy of \a sl. The caller is responsible for calling \a * grpc_grpclb_destroy_serverlist on the returned copy. */ diff --git a/test/core/nanopb/fuzzer_response.c b/test/core/nanopb/fuzzer_response.c index 21a5d7b9685..75a99faf3f3 100644 --- a/test/core/nanopb/fuzzer_response.c +++ b/test/core/nanopb/fuzzer_response.c @@ -43,9 +43,9 @@ bool leak_check = true; int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size); - grpc_grpclb_response *response; - if ((response = grpc_grpclb_response_parse(slice))) { - grpc_grpclb_response_destroy(response); + grpc_grpclb_initial_response *response; + if ((response = grpc_grpclb_initial_response_parse(slice))) { + grpc_grpclb_initial_response_destroy(response); } gpr_slice_unref(slice); return 0; diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index f5b678ef1d0..3c193d2b021 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -98,10 +98,6 @@ typedef struct test_fixture { int lb_server_update_delay_ms; } test_fixture; -static gpr_timespec n_seconds_time(int n) { - return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); -} - static void *tag(intptr_t t) { return (void *)t; } static gpr_slice build_response_payload_slice( @@ -149,7 +145,8 @@ static gpr_slice build_response_payload_slice( static void drain_cq(grpc_completion_queue *cq) { grpc_event ev; do { - ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL); + ev = grpc_completion_queue_next(cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), + NULL); } while (ev.type != GRPC_QUEUE_SHUTDOWN); } @@ -296,7 +293,8 @@ static void start_backend_server(server_fixture *sf) { tag(100)); GPR_ASSERT(GRPC_CALL_OK == error); gpr_log(GPR_INFO, "Server[%s] up", sf->servers_hostport); - ev = grpc_completion_queue_next(sf->cq, n_seconds_time(60), NULL); + ev = grpc_completion_queue_next(sf->cq, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(60), NULL); if (!ev.success) { gpr_log(GPR_INFO, "Server[%s] being torn down", sf->servers_hostport); cq_verifier_destroy(cqv); @@ -334,7 +332,8 @@ static void start_backend_server(server_fixture *sf) { op++; error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL); + ev = grpc_completion_queue_next( + sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(102)); if (request_payload_recv == NULL) { @@ -363,7 +362,8 @@ static void start_backend_server(server_fixture *sf) { error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); GPR_ASSERT(GRPC_CALL_OK == error); - ev = grpc_completion_queue_next(sf->cq, n_seconds_time(3), NULL); + ev = grpc_completion_queue_next( + sf->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(3), NULL); if (ev.type == GRPC_OP_COMPLETE && ev.success) { GPR_ASSERT(ev.tag = tag(103)); } else { @@ -427,7 +427,7 @@ static void perform_request(client_fixture *cf) { c = grpc_channel_create_call(cf->client, NULL, GRPC_PROPAGATE_DEFAULTS, cf->cq, "/foo", "foo.test.google.fr:1234", - n_seconds_time(1000), NULL); + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1000), NULL); gpr_log(GPR_INFO, "Call 0x%" PRIxPTR " created", (intptr_t)c); GPR_ASSERT(c); char *peer;