@ -24,6 +24,7 @@
# include <vector>
# include "absl/log/check.h"
# include "absl/log/log.h"
# include "absl/types/optional.h"
# include <grpc/support/log.h>
@ -233,16 +234,16 @@ class AdsServiceImpl
Status StreamAggregatedResources ( ServerContext * context ,
Stream * stream ) override {
gpr_log ( GPR_INFO , " ADS[%s]: StreamAggregatedResources starts " ,
debug_label_ . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: StreamAggregatedResources starts " ;
{
grpc_core : : MutexLock lock ( & ads_mu_ ) ;
if ( forced_ads_failure_ . has_value ( ) ) {
gpr_log ( GPR_INFO ,
" ADS[%s ]: StreamAggregatedResources forcing early failure "
" with status code: %d, message: %s " ,
debug_label_ . c_str ( ) , forced_ads_failure_ . value ( ) . error_code ( ) ,
forced_ads_failure_ . value ( ) . error_message ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: StreamAggregatedResources forcing early failure "
" with status code: "
< < forced_ads_failure_ . value ( ) . error_code ( ) < < " , message: "
< < forced_ads_failure_ . value ( ) . error_message ( ) ;
return forced_ads_failure_ . value ( ) ;
}
}
@ -283,10 +284,9 @@ class AdsServiceImpl
DiscoveryRequest request = std : : move ( requests . front ( ) ) ;
requests . pop_front ( ) ;
did_work = true ;
gpr_log ( GPR_INFO ,
" ADS[%s]: Received request for type %s with content %s " ,
debug_label_ . c_str ( ) , request . type_url ( ) . c_str ( ) ,
request . DebugString ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_ < < " ]: Received request for type "
< < request . type_url ( ) < < " with content "
< < request . DebugString ( ) ;
SentState & sent_state = sent_state_map [ request . type_url ( ) ] ;
// Process request.
ProcessRequest ( request , & update_queue , & subscription_map , & sent_state ,
@ -294,8 +294,8 @@ class AdsServiceImpl
}
}
if ( response . has_value ( ) ) {
gpr_log ( GPR_INFO , " ADS[%s]: Sending response: %s " , debug_label_ . c_str ( ) ,
response - > DebugString ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: Sending response: " < < response - > DebugString ( ) ;
stream - > Write ( response . value ( ) ) ;
}
response . reset ( ) ;
@ -315,8 +315,8 @@ class AdsServiceImpl
}
}
if ( response . has_value ( ) ) {
gpr_log ( GPR_INFO , " ADS[%s]: Sending update response: %s " ,
debug_label_ . c_str ( ) , response - > DebugString ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: Sending update response: " < < response - > DebugString ( ) ;
stream - > Write ( response . value ( ) ) ;
}
{
@ -350,8 +350,7 @@ class AdsServiceImpl
}
}
}
gpr_log ( GPR_INFO , " ADS[%s]: StreamAggregatedResources done " ,
debug_label_ . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_ < < " ]: StreamAggregatedResources done " ;
RemoveClient ( context - > peer ( ) ) ;
return Status : : OK ;
}
@ -382,9 +381,9 @@ class AdsServiceImpl
ResponseState response_state ;
if ( ! request . has_error_detail ( ) ) {
response_state . state = ResponseState : : ACKED ;
gpr_log ( GPR_INFO , " ADS[%s]: client ACKed resource_type=%s version=%s " ,
debug_label_ . c_str ( ) , request . type_url ( ) . c_str ( ) ,
request . version_info ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: client ACKed resource_type= " < < request . type_url ( )
< < " version= " < < request . version_info ( ) ;
} else {
response_state . state = ResponseState : : NACKED ;
if ( check_nack_status_code_ ! = nullptr ) {
@ -392,11 +391,10 @@ class AdsServiceImpl
static_cast < absl : : StatusCode > ( request . error_detail ( ) . code ( ) ) ) ;
}
response_state . error_message = request . error_detail ( ) . message ( ) ;
gpr_log ( GPR_INFO ,
" ADS[%s]: client NACKed resource_type=%s version=%s: %s " ,
debug_label_ . c_str ( ) , request . type_url ( ) . c_str ( ) ,
request . version_info ( ) . c_str ( ) ,
response_state . error_message . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: client NACKed resource_type= " < < request . type_url ( )
< < " version= " < < request . version_info ( ) < < " : "
< < response_state . error_message ;
}
resource_type_response_state_ [ request . type_url ( ) ] . emplace_back (
std : : move ( response_state ) ) ;
@ -426,9 +424,9 @@ class AdsServiceImpl
& resource_state , update_queue ) | |
ClientNeedsResourceUpdate ( resource_type_state , resource_state ,
sent_state - > resource_type_version ) ) {
gpr_log ( GPR_INFO , " ADS[%s]: Sending update for type=%s name=%s " ,
debug_label_ . c_str ( ) , request . type_url ( ) . c_str ( ) ,
resource_name . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: Sending update for type= " < < request . type_url ( )
< < " name= " < < resource_name ;
resources_added_to_response . emplace ( resource_name ) ;
if ( ! response - > has_value ( ) ) response - > emplace ( ) ;
if ( resource_state . resource . has_value ( ) ) {
@ -441,10 +439,9 @@ class AdsServiceImpl
}
}
} else {
gpr_log ( GPR_INFO ,
" ADS[%s]: client does not need update for type=%s name=%s " ,
debug_label_ . c_str ( ) , request . type_url ( ) . c_str ( ) ,
resource_name . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: client does not need update for type= "
< < request . type_url ( ) < < " name= " < < resource_name ;
}
}
// Process unsubscriptions for any resource no longer
@ -467,8 +464,9 @@ class AdsServiceImpl
SubscriptionMap * subscription_map , SentState * sent_state ,
absl : : optional < DiscoveryResponse > * response )
ABSL_EXCLUSIVE_LOCKS_REQUIRED ( ads_mu_ ) {
gpr_log ( GPR_INFO , " ADS[%s]: Received update for type=%s name=%s " ,
debug_label_ . c_str ( ) , resource_type . c_str ( ) , resource_name . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: Received update for type= " < < resource_type
< < " name= " < < resource_name ;
auto & subscription_name_map = ( * subscription_map ) [ resource_type ] ;
auto & resource_type_state = resource_map_ [ resource_type ] ;
auto & resource_name_map = resource_type_state . resource_name_map ;
@ -477,9 +475,9 @@ class AdsServiceImpl
ResourceState & resource_state = resource_name_map [ resource_name ] ;
if ( ClientNeedsResourceUpdate ( resource_type_state , resource_state ,
sent_state - > resource_type_version ) ) {
gpr_log ( GPR_INFO , " ADS[%s]: Sending update for type=%s name=%s " ,
debug_label_ . c_str ( ) , resource_type . c_str ( ) ,
resource_name . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_
< < " ]: Sending update for type= " < < resource_type
< < " name= " < < resource_name ;
response - > emplace ( ) ;
if ( resource_state . resource . has_value ( ) ) {
auto * resource = ( * response ) - > add_resources ( ) ;
@ -510,8 +508,7 @@ class AdsServiceImpl
requests - > emplace_back ( std : : move ( request ) ) ;
}
}
gpr_log ( GPR_INFO , " ADS[%s]: Null read, stream closed " ,
debug_label_ . c_str ( ) ) ;
LOG ( INFO ) < < " ADS[ " < < debug_label_ < < " ]: Null read, stream closed " ;
grpc_core : : MutexLock lock ( & ads_mu_ ) ;
* stream_closed = true ;
}
@ -751,7 +748,7 @@ class LrsServiceImpl
using Stream = ServerReaderWriter < LoadStatsResponse , LoadStatsRequest > ;
Status StreamLoadStats ( ServerContext * /*context*/ , Stream * stream ) override {
gpr_log ( GPR_INFO , " LRS[%s]: StreamLoadStats starts " , debug_label_ . c_str ( ) ) ;
LOG ( INFO ) < < " LRS[ " < < debug_label_ < < " ]: StreamLoadStats starts " ;
if ( stream_started_callback_ ! = nullptr ) stream_started_callback_ ( ) ;
// Take a reference of the LrsServiceImpl object, reference will go
// out of scope after this method exits.
@ -778,8 +775,9 @@ class LrsServiceImpl
// Wait for report.
request . Clear ( ) ;
while ( stream - > Read ( & request ) ) {
gpr_log ( GPR_INFO , " LRS[%s]: received client load report message: %s " ,
debug_label_ . c_str ( ) , request . DebugString ( ) . c_str ( ) ) ;
LOG ( INFO ) < < " LRS[ " < < debug_label_
< < " ]: received client load report message: "
< < request . DebugString ( ) ;
std : : vector < ClientStats > stats ;
for ( const auto & cluster_stats : request . cluster_stats ( ) ) {
stats . emplace_back ( cluster_stats ) ;
@ -796,7 +794,7 @@ class LrsServiceImpl
lrs_cv_ . Wait ( & lrs_mu_ ) ;
}
}
gpr_log ( GPR_INFO , " LRS[%s]: StreamLoadStats done " , debug_label_ . c_str ( ) ) ;
LOG ( INFO ) < < " LRS[ " < < debug_label_ < < " ]: StreamLoadStats done " ;
return Status : : OK ;
}