mirror of https://github.com/grpc/grpc.git
This plugin currently implements the grpclb protocol. NOTE: All the files under xds/ directory have been cloned from grpclb/ and will be updated in subsequently to support xds API.pull/16868/head
parent
57d37ca70f
commit
a95ea6dab4
14 changed files with 3024 additions and 0 deletions
@ -0,0 +1,140 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/client_load_reporting_filter.h" |
||||
|
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/profiling/timers.h" |
||||
|
||||
static grpc_error* init_channel_elem(grpc_channel_element* elem, |
||||
grpc_channel_element_args* args) { |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
static void destroy_channel_elem(grpc_channel_element* elem) {} |
||||
|
||||
namespace { |
||||
|
||||
struct call_data { |
||||
// Stats object to update.
|
||||
grpc_core::RefCountedPtr<grpc_core::XdsLbClientStats> client_stats; |
||||
// State for intercepting send_initial_metadata.
|
||||
grpc_closure on_complete_for_send; |
||||
grpc_closure* original_on_complete_for_send; |
||||
bool send_initial_metadata_succeeded; |
||||
// State for intercepting recv_initial_metadata.
|
||||
grpc_closure recv_initial_metadata_ready; |
||||
grpc_closure* original_recv_initial_metadata_ready; |
||||
bool recv_initial_metadata_succeeded; |
||||
}; |
||||
|
||||
} // namespace
|
||||
|
||||
static void on_complete_for_send(void* arg, grpc_error* error) { |
||||
call_data* calld = static_cast<call_data*>(arg); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
calld->send_initial_metadata_succeeded = true; |
||||
} |
||||
GRPC_CLOSURE_RUN(calld->original_on_complete_for_send, GRPC_ERROR_REF(error)); |
||||
} |
||||
|
||||
static void recv_initial_metadata_ready(void* arg, grpc_error* error) { |
||||
call_data* calld = static_cast<call_data*>(arg); |
||||
if (error == GRPC_ERROR_NONE) { |
||||
calld->recv_initial_metadata_succeeded = true; |
||||
} |
||||
GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, |
||||
GRPC_ERROR_REF(error)); |
||||
} |
||||
|
||||
static grpc_error* init_call_elem(grpc_call_element* elem, |
||||
const grpc_call_element_args* args) { |
||||
call_data* calld = static_cast<call_data*>(elem->call_data); |
||||
// Get stats object from context and take a ref.
|
||||
GPR_ASSERT(args->context != nullptr); |
||||
if (args->context[GRPC_GRPCLB_CLIENT_STATS].value != nullptr) { |
||||
calld->client_stats = static_cast<grpc_core::XdsLbClientStats*>( |
||||
args->context[GRPC_GRPCLB_CLIENT_STATS].value) |
||||
->Ref(); |
||||
// Record call started.
|
||||
calld->client_stats->AddCallStarted(); |
||||
} |
||||
return GRPC_ERROR_NONE; |
||||
} |
||||
|
||||
static void destroy_call_elem(grpc_call_element* elem, |
||||
const grpc_call_final_info* final_info, |
||||
grpc_closure* ignored) { |
||||
call_data* calld = static_cast<call_data*>(elem->call_data); |
||||
if (calld->client_stats != nullptr) { |
||||
// Record call finished, optionally setting client_failed_to_send and
|
||||
// received.
|
||||
calld->client_stats->AddCallFinished( |
||||
!calld->send_initial_metadata_succeeded /* client_failed_to_send */, |
||||
calld->recv_initial_metadata_succeeded /* known_received */); |
||||
// All done, so unref the stats object.
|
||||
// TODO(roth): Eliminate this once filter stack is converted to C++.
|
||||
calld->client_stats.reset(); |
||||
} |
||||
} |
||||
|
||||
static void start_transport_stream_op_batch( |
||||
grpc_call_element* elem, grpc_transport_stream_op_batch* batch) { |
||||
call_data* calld = static_cast<call_data*>(elem->call_data); |
||||
GPR_TIMER_SCOPE("clr_start_transport_stream_op_batch", 0); |
||||
if (calld->client_stats != nullptr) { |
||||
// Intercept send_initial_metadata.
|
||||
if (batch->send_initial_metadata) { |
||||
calld->original_on_complete_for_send = batch->on_complete; |
||||
GRPC_CLOSURE_INIT(&calld->on_complete_for_send, on_complete_for_send, |
||||
calld, grpc_schedule_on_exec_ctx); |
||||
batch->on_complete = &calld->on_complete_for_send; |
||||
} |
||||
// Intercept recv_initial_metadata.
|
||||
if (batch->recv_initial_metadata) { |
||||
calld->original_recv_initial_metadata_ready = |
||||
batch->payload->recv_initial_metadata.recv_initial_metadata_ready; |
||||
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, |
||||
recv_initial_metadata_ready, calld, |
||||
grpc_schedule_on_exec_ctx); |
||||
batch->payload->recv_initial_metadata.recv_initial_metadata_ready = |
||||
&calld->recv_initial_metadata_ready; |
||||
} |
||||
} |
||||
// Chain to next filter.
|
||||
grpc_call_next_op(elem, batch); |
||||
} |
||||
|
||||
const grpc_channel_filter xds_client_load_reporting_filter = { |
||||
start_transport_stream_op_batch, |
||||
grpc_channel_next_op, |
||||
sizeof(call_data), |
||||
init_call_elem, |
||||
grpc_call_stack_ignore_set_pollset_or_pollset_set, |
||||
destroy_call_elem, |
||||
0, // sizeof(channel_data)
|
||||
init_channel_elem, |
||||
destroy_channel_elem, |
||||
grpc_channel_next_get_info, |
||||
"client_load_reporting"}; |
@ -0,0 +1,29 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
|
||||
extern const grpc_channel_filter xds_client_load_reporting_filter; |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_CLIENT_LOAD_REPORTING_FILTER_H \ |
||||
*/ |
@ -0,0 +1,307 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "pb_decode.h" |
||||
#include "pb_encode.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/load_balancer_api.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
|
||||
/* invoked once for every Server in ServerList */ |
||||
static bool count_serverlist(pb_istream_t* stream, const pb_field_t* field, |
||||
void** arg) { |
||||
xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>(*arg); |
||||
xds_grpclb_server server; |
||||
if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, &server))) { |
||||
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); |
||||
return false; |
||||
} |
||||
++sl->num_servers; |
||||
return true; |
||||
} |
||||
|
||||
typedef struct decode_serverlist_arg { |
||||
/* The decoding callback is invoked once per server in serverlist. Remember
|
||||
* which index of the serverlist are we currently decoding */ |
||||
size_t decoding_idx; |
||||
/* The decoded serverlist */ |
||||
xds_grpclb_serverlist* serverlist; |
||||
} decode_serverlist_arg; |
||||
|
||||
/* invoked once for every Server in ServerList */ |
||||
static bool decode_serverlist(pb_istream_t* stream, const pb_field_t* field, |
||||
void** arg) { |
||||
decode_serverlist_arg* dec_arg = static_cast<decode_serverlist_arg*>(*arg); |
||||
GPR_ASSERT(dec_arg->serverlist->num_servers >= dec_arg->decoding_idx); |
||||
xds_grpclb_server* server = |
||||
static_cast<xds_grpclb_server*>(gpr_zalloc(sizeof(xds_grpclb_server))); |
||||
if (GPR_UNLIKELY(!pb_decode(stream, grpc_lb_v1_Server_fields, server))) { |
||||
gpr_free(server); |
||||
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(stream)); |
||||
return false; |
||||
} |
||||
dec_arg->serverlist->servers[dec_arg->decoding_idx++] = server; |
||||
return true; |
||||
} |
||||
|
||||
xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name) { |
||||
xds_grpclb_request* req = |
||||
static_cast<xds_grpclb_request*>(gpr_malloc(sizeof(xds_grpclb_request))); |
||||
req->has_client_stats = false; |
||||
req->has_initial_request = true; |
||||
req->initial_request.has_name = true; |
||||
strncpy(req->initial_request.name, lb_service_name, |
||||
XDS_SERVICE_NAME_MAX_LENGTH); |
||||
return req; |
||||
} |
||||
|
||||
static void populate_timestamp(gpr_timespec timestamp, |
||||
xds_grpclb_timestamp* timestamp_pb) { |
||||
timestamp_pb->has_seconds = true; |
||||
timestamp_pb->seconds = timestamp.tv_sec; |
||||
timestamp_pb->has_nanos = true; |
||||
timestamp_pb->nanos = timestamp.tv_nsec; |
||||
} |
||||
|
||||
static bool encode_string(pb_ostream_t* stream, const pb_field_t* field, |
||||
void* const* arg) { |
||||
char* str = static_cast<char*>(*arg); |
||||
if (!pb_encode_tag_for_field(stream, field)) return false; |
||||
return pb_encode_string(stream, reinterpret_cast<uint8_t*>(str), strlen(str)); |
||||
} |
||||
|
||||
static bool encode_drops(pb_ostream_t* stream, const pb_field_t* field, |
||||
void* const* arg) { |
||||
grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries = |
||||
static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>(*arg); |
||||
if (drop_entries == nullptr) return true; |
||||
for (size_t i = 0; i < drop_entries->size(); ++i) { |
||||
if (!pb_encode_tag_for_field(stream, field)) return false; |
||||
grpc_lb_v1_ClientStatsPerToken drop_message; |
||||
drop_message.load_balance_token.funcs.encode = encode_string; |
||||
drop_message.load_balance_token.arg = (*drop_entries)[i].token.get(); |
||||
drop_message.has_num_calls = true; |
||||
drop_message.num_calls = (*drop_entries)[i].count; |
||||
if (!pb_encode_submessage(stream, grpc_lb_v1_ClientStatsPerToken_fields, |
||||
&drop_message)) { |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
xds_grpclb_request* xds_grpclb_load_report_request_create_locked( |
||||
grpc_core::XdsLbClientStats* client_stats) { |
||||
xds_grpclb_request* req = |
||||
static_cast<xds_grpclb_request*>(gpr_zalloc(sizeof(xds_grpclb_request))); |
||||
req->has_client_stats = true; |
||||
req->client_stats.has_timestamp = true; |
||||
populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); |
||||
req->client_stats.has_num_calls_started = true; |
||||
req->client_stats.has_num_calls_finished = true; |
||||
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; |
||||
req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; |
||||
req->client_stats.has_num_calls_finished_known_received = true; |
||||
req->client_stats.calls_finished_with_drop.funcs.encode = encode_drops; |
||||
grpc_core::UniquePtr<grpc_core::XdsLbClientStats::DroppedCallCounts> |
||||
drop_counts; |
||||
client_stats->GetLocked( |
||||
&req->client_stats.num_calls_started, |
||||
&req->client_stats.num_calls_finished, |
||||
&req->client_stats.num_calls_finished_with_client_failed_to_send, |
||||
&req->client_stats.num_calls_finished_known_received, &drop_counts); |
||||
// Will be deleted in xds_grpclb_request_destroy().
|
||||
req->client_stats.calls_finished_with_drop.arg = drop_counts.release(); |
||||
return req; |
||||
} |
||||
|
||||
grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request) { |
||||
size_t encoded_length; |
||||
pb_ostream_t sizestream; |
||||
pb_ostream_t outputstream; |
||||
grpc_slice slice; |
||||
memset(&sizestream, 0, sizeof(pb_ostream_t)); |
||||
pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); |
||||
encoded_length = sizestream.bytes_written; |
||||
|
||||
slice = GRPC_SLICE_MALLOC(encoded_length); |
||||
outputstream = |
||||
pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); |
||||
GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, |
||||
request) != 0); |
||||
return slice; |
||||
} |
||||
|
||||
void xds_grpclb_request_destroy(xds_grpclb_request* request) { |
||||
if (request->has_client_stats) { |
||||
grpc_core::XdsLbClientStats::DroppedCallCounts* drop_entries = |
||||
static_cast<grpc_core::XdsLbClientStats::DroppedCallCounts*>( |
||||
request->client_stats.calls_finished_with_drop.arg); |
||||
grpc_core::Delete(drop_entries); |
||||
} |
||||
gpr_free(request); |
||||
} |
||||
|
||||
typedef grpc_lb_v1_LoadBalanceResponse xds_grpclb_response; |
||||
xds_grpclb_initial_response* xds_grpclb_initial_response_parse( |
||||
grpc_slice encoded_xds_grpclb_response) { |
||||
pb_istream_t stream = |
||||
pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response), |
||||
GRPC_SLICE_LENGTH(encoded_xds_grpclb_response)); |
||||
xds_grpclb_response res; |
||||
memset(&res, 0, sizeof(xds_grpclb_response)); |
||||
if (GPR_UNLIKELY( |
||||
!pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res))) { |
||||
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); |
||||
return nullptr; |
||||
} |
||||
|
||||
if (!res.has_initial_response) return nullptr; |
||||
|
||||
xds_grpclb_initial_response* initial_res = |
||||
static_cast<xds_grpclb_initial_response*>( |
||||
gpr_malloc(sizeof(xds_grpclb_initial_response))); |
||||
memcpy(initial_res, &res.initial_response, |
||||
sizeof(xds_grpclb_initial_response)); |
||||
|
||||
return initial_res; |
||||
} |
||||
|
||||
xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist( |
||||
grpc_slice encoded_xds_grpclb_response) { |
||||
pb_istream_t stream = |
||||
pb_istream_from_buffer(GRPC_SLICE_START_PTR(encoded_xds_grpclb_response), |
||||
GRPC_SLICE_LENGTH(encoded_xds_grpclb_response)); |
||||
pb_istream_t stream_at_start = stream; |
||||
xds_grpclb_serverlist* sl = static_cast<xds_grpclb_serverlist*>( |
||||
gpr_zalloc(sizeof(xds_grpclb_serverlist))); |
||||
xds_grpclb_response res; |
||||
memset(&res, 0, sizeof(xds_grpclb_response)); |
||||
// First pass: count number of servers.
|
||||
res.server_list.servers.funcs.decode = count_serverlist; |
||||
res.server_list.servers.arg = sl; |
||||
bool status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, &res); |
||||
if (GPR_UNLIKELY(!status)) { |
||||
gpr_free(sl); |
||||
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); |
||||
return nullptr; |
||||
} |
||||
// Second pass: populate servers.
|
||||
if (sl->num_servers > 0) { |
||||
sl->servers = static_cast<xds_grpclb_server**>( |
||||
gpr_zalloc(sizeof(xds_grpclb_server*) * sl->num_servers)); |
||||
decode_serverlist_arg decode_arg; |
||||
memset(&decode_arg, 0, sizeof(decode_arg)); |
||||
decode_arg.serverlist = sl; |
||||
res.server_list.servers.funcs.decode = decode_serverlist; |
||||
res.server_list.servers.arg = &decode_arg; |
||||
status = pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, |
||||
&res); |
||||
if (GPR_UNLIKELY(!status)) { |
||||
xds_grpclb_destroy_serverlist(sl); |
||||
gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); |
||||
return nullptr; |
||||
} |
||||
} |
||||
return sl; |
||||
} |
||||
|
||||
void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist) { |
||||
if (serverlist == nullptr) { |
||||
return; |
||||
} |
||||
for (size_t i = 0; i < serverlist->num_servers; i++) { |
||||
gpr_free(serverlist->servers[i]); |
||||
} |
||||
gpr_free(serverlist->servers); |
||||
gpr_free(serverlist); |
||||
} |
||||
|
||||
xds_grpclb_serverlist* xds_grpclb_serverlist_copy( |
||||
const xds_grpclb_serverlist* sl) { |
||||
xds_grpclb_serverlist* copy = static_cast<xds_grpclb_serverlist*>( |
||||
gpr_zalloc(sizeof(xds_grpclb_serverlist))); |
||||
copy->num_servers = sl->num_servers; |
||||
copy->servers = static_cast<xds_grpclb_server**>( |
||||
gpr_malloc(sizeof(xds_grpclb_server*) * sl->num_servers)); |
||||
for (size_t i = 0; i < sl->num_servers; i++) { |
||||
copy->servers[i] = |
||||
static_cast<xds_grpclb_server*>(gpr_malloc(sizeof(xds_grpclb_server))); |
||||
memcpy(copy->servers[i], sl->servers[i], sizeof(xds_grpclb_server)); |
||||
} |
||||
return copy; |
||||
} |
||||
|
||||
bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs, |
||||
const xds_grpclb_serverlist* rhs) { |
||||
if (lhs == nullptr || rhs == nullptr) { |
||||
return false; |
||||
} |
||||
if (lhs->num_servers != rhs->num_servers) { |
||||
return false; |
||||
} |
||||
for (size_t i = 0; i < lhs->num_servers; i++) { |
||||
if (!xds_grpclb_server_equals(lhs->servers[i], rhs->servers[i])) { |
||||
return false; |
||||
} |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
bool xds_grpclb_server_equals(const xds_grpclb_server* lhs, |
||||
const xds_grpclb_server* rhs) { |
||||
return memcmp(lhs, rhs, sizeof(xds_grpclb_server)) == 0; |
||||
} |
||||
|
||||
int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs, |
||||
const xds_grpclb_duration* rhs) { |
||||
GPR_ASSERT(lhs && rhs); |
||||
if (lhs->has_seconds && rhs->has_seconds) { |
||||
if (lhs->seconds < rhs->seconds) return -1; |
||||
if (lhs->seconds > rhs->seconds) return 1; |
||||
} else if (lhs->has_seconds) { |
||||
return 1; |
||||
} else if (rhs->has_seconds) { |
||||
return -1; |
||||
} |
||||
|
||||
GPR_ASSERT(lhs->seconds == rhs->seconds); |
||||
if (lhs->has_nanos && rhs->has_nanos) { |
||||
if (lhs->nanos < rhs->nanos) return -1; |
||||
if (lhs->nanos > rhs->nanos) return 1; |
||||
} else if (lhs->has_nanos) { |
||||
return 1; |
||||
} else if (rhs->has_nanos) { |
||||
return -1; |
||||
} |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb) { |
||||
return static_cast<grpc_millis>( |
||||
(duration_pb->has_seconds ? duration_pb->seconds : 0) * GPR_MS_PER_SEC + |
||||
(duration_pb->has_nanos ? duration_pb->nanos : 0) / GPR_NS_PER_MS); |
||||
} |
||||
|
||||
void xds_grpclb_initial_response_destroy( |
||||
xds_grpclb_initial_response* response) { |
||||
gpr_free(response); |
||||
} |
@ -0,0 +1,89 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/slice_buffer.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" |
||||
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||
|
||||
#define XDS_SERVICE_NAME_MAX_LENGTH 128 |
||||
|
||||
typedef grpc_lb_v1_Server_ip_address_t xds_grpclb_ip_address; |
||||
typedef grpc_lb_v1_LoadBalanceRequest xds_grpclb_request; |
||||
typedef grpc_lb_v1_InitialLoadBalanceResponse xds_grpclb_initial_response; |
||||
typedef grpc_lb_v1_Server xds_grpclb_server; |
||||
typedef google_protobuf_Duration xds_grpclb_duration; |
||||
typedef google_protobuf_Timestamp xds_grpclb_timestamp; |
||||
|
||||
typedef struct { |
||||
xds_grpclb_server** servers; |
||||
size_t num_servers; |
||||
} xds_grpclb_serverlist; |
||||
|
||||
/** Create a request for a gRPC LB service under \a lb_service_name */ |
||||
xds_grpclb_request* xds_grpclb_request_create(const char* lb_service_name); |
||||
xds_grpclb_request* xds_grpclb_load_report_request_create_locked( |
||||
grpc_core::XdsLbClientStats* client_stats); |
||||
|
||||
/** Protocol Buffers v3-encode \a request */ |
||||
grpc_slice xds_grpclb_request_encode(const xds_grpclb_request* request); |
||||
|
||||
/** Destroy \a request */ |
||||
void xds_grpclb_request_destroy(xds_grpclb_request* request); |
||||
|
||||
/** Parse (ie, decode) the bytes in \a encoded_xds_grpclb_response as a \a
|
||||
* xds_grpclb_initial_response */ |
||||
xds_grpclb_initial_response* xds_grpclb_initial_response_parse( |
||||
grpc_slice encoded_xds_grpclb_response); |
||||
|
||||
/** Parse the list of servers from an encoded \a xds_grpclb_response */ |
||||
xds_grpclb_serverlist* xds_grpclb_response_parse_serverlist( |
||||
grpc_slice encoded_xds_grpclb_response); |
||||
|
||||
/** Return a copy of \a sl. The caller is responsible for calling \a
|
||||
* xds_grpclb_destroy_serverlist on the returned copy. */ |
||||
xds_grpclb_serverlist* xds_grpclb_serverlist_copy( |
||||
const xds_grpclb_serverlist* sl); |
||||
|
||||
bool xds_grpclb_serverlist_equals(const xds_grpclb_serverlist* lhs, |
||||
const xds_grpclb_serverlist* rhs); |
||||
|
||||
bool xds_grpclb_server_equals(const xds_grpclb_server* lhs, |
||||
const xds_grpclb_server* rhs); |
||||
|
||||
/** Destroy \a serverlist */ |
||||
void xds_grpclb_destroy_serverlist(xds_grpclb_serverlist* serverlist); |
||||
|
||||
/** Compare \a lhs against \a rhs and return 0 if \a lhs and \a rhs are equal,
|
||||
* < 0 if \a lhs represents a duration shorter than \a rhs and > 0 otherwise */ |
||||
int xds_grpclb_duration_compare(const xds_grpclb_duration* lhs, |
||||
const xds_grpclb_duration* rhs); |
||||
|
||||
grpc_millis xds_grpclb_duration_to_millis(xds_grpclb_duration* duration_pb); |
||||
|
||||
/** Destroy \a initial_response */ |
||||
void xds_grpclb_initial_response_destroy(xds_grpclb_initial_response* response); |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_LOAD_BALANCER_API_H \ |
||||
*/ |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,36 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
/** Channel arg indicating if a target corresponding to the address is grpclb
|
||||
* loadbalancer. The type of this arg is an integer and the value is treated as |
||||
* a bool. */ |
||||
#define GRPC_ARG_ADDRESS_IS_XDS_LOAD_BALANCER \ |
||||
"grpc.address_is_xds_load_balancer" |
||||
/** Channel arg indicating if a target corresponding to the address is a backend
|
||||
* received from a balancer. The type of this arg is an integer and the value is |
||||
* treated as a bool. */ |
||||
#define GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER \ |
||||
"grpc.address_is_backend_from_xds_load_balancer" |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_H \ |
||||
*/ |
@ -0,0 +1,26 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" |
||||
|
||||
grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( |
||||
grpc_channel_args* args) { |
||||
return args; |
||||
} |
@ -0,0 +1,36 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy_factory.h" |
||||
|
||||
/// Makes any necessary modifications to \a args for use in the xds
|
||||
/// balancer channel.
|
||||
///
|
||||
/// Takes ownership of \a args.
|
||||
///
|
||||
/// Caller takes ownership of the returned args.
|
||||
grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( |
||||
grpc_channel_args* args); |
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CHANNEL_H \ |
||||
*/ |
@ -0,0 +1,107 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel.h" |
||||
|
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <string.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gpr/string.h" |
||||
#include "src/core/lib/iomgr/sockaddr_utils.h" |
||||
#include "src/core/lib/security/credentials/credentials.h" |
||||
#include "src/core/lib/security/transport/target_authority_table.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace { |
||||
|
||||
int BalancerNameCmp(const grpc_core::UniquePtr<char>& a, |
||||
const grpc_core::UniquePtr<char>& b) { |
||||
return strcmp(a.get(), b.get()); |
||||
} |
||||
|
||||
RefCountedPtr<TargetAuthorityTable> CreateTargetAuthorityTable( |
||||
grpc_lb_addresses* addresses) { |
||||
TargetAuthorityTable::Entry* target_authority_entries = |
||||
static_cast<TargetAuthorityTable::Entry*>(gpr_zalloc( |
||||
sizeof(*target_authority_entries) * addresses->num_addresses)); |
||||
for (size_t i = 0; i < addresses->num_addresses; ++i) { |
||||
char* addr_str; |
||||
GPR_ASSERT(grpc_sockaddr_to_string( |
||||
&addr_str, &addresses->addresses[i].address, true) > 0); |
||||
target_authority_entries[i].key = grpc_slice_from_copied_string(addr_str); |
||||
target_authority_entries[i].value.reset( |
||||
gpr_strdup(addresses->addresses[i].balancer_name)); |
||||
gpr_free(addr_str); |
||||
} |
||||
RefCountedPtr<TargetAuthorityTable> target_authority_table = |
||||
TargetAuthorityTable::Create(addresses->num_addresses, |
||||
target_authority_entries, BalancerNameCmp); |
||||
gpr_free(target_authority_entries); |
||||
return target_authority_table; |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace grpc_core
|
||||
|
||||
grpc_channel_args* grpc_lb_policy_xds_modify_lb_channel_args( |
||||
grpc_channel_args* args) { |
||||
const char* args_to_remove[1]; |
||||
size_t num_args_to_remove = 0; |
||||
grpc_arg args_to_add[2]; |
||||
size_t num_args_to_add = 0; |
||||
// Add arg for targets info table.
|
||||
const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_LB_ADDRESSES); |
||||
GPR_ASSERT(arg != nullptr); |
||||
GPR_ASSERT(arg->type == GRPC_ARG_POINTER); |
||||
grpc_lb_addresses* addresses = |
||||
static_cast<grpc_lb_addresses*>(arg->value.pointer.p); |
||||
grpc_core::RefCountedPtr<grpc_core::TargetAuthorityTable> |
||||
target_authority_table = grpc_core::CreateTargetAuthorityTable(addresses); |
||||
args_to_add[num_args_to_add++] = |
||||
grpc_core::CreateTargetAuthorityTableChannelArg( |
||||
target_authority_table.get()); |
||||
// Substitute the channel credentials with a version without call
|
||||
// credentials: the load balancer is not necessarily trusted to handle
|
||||
// bearer token credentials.
|
||||
grpc_channel_credentials* channel_credentials = |
||||
grpc_channel_credentials_find_in_args(args); |
||||
grpc_channel_credentials* creds_sans_call_creds = nullptr; |
||||
if (channel_credentials != nullptr) { |
||||
creds_sans_call_creds = |
||||
grpc_channel_credentials_duplicate_without_call_credentials( |
||||
channel_credentials); |
||||
GPR_ASSERT(creds_sans_call_creds != nullptr); |
||||
args_to_remove[num_args_to_remove++] = GRPC_ARG_CHANNEL_CREDENTIALS; |
||||
args_to_add[num_args_to_add++] = |
||||
grpc_channel_credentials_to_arg(creds_sans_call_creds); |
||||
} |
||||
grpc_channel_args* result = grpc_channel_args_copy_and_add_and_remove( |
||||
args, args_to_remove, num_args_to_remove, args_to_add, num_args_to_add); |
||||
// Clean up.
|
||||
grpc_channel_args_destroy(args); |
||||
if (creds_sans_call_creds != nullptr) { |
||||
grpc_channel_credentials_unref(creds_sans_call_creds); |
||||
} |
||||
return result; |
||||
} |
@ -0,0 +1,85 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_client_stats.h" |
||||
|
||||
#include <grpc/support/atm.h> |
||||
#include <grpc/support/string_util.h> |
||||
#include <string.h> |
||||
|
||||
namespace grpc_core { |
||||
|
||||
void XdsLbClientStats::AddCallStarted() { |
||||
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); |
||||
} |
||||
|
||||
void XdsLbClientStats::AddCallFinished(bool finished_with_client_failed_to_send, |
||||
bool finished_known_received) { |
||||
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); |
||||
if (finished_with_client_failed_to_send) { |
||||
gpr_atm_full_fetch_add(&num_calls_finished_with_client_failed_to_send_, |
||||
(gpr_atm)1); |
||||
} |
||||
if (finished_known_received) { |
||||
gpr_atm_full_fetch_add(&num_calls_finished_known_received_, (gpr_atm)1); |
||||
} |
||||
} |
||||
|
||||
void XdsLbClientStats::AddCallDroppedLocked(char* token) { |
||||
// Increment num_calls_started and num_calls_finished.
|
||||
gpr_atm_full_fetch_add(&num_calls_started_, (gpr_atm)1); |
||||
gpr_atm_full_fetch_add(&num_calls_finished_, (gpr_atm)1); |
||||
// Record the drop.
|
||||
if (drop_token_counts_ == nullptr) { |
||||
drop_token_counts_.reset(New<DroppedCallCounts>()); |
||||
} |
||||
for (size_t i = 0; i < drop_token_counts_->size(); ++i) { |
||||
if (strcmp((*drop_token_counts_)[i].token.get(), token) == 0) { |
||||
++(*drop_token_counts_)[i].count; |
||||
return; |
||||
} |
||||
} |
||||
// Not found, so add a new entry.
|
||||
drop_token_counts_->emplace_back(UniquePtr<char>(gpr_strdup(token)), 1); |
||||
} |
||||
|
||||
namespace { |
||||
|
||||
void AtomicGetAndResetCounter(int64_t* value, gpr_atm* counter) { |
||||
*value = static_cast<int64_t>(gpr_atm_full_xchg(counter, (gpr_atm)0)); |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
void XdsLbClientStats::GetLocked( |
||||
int64_t* num_calls_started, int64_t* num_calls_finished, |
||||
int64_t* num_calls_finished_with_client_failed_to_send, |
||||
int64_t* num_calls_finished_known_received, |
||||
UniquePtr<DroppedCallCounts>* drop_token_counts) { |
||||
AtomicGetAndResetCounter(num_calls_started, &num_calls_started_); |
||||
AtomicGetAndResetCounter(num_calls_finished, &num_calls_finished_); |
||||
AtomicGetAndResetCounter(num_calls_finished_with_client_failed_to_send, |
||||
&num_calls_finished_with_client_failed_to_send_); |
||||
AtomicGetAndResetCounter(num_calls_finished_known_received, |
||||
&num_calls_finished_known_received_); |
||||
*drop_token_counts = std::move(drop_token_counts_); |
||||
} |
||||
|
||||
} // namespace grpc_core
|
@ -0,0 +1,72 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/atm.h> |
||||
|
||||
#include "src/core/lib/gprpp/inlined_vector.h" |
||||
#include "src/core/lib/gprpp/memory.h" |
||||
#include "src/core/lib/gprpp/ref_counted.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
class XdsLbClientStats : public RefCounted<XdsLbClientStats> { |
||||
public: |
||||
struct DropTokenCount { |
||||
UniquePtr<char> token; |
||||
int64_t count; |
||||
|
||||
DropTokenCount(UniquePtr<char> token, int64_t count) |
||||
: token(std::move(token)), count(count) {} |
||||
}; |
||||
|
||||
typedef InlinedVector<DropTokenCount, 10> DroppedCallCounts; |
||||
|
||||
XdsLbClientStats() {} |
||||
|
||||
void AddCallStarted(); |
||||
void AddCallFinished(bool finished_with_client_failed_to_send, |
||||
bool finished_known_received); |
||||
|
||||
// This method is not thread-safe; caller must synchronize.
|
||||
void AddCallDroppedLocked(char* token); |
||||
|
||||
// This method is not thread-safe; caller must synchronize.
|
||||
void GetLocked(int64_t* num_calls_started, int64_t* num_calls_finished, |
||||
int64_t* num_calls_finished_with_client_failed_to_send, |
||||
int64_t* num_calls_finished_known_received, |
||||
UniquePtr<DroppedCallCounts>* drop_token_counts); |
||||
|
||||
private: |
||||
// This field must only be accessed via *_locked() methods.
|
||||
UniquePtr<DroppedCallCounts> drop_token_counts_; |
||||
// These fields may be accessed from multiple threads at a time.
|
||||
gpr_atm num_calls_started_ = 0; |
||||
gpr_atm num_calls_finished_ = 0; |
||||
gpr_atm num_calls_finished_with_client_failed_to_send_ = 0; |
||||
gpr_atm num_calls_finished_known_received_ = 0; |
||||
}; |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_XDS_XDS_CLIENT_STATS_H \ |
||||
*/ |
Loading…
Reference in new issue