mirror of https://github.com/grpc/grpc.git
commit
3511f45964
147 changed files with 4760 additions and 1702 deletions
@ -0,0 +1,43 @@ |
||||
# Troubleshooting gRPC |
||||
|
||||
This guide is for troubleshooting gRPC implementations based on C core library (sources for most of them are living in the `grpc/grpc` repository). |
||||
|
||||
## Enabling extra logging and tracing |
||||
|
||||
Extra logging can be very useful for diagnosing problems. All gRPC implementations based on C core library support |
||||
the `GRPC_VERBOSITY` and `GRPC_TRACE` environment variables that can be used to increase the amount of information |
||||
that gets printed to stderr. |
||||
|
||||
## GRPC_VERBOSITY |
||||
|
||||
`GRPC_VERBOSITY` is used to set the minimum level of log messages printed by gRPC (supported values are `DEBUG`, `INFO` and `ERROR`). If this environment variable is unset, only `ERROR` logs will be printed. |
||||
|
||||
## GRPC_TRACE |
||||
|
||||
`GRPC_TRACE` can be used to enable extra logging for some internal gRPC components. Enabling the right traces can be invaluable |
||||
for diagnosing for what is going wrong when things aren't working as intended. Possible values for `GRPC_TRACE` are listed in [Environment Variables Overview](doc/environment_variables.md). |
||||
Multiple traces can be enable at once (use comma as separator). |
||||
|
||||
``` |
||||
# Enable debug logs for an application |
||||
GRPC_VERBOSITY=debug ./helloworld_application_using_grpc |
||||
``` |
||||
|
||||
``` |
||||
# Print information about invocations of low-level C core API. |
||||
# Note that trace logs of log level DEBUG won't be displayed. |
||||
# Also note that most tracers user log level INFO, so without setting |
||||
# GPRC_VERBOSITY accordingly, no traces will be printed. |
||||
GRPC_VERBOSITY=info GRPC_TRACE=api ./helloworld_application_using_grpc |
||||
``` |
||||
|
||||
``` |
||||
# Print info from 3 different tracers, including tracing logs with log level DEBUG |
||||
GRPC_VERBOSITY=debug GRPC_TRACE=tcp,http,api ./helloworld_application_using_grpc |
||||
``` |
||||
|
||||
Known limitations: `GPRC_TRACE=tcp` is currently not implemented for Windows (you won't see any tcp traces). |
||||
|
||||
Please note that the `GRPC_TRACE` environment variable has nothing to do with gRPC's "tracing" feature (= tracing RPCs in |
||||
microservice environment to gain insight about how requests are processed by deployment), it is merely used to enable printing |
||||
of extra logs. |
@ -0,0 +1,50 @@ |
||||
# Keepalive User Guide for gRPC Core (and dependants) |
||||
|
||||
The keepalive ping is a way to check if a channel is currently working by sending HTTP2 pings over the transport. It is sent periodically, and if the ping is not acknowledged by the peer within a certain timeout period, the transport is disconnected. |
||||
|
||||
This guide documents the knobs within gRPC core to control the current behavior of the keepalive ping. |
||||
|
||||
The keepalive ping is controlled by two important channel arguments - |
||||
* **GRPC_ARG_KEEPALIVE_TIME_MS** |
||||
* This channel argument controls the period (in milliseconds) after which a keepalive ping is sent on the transport. |
||||
* **GRPC_ARG_KEEPALIVE_TIMEOUT_MS** |
||||
* This channel argument controls the amount of time (in milliseconds), the sender of the keepalive ping waits for an acknowledgement. If it does not receive an acknowledgement within this time, it will close the connection. |
||||
|
||||
The above two channel arguments should be sufficient for most users, but the following arguments can also be useful in certain use cases. |
||||
* **GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS** |
||||
* This channel argument if set to 1 (0 : false; 1 : true), allows keepalive pings to be sent even if there are no calls in flight. |
||||
* **GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA** |
||||
* This channel argument controls the maximum number of pings that can be sent when there is no other data (data frame or header frame) to be sent. GRPC Core will not continue sending pings if we run over the limit. Setting it to 0 allows sending pings without sending data. |
||||
* **GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS** |
||||
* If there is no data being sent on the transport, this channel argument controls the minimum time (in milliseconds) gRPC Core will wait between successive pings. |
||||
* **GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS** |
||||
* If there is no data being sent on the transport, this channel argument on the server side controls the minimum time (in milliseconds) that gRPC Core would expect between receiving successive pings. If the time between successive pings is less that than this time, then the ping will be considered a bad ping from the peer. Such a ping counts as a ‘ping strike’. |
||||
On the client side, this does not have any effect. |
||||
* **GRPC_ARG_HTTP2_MAX_PING_STRIKES** |
||||
* This arg controls the maximum number of bad pings that the server will tolerate before sending an HTTP2 GOAWAY frame and closing the transport. Setting it to 0 allows the server to accept any number of bad pings. |
||||
|
||||
### Defaults Values |
||||
|
||||
Channel Argument| Client|Server |
||||
----------------|-------|------ |
||||
GRPC_ARG_KEEPALIVE_TIME_MS|INT_MAX (disabled)|7200000 (2 hours) |
||||
GRPC_ARG_KEEPALIVE_TIMEOUT_MS|20000 (20 seconds)|20000 (20 seconds) |
||||
GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS|0 (false)|0 (false) |
||||
GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA|2|2 |
||||
GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS|300000 (5 minutes)|300000 (5 minutes) |
||||
GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS|N/A|300000 (5 minutes) |
||||
GRPC_ARG_HTTP2_MAX_PING_STRIKES|N/A|2 |
||||
|
||||
### FAQ |
||||
* When is the keepalive timer started? |
||||
* The keepalive timer is started when a transport is done connecting (after handshake). |
||||
* What happens when the keepalive timer fires? |
||||
* When the keepalive timer fires, gRPC Core would try to send a keepalive ping on the transport. This ping can be blocked if - |
||||
* there is no active call on that transport and GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS is false. |
||||
* the number of pings already sent on the transport without any data has already exceeded GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA. |
||||
* the time expired since the previous ping is less than GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS. |
||||
* If a keepalive ping is not blocked and is sent on the transport, then the keepalive watchdog timer is started which would close the transport if the ping is not acknowledged before it fires. |
||||
* Why am I receiving a GOAWAY with error code ENHANCE_YOUR_CALM? |
||||
* A server sends a GOAWAY with ENHANCE_YOUR_CALM if the client sends too many misbehaving pings. For example - |
||||
* if a server has GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS set to false, and the client sends pings without there being any call in flight. |
||||
* if the client's GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS setting is lower than the server's GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS. |
@ -0,0 +1,53 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPCPP_EXT_SERVER_LOAD_REPORTING_H |
||||
#define GRPCPP_EXT_SERVER_LOAD_REPORTING_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/load_reporting.h> |
||||
#include <grpcpp/impl/codegen/config.h> |
||||
#include <grpcpp/impl/codegen/server_context.h> |
||||
#include <grpcpp/impl/server_builder_option.h> |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
namespace experimental { |
||||
|
||||
// The ServerBuilderOption to enable server-side load reporting feature. To
|
||||
// enable the feature, please make sure the binary builds with the
|
||||
// grpcpp_server_load_reporting library and set this option in the
|
||||
// ServerBuilder.
|
||||
class LoadReportingServiceServerBuilderOption : public ServerBuilderOption { |
||||
public: |
||||
void UpdateArguments(::grpc::ChannelArguments* args) override; |
||||
void UpdatePlugins(std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* |
||||
plugins) override; |
||||
}; |
||||
|
||||
// Adds the load reporting cost with \a cost_name and \a cost_value in the
|
||||
// trailing metadata of the server context.
|
||||
void AddLoadReportingCost(grpc::ServerContext* ctx, |
||||
const grpc::string& cost_name, double cost_value); |
||||
|
||||
} // namespace experimental
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPCPP_EXT_SERVER_LOAD_REPORTING_H
|
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,113 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/client_channel.h" |
||||
#include "src/core/ext/filters/client_channel/client_channel_channelz.h" |
||||
#include "src/core/lib/gpr/useful.h" |
||||
#include "src/core/lib/surface/channel.h" |
||||
#include "src/core/lib/transport/connectivity_state.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace channelz { |
||||
namespace { |
||||
|
||||
void* client_channel_channelz_copy(void* p) { return p; } |
||||
|
||||
void client_channel_channelz_destroy(void* p) {} |
||||
|
||||
int client_channel_channelz_cmp(void* a, void* b) { return GPR_ICMP(a, b); } |
||||
|
||||
} // namespace
|
||||
|
||||
static const grpc_arg_pointer_vtable client_channel_channelz_vtable = { |
||||
client_channel_channelz_copy, client_channel_channelz_destroy, |
||||
client_channel_channelz_cmp}; |
||||
|
||||
ClientChannelNode::ClientChannelNode(grpc_channel* channel, |
||||
size_t channel_tracer_max_nodes, |
||||
bool is_top_level_channel) |
||||
: ChannelNode(channel, channel_tracer_max_nodes, is_top_level_channel) { |
||||
client_channel_ = |
||||
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel)); |
||||
GPR_ASSERT(client_channel_->filter == &grpc_client_channel_filter); |
||||
} |
||||
|
||||
void ClientChannelNode::PopulateConnectivityState(grpc_json* json) { |
||||
grpc_connectivity_state state; |
||||
if (ChannelIsDestroyed()) { |
||||
state = GRPC_CHANNEL_SHUTDOWN; |
||||
} else { |
||||
state = |
||||
grpc_client_channel_check_connectivity_state(client_channel_, false); |
||||
} |
||||
json = grpc_json_create_child(nullptr, json, "state", nullptr, |
||||
GRPC_JSON_OBJECT, false); |
||||
grpc_json_create_child(nullptr, json, "state", |
||||
grpc_connectivity_state_name(state), GRPC_JSON_STRING, |
||||
false); |
||||
} |
||||
|
||||
void ClientChannelNode::PopulateChildRefs(grpc_json* json) { |
||||
ChildRefsList child_subchannels; |
||||
ChildRefsList child_channels; |
||||
grpc_json* json_iterator = nullptr; |
||||
grpc_client_channel_populate_child_refs(client_channel_, &child_subchannels, |
||||
&child_channels); |
||||
if (!child_subchannels.empty()) { |
||||
grpc_json* array_parent = grpc_json_create_child( |
||||
nullptr, json, "subchannelRef", nullptr, GRPC_JSON_ARRAY, false); |
||||
for (size_t i = 0; i < child_subchannels.size(); ++i) { |
||||
json_iterator = |
||||
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, |
||||
GRPC_JSON_OBJECT, false); |
||||
grpc_json_add_number_string_child(json_iterator, nullptr, "subchannelId", |
||||
child_subchannels[i]); |
||||
} |
||||
} |
||||
if (!child_channels.empty()) { |
||||
grpc_json* array_parent = grpc_json_create_child( |
||||
nullptr, json, "channelRef", nullptr, GRPC_JSON_ARRAY, false); |
||||
json_iterator = nullptr; |
||||
for (size_t i = 0; i < child_channels.size(); ++i) { |
||||
json_iterator = |
||||
grpc_json_create_child(json_iterator, array_parent, nullptr, nullptr, |
||||
GRPC_JSON_OBJECT, false); |
||||
grpc_json_add_number_string_child(json_iterator, nullptr, "channelId", |
||||
child_channels[i]); |
||||
} |
||||
} |
||||
} |
||||
|
||||
grpc_arg ClientChannelNode::CreateChannelArg() { |
||||
return grpc_channel_arg_pointer_create( |
||||
const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_NODE_CREATION_FUNC), |
||||
reinterpret_cast<void*>(MakeClientChannelNode), |
||||
&client_channel_channelz_vtable); |
||||
} |
||||
|
||||
RefCountedPtr<ChannelNode> ClientChannelNode::MakeClientChannelNode( |
||||
grpc_channel* channel, size_t channel_tracer_max_nodes, |
||||
bool is_top_level_channel) { |
||||
return MakePolymorphicRefCounted<ChannelNode, ClientChannelNode>( |
||||
channel, channel_tracer_max_nodes, is_top_level_channel); |
||||
} |
||||
|
||||
} // namespace channelz
|
||||
} // namespace grpc_core
|
@ -0,0 +1,71 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_CHANNELZ_H |
||||
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_CHANNELZ_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/channelz.h" |
||||
#include "src/core/lib/gprpp/inlined_vector.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
// TODO(ncteisen), this only contains the uuids of the children for now,
|
||||
// since that is all that is strictly needed. In a future enhancement we will
|
||||
// add human readable names as in the channelz.proto
|
||||
typedef InlinedVector<intptr_t, 10> ChildRefsList; |
||||
|
||||
namespace channelz { |
||||
|
||||
// Subtype of ChannelNode that overrides and provides client_channel specific
|
||||
// functionality like querying for connectivity_state and subchannel data.
|
||||
class ClientChannelNode : public ChannelNode { |
||||
public: |
||||
static RefCountedPtr<ChannelNode> MakeClientChannelNode( |
||||
grpc_channel* channel, size_t channel_tracer_max_nodes, |
||||
bool is_top_level_channel); |
||||
|
||||
// Override this functionality since client_channels have a notion of
|
||||
// channel connectivity.
|
||||
void PopulateConnectivityState(grpc_json* json) override; |
||||
|
||||
// Override this functionality since client_channels have subchannels
|
||||
void PopulateChildRefs(grpc_json* json) override; |
||||
|
||||
// Helper to create a channel arg to ensure this type of ChannelNode is
|
||||
// created.
|
||||
static grpc_arg CreateChannelArg(); |
||||
|
||||
protected: |
||||
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE |
||||
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW |
||||
ClientChannelNode(grpc_channel* channel, size_t channel_tracer_max_nodes, |
||||
bool is_top_level_channel); |
||||
virtual ~ClientChannelNode() {} |
||||
|
||||
private: |
||||
grpc_channel_element* client_channel_; |
||||
}; |
||||
|
||||
} // namespace channelz
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_CHANNELZ_H */ |
@ -0,0 +1,370 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h" |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
void LoadReporterAsyncServiceImpl::CallableTag::Run(bool ok) { |
||||
GPR_ASSERT(handler_function_ != nullptr); |
||||
GPR_ASSERT(handler_ != nullptr); |
||||
handler_function_(std::move(handler_), ok); |
||||
} |
||||
|
||||
LoadReporterAsyncServiceImpl::LoadReporterAsyncServiceImpl( |
||||
std::unique_ptr<ServerCompletionQueue> cq) |
||||
: cq_(std::move(cq)) { |
||||
thread_ = std::unique_ptr<::grpc_core::Thread>( |
||||
new ::grpc_core::Thread("server_load_reporting", Work, this)); |
||||
std::unique_ptr<CpuStatsProvider> cpu_stats_provider = nullptr; |
||||
#if defined(GPR_LINUX) || defined(GPR_WINDOWS) || defined(GPR_APPLE) |
||||
cpu_stats_provider.reset(new CpuStatsProviderDefaultImpl()); |
||||
#endif |
||||
load_reporter_ = std::unique_ptr<LoadReporter>(new LoadReporter( |
||||
kFeedbackSampleWindowSeconds, |
||||
std::unique_ptr<CensusViewProvider>(new CensusViewProviderDefaultImpl()), |
||||
std::move(cpu_stats_provider))); |
||||
} |
||||
|
||||
LoadReporterAsyncServiceImpl::~LoadReporterAsyncServiceImpl() { |
||||
// We will reach here after the server starts shutting down.
|
||||
shutdown_ = true; |
||||
{ |
||||
std::unique_lock<std::mutex> lock(cq_shutdown_mu_); |
||||
cq_->Shutdown(); |
||||
} |
||||
if (next_fetch_and_sample_alarm_ != nullptr) |
||||
next_fetch_and_sample_alarm_->Cancel(); |
||||
thread_->Join(); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ScheduleNextFetchAndSample() { |
||||
auto next_fetch_and_sample_time = |
||||
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
||||
gpr_time_from_millis(kFetchAndSampleIntervalSeconds * 1000, |
||||
GPR_TIMESPAN)); |
||||
{ |
||||
std::unique_lock<std::mutex> lock(cq_shutdown_mu_); |
||||
if (shutdown_) return; |
||||
// TODO(juanlishen): Improve the Alarm implementation to reuse a single
|
||||
// instance for multiple events.
|
||||
next_fetch_and_sample_alarm_.reset(new Alarm); |
||||
next_fetch_and_sample_alarm_->Set(cq_.get(), next_fetch_and_sample_time, |
||||
this); |
||||
} |
||||
gpr_log(GPR_DEBUG, "[LRS %p] Next fetch-and-sample scheduled.", this); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::FetchAndSample(bool ok) { |
||||
if (!ok) { |
||||
gpr_log(GPR_INFO, "[LRS %p] Fetch-and-sample is stopped.", this); |
||||
return; |
||||
} |
||||
gpr_log(GPR_DEBUG, "[LRS %p] Starting a fetch-and-sample...", this); |
||||
load_reporter_->FetchAndSample(); |
||||
ScheduleNextFetchAndSample(); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::Work(void* arg) { |
||||
LoadReporterAsyncServiceImpl* service = |
||||
reinterpret_cast<LoadReporterAsyncServiceImpl*>(arg); |
||||
service->FetchAndSample(true /* ok */); |
||||
// TODO(juanlishen): This is a workaround to wait for the cq to be ready. Need
|
||||
// to figure out why cq is not ready after service starts.
|
||||
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), |
||||
gpr_time_from_seconds(1, GPR_TIMESPAN))); |
||||
ReportLoadHandler::CreateAndStart(service->cq_.get(), service, |
||||
service->load_reporter_.get()); |
||||
void* tag; |
||||
bool ok; |
||||
while (true) { |
||||
if (!service->cq_->Next(&tag, &ok)) { |
||||
// The completion queue is shutting down.
|
||||
GPR_ASSERT(service->shutdown_); |
||||
break; |
||||
} |
||||
if (tag == service) { |
||||
service->FetchAndSample(ok); |
||||
} else { |
||||
auto* next_step = static_cast<CallableTag*>(tag); |
||||
next_step->Run(ok); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::StartThread() { thread_->Start(); } |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::CreateAndStart( |
||||
ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service, |
||||
LoadReporter* load_reporter) { |
||||
std::shared_ptr<ReportLoadHandler> handler = |
||||
std::make_shared<ReportLoadHandler>(cq, service, load_reporter); |
||||
ReportLoadHandler* p = handler.get(); |
||||
{ |
||||
std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_); |
||||
if (service->shutdown_) return; |
||||
p->on_done_notified_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::OnDoneNotified, p, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
handler); |
||||
p->next_inbound_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::OnRequestDelivered, p, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(handler)); |
||||
p->ctx_.AsyncNotifyWhenDone(&p->on_done_notified_); |
||||
service->RequestReportLoad(&p->ctx_, &p->stream_, cq, cq, |
||||
&p->next_inbound_); |
||||
} |
||||
} |
||||
|
||||
LoadReporterAsyncServiceImpl::ReportLoadHandler::ReportLoadHandler( |
||||
ServerCompletionQueue* cq, LoadReporterAsyncServiceImpl* service, |
||||
LoadReporter* load_reporter) |
||||
: cq_(cq), |
||||
service_(service), |
||||
load_reporter_(load_reporter), |
||||
stream_(&ctx_), |
||||
call_status_(WAITING_FOR_DELIVERY) {} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnRequestDelivered( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
if (ok) { |
||||
call_status_ = DELIVERED; |
||||
} else { |
||||
// AsyncNotifyWhenDone() needs to be called before the call starts, but the
|
||||
// tag will not pop out if the call never starts (
|
||||
// https://github.com/grpc/grpc/issues/10136). So we need to manually
|
||||
// release the ownership of the handler in this case.
|
||||
GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr); |
||||
} |
||||
if (!ok || shutdown_) { |
||||
// The value of ok being false means that the server is shutting down.
|
||||
Shutdown(std::move(self), "OnRequestDelivered"); |
||||
return; |
||||
} |
||||
// Spawn a new handler instance to serve the next new client. Every handler
|
||||
// instance will deallocate itself when it's done.
|
||||
CreateAndStart(cq_, service_, load_reporter_); |
||||
{ |
||||
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
||||
if (service_->shutdown_) { |
||||
lock.release()->unlock(); |
||||
Shutdown(std::move(self), "OnRequestDelivered"); |
||||
return; |
||||
} |
||||
next_inbound_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(self)); |
||||
stream_.Read(&request_, &next_inbound_); |
||||
} |
||||
// LB ID is unique for each load reporting stream.
|
||||
lb_id_ = load_reporter_->GenerateLbId(); |
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Call request delivered (lb_id_: %s, handler: %p). " |
||||
"Start reading the initial request...", |
||||
service_, lb_id_.c_str(), this); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
if (!ok || shutdown_) { |
||||
if (!ok && call_status_ < INITIAL_REQUEST_RECEIVED) { |
||||
// The client may have half-closed the stream or the stream is broken.
|
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Failed reading the initial request from the stream " |
||||
"(lb_id_: %s, handler: %p, done_notified: %d, is_cancelled: %d).", |
||||
service_, lb_id_.c_str(), this, static_cast<int>(done_notified_), |
||||
static_cast<int>(is_cancelled_)); |
||||
} |
||||
Shutdown(std::move(self), "OnReadDone"); |
||||
return; |
||||
} |
||||
// We only receive one request, which is the initial request.
|
||||
if (call_status_ < INITIAL_REQUEST_RECEIVED) { |
||||
if (!request_.has_initial_request()) { |
||||
Shutdown(std::move(self), "OnReadDone+initial_request_not_found"); |
||||
} else { |
||||
call_status_ = INITIAL_REQUEST_RECEIVED; |
||||
const auto& initial_request = request_.initial_request(); |
||||
load_balanced_hostname_ = initial_request.load_balanced_hostname(); |
||||
load_key_ = initial_request.load_key(); |
||||
load_reporter_->ReportStreamCreated(load_balanced_hostname_, lb_id_, |
||||
load_key_); |
||||
const auto& load_report_interval = initial_request.load_report_interval(); |
||||
load_report_interval_ms_ = |
||||
static_cast<uint64_t>(load_report_interval.seconds() * 1000 + |
||||
load_report_interval.nanos() / 1000); |
||||
gpr_log( |
||||
GPR_INFO, |
||||
"[LRS %p] Initial request received. Start load reporting (load " |
||||
"balanced host: %s, interval: %lu ms, lb_id_: %s, handler: %p)...", |
||||
service_, load_balanced_hostname_.c_str(), load_report_interval_ms_, |
||||
lb_id_.c_str(), this); |
||||
SendReport(self, true /* ok */); |
||||
// Expect this read to fail.
|
||||
{ |
||||
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
||||
if (service_->shutdown_) { |
||||
lock.release()->unlock(); |
||||
Shutdown(std::move(self), "OnReadDone"); |
||||
return; |
||||
} |
||||
next_inbound_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::OnReadDone, this, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(self)); |
||||
stream_.Read(&request_, &next_inbound_); |
||||
} |
||||
} |
||||
} else { |
||||
// Another request received! This violates the spec.
|
||||
gpr_log(GPR_ERROR, |
||||
"[LRS %p] Another request received (lb_id_: %s, handler: %p).", |
||||
service_, lb_id_.c_str(), this); |
||||
Shutdown(std::move(self), "OnReadDone+second_request"); |
||||
} |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::ScheduleNextReport( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
if (!ok || shutdown_) { |
||||
Shutdown(std::move(self), "ScheduleNextReport"); |
||||
return; |
||||
} |
||||
auto next_report_time = gpr_time_add( |
||||
gpr_now(GPR_CLOCK_MONOTONIC), |
||||
gpr_time_from_millis(load_report_interval_ms_, GPR_TIMESPAN)); |
||||
{ |
||||
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
||||
if (service_->shutdown_) { |
||||
lock.release()->unlock(); |
||||
Shutdown(std::move(self), "ScheduleNextReport"); |
||||
return; |
||||
} |
||||
next_outbound_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::SendReport, this, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(self)); |
||||
// TODO(juanlishen): Improve the Alarm implementation to reuse a single
|
||||
// instance for multiple events.
|
||||
next_report_alarm_.reset(new Alarm); |
||||
next_report_alarm_->Set(cq_, next_report_time, &next_outbound_); |
||||
} |
||||
gpr_log(GPR_DEBUG, |
||||
"[LRS %p] Next load report scheduled (lb_id_: %s, handler: %p).", |
||||
service_, lb_id_.c_str(), this); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::SendReport( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
if (!ok || shutdown_) { |
||||
Shutdown(std::move(self), "SendReport"); |
||||
return; |
||||
} |
||||
::grpc::lb::v1::LoadReportResponse response; |
||||
auto loads = load_reporter_->GenerateLoads(load_balanced_hostname_, lb_id_); |
||||
response.mutable_load()->Swap(&loads); |
||||
auto feedback = load_reporter_->GenerateLoadBalancingFeedback(); |
||||
response.mutable_load_balancing_feedback()->Swap(&feedback); |
||||
if (call_status_ < INITIAL_RESPONSE_SENT) { |
||||
auto initial_response = response.mutable_initial_response(); |
||||
initial_response->set_load_balancer_id(lb_id_); |
||||
initial_response->set_implementation_id( |
||||
::grpc::lb::v1::InitialLoadReportResponse::CPP); |
||||
initial_response->set_server_version(kVersion); |
||||
call_status_ = INITIAL_RESPONSE_SENT; |
||||
} |
||||
{ |
||||
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
||||
if (service_->shutdown_) { |
||||
lock.release()->unlock(); |
||||
Shutdown(std::move(self), "SendReport"); |
||||
return; |
||||
} |
||||
next_outbound_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::ScheduleNextReport, this, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(self)); |
||||
stream_.Write(response, &next_outbound_); |
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Sending load report (lb_id_: %s, handler: %p, loads " |
||||
"count: %d)...", |
||||
service_, lb_id_.c_str(), this, response.load().size()); |
||||
} |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnDoneNotified( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
GPR_ASSERT(ok); |
||||
done_notified_ = true; |
||||
if (ctx_.IsCancelled()) { |
||||
is_cancelled_ = true; |
||||
} |
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Load reporting call is notified done (handler: %p, " |
||||
"is_cancelled: %d).", |
||||
service_, this, static_cast<int>(is_cancelled_)); |
||||
Shutdown(std::move(self), "OnDoneNotified"); |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::Shutdown( |
||||
std::shared_ptr<ReportLoadHandler> self, const char* reason) { |
||||
if (!shutdown_) { |
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Shutting down the handler (lb_id_: %s, handler: %p, " |
||||
"reason: %s).", |
||||
service_, lb_id_.c_str(), this, reason); |
||||
shutdown_ = true; |
||||
if (call_status_ >= INITIAL_REQUEST_RECEIVED) { |
||||
load_reporter_->ReportStreamClosed(load_balanced_hostname_, lb_id_); |
||||
next_report_alarm_->Cancel(); |
||||
} |
||||
} |
||||
// OnRequestDelivered() may be called after OnDoneNotified(), so we need to
|
||||
// try to Finish() every time we are in Shutdown().
|
||||
if (call_status_ >= DELIVERED && call_status_ < FINISH_CALLED) { |
||||
std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_); |
||||
if (!service_->shutdown_) { |
||||
on_finish_done_ = |
||||
CallableTag(std::bind(&ReportLoadHandler::OnFinishDone, this, |
||||
std::placeholders::_1, std::placeholders::_2), |
||||
std::move(self)); |
||||
// TODO(juanlishen): Maybe add a message proto for the client to
|
||||
// explicitly cancel the stream so that we can return OK status in such
|
||||
// cases.
|
||||
stream_.Finish(Status::CANCELLED, &on_finish_done_); |
||||
call_status_ = FINISH_CALLED; |
||||
} |
||||
} |
||||
} |
||||
|
||||
void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnFinishDone( |
||||
std::shared_ptr<ReportLoadHandler> self, bool ok) { |
||||
if (ok) { |
||||
gpr_log(GPR_INFO, |
||||
"[LRS %p] Load reporting finished (lb_id_: %s, handler: %p).", |
||||
service_, lb_id_.c_str(), this); |
||||
} |
||||
} |
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
@ -0,0 +1,194 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H |
||||
#define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpc/support/log.h> |
||||
#include <grpcpp/alarm.h> |
||||
#include <grpcpp/grpcpp.h> |
||||
|
||||
#include "src/core/lib/gprpp/thd.h" |
||||
#include "src/cpp/server/load_reporter/load_reporter.h" |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
// Async load reporting service. It's mainly responsible for controlling the
|
||||
// procedure of incoming requests. The real business logic is handed off to the
|
||||
// LoadReporter. There should be at most one instance of this service on a
|
||||
// server to avoid spreading the load data into multiple places.
|
||||
class LoadReporterAsyncServiceImpl |
||||
: public grpc::lb::v1::LoadReporter::AsyncService { |
||||
public: |
||||
explicit LoadReporterAsyncServiceImpl( |
||||
std::unique_ptr<ServerCompletionQueue> cq); |
||||
~LoadReporterAsyncServiceImpl(); |
||||
|
||||
// Starts the working thread.
|
||||
void StartThread(); |
||||
|
||||
// Not copyable nor movable.
|
||||
LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete; |
||||
LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) = |
||||
delete; |
||||
|
||||
private: |
||||
class ReportLoadHandler; |
||||
|
||||
// A tag that can be called with a bool argument. It's tailored for
|
||||
// ReportLoadHandler's use. Before being used, it should be constructed with a
|
||||
// method of ReportLoadHandler and a shared pointer to the handler. The
|
||||
// shared pointer will be moved to the invoked function and the function can
|
||||
// only be invoked once. That makes ref counting of the handler easier,
|
||||
// because the shared pointer is not bound to the function and can be gone
|
||||
// once the invoked function returns (if not used any more).
|
||||
class CallableTag { |
||||
public: |
||||
using HandlerFunction = |
||||
std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>; |
||||
|
||||
CallableTag() {} |
||||
|
||||
CallableTag(HandlerFunction func, |
||||
std::shared_ptr<ReportLoadHandler> handler) |
||||
: handler_function_(std::move(func)), handler_(std::move(handler)) { |
||||
GPR_ASSERT(handler_function_ != nullptr); |
||||
GPR_ASSERT(handler_ != nullptr); |
||||
} |
||||
|
||||
// Runs the tag. This should be called only once. The handler is no longer
|
||||
// owned by this tag after this method is invoked.
|
||||
void Run(bool ok); |
||||
|
||||
// Releases and returns the shared pointer to the handler.
|
||||
std::shared_ptr<ReportLoadHandler> ReleaseHandler() { |
||||
return std::move(handler_); |
||||
} |
||||
|
||||
private: |
||||
HandlerFunction handler_function_ = nullptr; |
||||
std::shared_ptr<ReportLoadHandler> handler_; |
||||
}; |
||||
|
||||
// Each handler takes care of one load reporting stream. It contains
|
||||
// per-stream data and it will access the members of the parent class (i.e.,
|
||||
// LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
|
||||
class ReportLoadHandler { |
||||
public: |
||||
// Instantiates a ReportLoadHandler and requests the next load reporting
|
||||
// call. The handler object will manage its own lifetime, so no action is
|
||||
// needed from the caller any more regarding that object.
|
||||
static void CreateAndStart(ServerCompletionQueue* cq, |
||||
LoadReporterAsyncServiceImpl* service, |
||||
LoadReporter* load_reporter); |
||||
|
||||
// This ctor is public because we want to use std::make_shared<> in
|
||||
// CreateAndStart(). This ctor shouldn't be used elsewhere.
|
||||
ReportLoadHandler(ServerCompletionQueue* cq, |
||||
LoadReporterAsyncServiceImpl* service, |
||||
LoadReporter* load_reporter); |
||||
|
||||
private: |
||||
// After the handler has a call request delivered, it starts reading the
|
||||
// initial request. Also, a new handler is spawned so that we can keep
|
||||
// servicing future calls.
|
||||
void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
|
||||
// The first Read() is expected to succeed, after which the handler starts
|
||||
// sending load reports back to the balancer. The second Read() is
|
||||
// expected to fail, which happens when the balancer half-closes the
|
||||
// stream to signal that it's no longer interested in the load reports. For
|
||||
// the latter case, the handler will then close the stream.
|
||||
void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
|
||||
// The report sending operations are sequential as: send report -> send
|
||||
// done, schedule the next send -> waiting for the alarm to fire -> alarm
|
||||
// fires, send report -> ...
|
||||
void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
|
||||
// Called when Finish() is done.
|
||||
void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
|
||||
// Called when AsyncNotifyWhenDone() notifies us.
|
||||
void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok); |
||||
|
||||
void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason); |
||||
|
||||
// The key fields of the stream.
|
||||
grpc::string lb_id_; |
||||
grpc::string load_balanced_hostname_; |
||||
grpc::string load_key_; |
||||
uint64_t load_report_interval_ms_; |
||||
|
||||
// The data for RPC communication with the load reportee.
|
||||
ServerContext ctx_; |
||||
::grpc::lb::v1::LoadReportRequest request_; |
||||
|
||||
// The members passed down from LoadReporterAsyncServiceImpl.
|
||||
ServerCompletionQueue* cq_; |
||||
LoadReporterAsyncServiceImpl* service_; |
||||
LoadReporter* load_reporter_; |
||||
ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse, |
||||
::grpc::lb::v1::LoadReportRequest> |
||||
stream_; |
||||
|
||||
// The status of the RPC progress.
|
||||
enum CallStatus { |
||||
WAITING_FOR_DELIVERY, |
||||
DELIVERED, |
||||
INITIAL_REQUEST_RECEIVED, |
||||
INITIAL_RESPONSE_SENT, |
||||
FINISH_CALLED |
||||
} call_status_; |
||||
bool shutdown_{false}; |
||||
bool done_notified_{false}; |
||||
bool is_cancelled_{false}; |
||||
CallableTag on_done_notified_; |
||||
CallableTag on_finish_done_; |
||||
CallableTag next_inbound_; |
||||
CallableTag next_outbound_; |
||||
std::unique_ptr<Alarm> next_report_alarm_; |
||||
}; |
||||
|
||||
// Handles the incoming requests and drives the completion queue in a loop.
|
||||
static void Work(void* arg); |
||||
|
||||
// Schedules the next data fetching from Census and LB feedback sampling.
|
||||
void ScheduleNextFetchAndSample(); |
||||
|
||||
// Fetches data from Census and samples LB feedback.
|
||||
void FetchAndSample(bool ok); |
||||
|
||||
std::unique_ptr<ServerCompletionQueue> cq_; |
||||
// To synchronize the operations related to shutdown state of cq_, so that we
|
||||
// don't enqueue new tags into cq_ after it is already shut down.
|
||||
std::mutex cq_shutdown_mu_; |
||||
std::atomic_bool shutdown_{false}; |
||||
std::unique_ptr<::grpc_core::Thread> thread_; |
||||
std::unique_ptr<LoadReporter> load_reporter_; |
||||
std::unique_ptr<Alarm> next_fetch_and_sample_alarm_; |
||||
}; |
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
|
@ -0,0 +1,41 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpcpp/ext/server_load_reporting.h> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h" |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
namespace experimental { |
||||
|
||||
void LoadReportingServiceServerBuilderOption::UpdateArguments( |
||||
::grpc::ChannelArguments* args) { |
||||
args->SetInt(GRPC_ARG_ENABLE_LOAD_REPORTING, true); |
||||
} |
||||
|
||||
void LoadReportingServiceServerBuilderOption::UpdatePlugins( |
||||
std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>* plugins) { |
||||
plugins->emplace_back(new LoadReportingServiceServerBuilderPlugin()); |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
@ -0,0 +1,60 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_reporting_service_server_builder_plugin.h" |
||||
|
||||
#include <grpcpp/impl/server_initializer.h> |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
bool LoadReportingServiceServerBuilderPlugin::has_sync_methods() const { |
||||
if (service_ != nullptr) { |
||||
return service_->has_synchronous_methods(); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
bool LoadReportingServiceServerBuilderPlugin::has_async_methods() const { |
||||
if (service_ != nullptr) { |
||||
return service_->has_async_methods(); |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
void LoadReportingServiceServerBuilderPlugin::UpdateServerBuilder( |
||||
grpc::ServerBuilder* builder) { |
||||
auto cq = builder->AddCompletionQueue(); |
||||
service_ = std::make_shared<LoadReporterAsyncServiceImpl>(std::move(cq)); |
||||
} |
||||
|
||||
void LoadReportingServiceServerBuilderPlugin::InitServer( |
||||
grpc::ServerInitializer* si) { |
||||
si->RegisterService(service_); |
||||
} |
||||
|
||||
void LoadReportingServiceServerBuilderPlugin::Finish( |
||||
grpc::ServerInitializer* si) { |
||||
service_->StartThread(); |
||||
service_.reset(); |
||||
} |
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
@ -0,0 +1,62 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#ifndef GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H |
||||
#define GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <grpcpp/impl/server_builder_plugin.h> |
||||
|
||||
#include "src/cpp/server/load_reporter/load_reporter_async_service_impl.h" |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
|
||||
// The plugin that registers and starts load reporting service when starting a
|
||||
// server.
|
||||
class LoadReportingServiceServerBuilderPlugin : public ServerBuilderPlugin { |
||||
public: |
||||
~LoadReportingServiceServerBuilderPlugin() override = default; |
||||
grpc::string name() override { return "load_reporting_service"; } |
||||
|
||||
// Creates a load reporting service.
|
||||
void UpdateServerBuilder(grpc::ServerBuilder* builder) override; |
||||
|
||||
// Registers the load reporter service.
|
||||
void InitServer(grpc::ServerInitializer* si) override; |
||||
|
||||
// Starts the load reporter service.
|
||||
void Finish(grpc::ServerInitializer* si) override; |
||||
|
||||
void ChangeArguments(const grpc::string& name, void* value) override {} |
||||
void UpdateChannelArguments(grpc::ChannelArguments* args) override {} |
||||
bool has_sync_methods() const override; |
||||
bool has_async_methods() const override; |
||||
|
||||
private: |
||||
std::shared_ptr<LoadReporterAsyncServiceImpl> service_; |
||||
}; |
||||
|
||||
std::unique_ptr<grpc::ServerBuilderPlugin> |
||||
CreateLoadReportingServiceServerBuilderPlugin(); |
||||
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_LOAD_REPORTING_SERVICE_SERVER_BUILDER_PLUGIN_H
|
@ -0,0 +1,45 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/impl/codegen/port_platform.h> |
||||
|
||||
#include <grpcpp/ext/server_load_reporting.h> |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
namespace grpc { |
||||
namespace load_reporter { |
||||
namespace experimental { |
||||
|
||||
void AddLoadReportingCost(grpc::ServerContext* ctx, |
||||
const grpc::string& cost_name, double cost_value) { |
||||
if (std::isnormal(cost_value)) { |
||||
grpc::string buf; |
||||
buf.resize(sizeof(cost_value) + cost_name.size()); |
||||
memcpy(&(*buf.begin()), &cost_value, sizeof(cost_value)); |
||||
memcpy(&(*buf.begin()) + sizeof(cost_value), cost_name.data(), |
||||
cost_name.size()); |
||||
ctx->AddTrailingMetadata(GRPC_LB_COST_MD_KEY, buf); |
||||
} else { |
||||
gpr_log(GPR_ERROR, "Call metric value is not normal."); |
||||
} |
||||
} |
||||
|
||||
} // namespace experimental
|
||||
} // namespace load_reporter
|
||||
} // namespace grpc
|
@ -0,0 +1,33 @@ |
||||
<?php |
||||
/* |
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
namespace Grpc; |
||||
|
||||
/** |
||||
* CallInvoker is used to pass the self defined channel into the stub, |
||||
* while intercept each RPC with the channel accessible. |
||||
* THIS IS AN EXPERIMENTAL API. |
||||
*/ |
||||
interface CallInvoker |
||||
{ |
||||
public function createChannelFactory($hostname, $opts); |
||||
public function UnaryCall($channel, $method, $deserialize, $options); |
||||
public function ClientStreamingCall($channel, $method, $deserialize, $options); |
||||
public function ServerStreamingCall($channel, $method, $deserialize, $options); |
||||
public function BidiStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
@ -0,0 +1,47 @@ |
||||
<?php |
||||
/* |
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
namespace Grpc; |
||||
|
||||
/** |
||||
* Default call invoker in the gRPC stub. |
||||
* THIS IS AN EXPERIMENTAL API. |
||||
*/ |
||||
class DefaultCallInvoker implements CallInvoker |
||||
{ |
||||
public function createChannelFactory($hostname, $opts) { |
||||
return new Channel($hostname, $opts); |
||||
} |
||||
|
||||
public function UnaryCall($channel, $method, $deserialize, $options) { |
||||
return new UnaryCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ClientStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ClientStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ServerStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ServerStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function BidiStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new BidiStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
} |
||||
|
@ -0,0 +1,227 @@ |
||||
<?php |
||||
/* |
||||
* |
||||
* Copyright 2018 gRPC authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
/** |
||||
* Interface exported by the server. |
||||
*/ |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/BaseStub.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/AbstractCall.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/UnaryCall.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/ClientStreamingCall.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/Interceptor.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/CallInvoker.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/DefaultCallInvoker.php'); |
||||
require_once(dirname(__FILE__).'/../../lib/Grpc/Internal/InterceptorChannel.php'); |
||||
|
||||
class CallInvokerSimpleRequest |
||||
{ |
||||
private $data; |
||||
public function __construct($data) |
||||
{ |
||||
$this->data = $data; |
||||
} |
||||
public function setData($data) |
||||
{ |
||||
$this->data = $data; |
||||
} |
||||
public function serializeToString() |
||||
{ |
||||
return $this->data; |
||||
} |
||||
} |
||||
|
||||
class CallInvokerClient extends Grpc\BaseStub |
||||
{ |
||||
|
||||
/** |
||||
* @param string $hostname hostname |
||||
* @param array $opts channel options |
||||
* @param Channel|InterceptorChannel $channel (optional) re-use channel object |
||||
*/ |
||||
public function __construct($hostname, $opts, $channel = null) |
||||
{ |
||||
parent::__construct($hostname, $opts, $channel); |
||||
} |
||||
|
||||
/** |
||||
* A simple RPC. |
||||
* @param SimpleRequest $argument input argument |
||||
* @param array $metadata metadata |
||||
* @param array $options call options |
||||
*/ |
||||
public function UnaryCall( |
||||
CallInvokerSimpleRequest $argument, |
||||
$metadata = [], |
||||
$options = [] |
||||
) { |
||||
return $this->_simpleRequest( |
||||
'/dummy_method', |
||||
$argument, |
||||
[], |
||||
$metadata, |
||||
$options |
||||
); |
||||
} |
||||
} |
||||
|
||||
class CallInvokerUpdateChannel implements \Grpc\CallInvoker |
||||
{ |
||||
private $channel; |
||||
|
||||
public function getChannel() { |
||||
return $this->channel; |
||||
} |
||||
|
||||
public function createChannelFactory($hostname, $opts) { |
||||
$this->channel = new \Grpc\Channel('localhost:50050', $opts); |
||||
return $this->channel; |
||||
} |
||||
|
||||
public function UnaryCall($channel, $method, $deserialize, $options) { |
||||
return new UnaryCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ClientStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ClientStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ServerStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ServerStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function BidiStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new BidiStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
} |
||||
|
||||
|
||||
class CallInvokerChangeRequest implements \Grpc\CallInvoker |
||||
{ |
||||
private $channel; |
||||
|
||||
public function getChannel() { |
||||
return $this->channel; |
||||
} |
||||
public function createChannelFactory($hostname, $opts) { |
||||
$this->channel = new \Grpc\Channel($hostname, $opts); |
||||
return $this->channel; |
||||
} |
||||
|
||||
public function UnaryCall($channel, $method, $deserialize, $options) { |
||||
return new CallInvokerChangeRequestCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ClientStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ClientStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function ServerStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new ServerStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function BidiStreamingCall($channel, $method, $deserialize, $options) { |
||||
return new BidiStreamingCall($channel, $method, $deserialize, $options); |
||||
} |
||||
} |
||||
|
||||
class CallInvokerChangeRequestCall |
||||
{ |
||||
private $call; |
||||
|
||||
public function __construct($channel, $method, $deserialize, $options) |
||||
{ |
||||
$this->call = new \Grpc\UnaryCall($channel, $method, $deserialize, $options); |
||||
} |
||||
|
||||
public function start($argument, $metadata, $options) { |
||||
$argument->setData('intercepted_unary_request'); |
||||
$this->call->start($argument, $metadata, $options); |
||||
} |
||||
|
||||
public function wait() |
||||
{ |
||||
return $this->call->wait(); |
||||
} |
||||
} |
||||
|
||||
class CallInvokerTest extends PHPUnit_Framework_TestCase |
||||
{ |
||||
public function setUp() |
||||
{ |
||||
$this->server = new Grpc\Server([]); |
||||
$this->port = $this->server->addHttp2Port('0.0.0.0:0'); |
||||
$this->server->start(); |
||||
} |
||||
|
||||
public function tearDown() |
||||
{ |
||||
unset($this->server); |
||||
} |
||||
|
||||
public function testCreateDefaultCallInvoker() |
||||
{ |
||||
$call_invoker = new \Grpc\DefaultCallInvoker(); |
||||
} |
||||
|
||||
public function testCreateCallInvoker() |
||||
{ |
||||
$call_invoker = new CallInvokerUpdateChannel(); |
||||
} |
||||
|
||||
public function testCallInvokerAccessChannel() |
||||
{ |
||||
$call_invoker = new CallInvokerUpdateChannel(); |
||||
$stub = new \Grpc\BaseStub('localhost:50051', |
||||
['credentials' => \Grpc\ChannelCredentials::createInsecure(), |
||||
'grpc_call_invoker' => $call_invoker]); |
||||
$this->assertEquals($call_invoker->getChannel()->getTarget(), 'localhost:50050'); |
||||
$call_invoker->getChannel()->close(); |
||||
} |
||||
|
||||
public function testClientChangeRequestCallInvoker() |
||||
{ |
||||
$req_text = 'client_request'; |
||||
$call_invoker = new CallInvokerChangeRequest(); |
||||
$client = new CallInvokerClient('localhost:'.$this->port, [ |
||||
'force_new' => true, |
||||
'credentials' => Grpc\ChannelCredentials::createInsecure(), |
||||
'grpc_call_invoker' => $call_invoker, |
||||
]); |
||||
|
||||
$req = new CallInvokerSimpleRequest($req_text); |
||||
$unary_call = $client->UnaryCall($req); |
||||
|
||||
$event = $this->server->requestCall(); |
||||
$this->assertSame('/dummy_method', $event->method); |
||||
$server_call = $event->call; |
||||
$event = $server_call->startBatch([ |
||||
Grpc\OP_SEND_INITIAL_METADATA => [], |
||||
Grpc\OP_SEND_STATUS_FROM_SERVER => [ |
||||
'metadata' => [], |
||||
'code' => Grpc\STATUS_OK, |
||||
'details' => '', |
||||
], |
||||
Grpc\OP_RECV_MESSAGE => true, |
||||
Grpc\OP_RECV_CLOSE_ON_SERVER => true, |
||||
]); |
||||
$this->assertSame('intercepted_unary_request', $event->message); |
||||
$call_invoker->getChannel()->close(); |
||||
unset($unary_call); |
||||
unset($server_call); |
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue