@ -16,16 +16,20 @@
*
*/
# include <sys/types.h>
# include <chrono>
# include <condition_variable>
# include <iostream>
# include <memory>
# include <mutex >
# include <optional >
# include <string>
# include "absl/flags/flag.h"
# include "absl/flags/parse.h"
# include "absl/strings/str_join.h"
# include "absl/strings/str_split.h"
# include "absl/types/optional.h"
# include "opentelemetry/exporters/prometheus/exporter_factory.h"
# include "opentelemetry/exporters/prometheus/exporter_options.h"
# include "opentelemetry/sdk/metrics/meter_provider.h"
@ -41,6 +45,8 @@
# endif
ABSL_FLAG ( std : : string , target , " xds:///helloworld:50051 " , " Target string " ) ;
ABSL_FLAG ( std : : string , cookie_name , " GSSA " , " Cookie name " ) ;
ABSL_FLAG ( uint , delay_s , 5 , " Delay between requests " ) ;
using grpc : : Channel ;
using grpc : : ClientContext ;
@ -49,15 +55,13 @@ using helloworld::Greeter;
using helloworld : : HelloReply ;
using helloworld : : HelloRequest ;
namespace {
struct Cookie {
std : : string name ;
std : : string value ;
std : : set < std : : string > attributes ;
std : : pair < std : : string , std : : string > Header ( ) const {
return std : : make_pair ( " cookie " , absl : : StrFormat ( " %s=%s " , name , value ) ) ;
}
template < typename Sink >
friend void AbslStringify ( Sink & sink , const Cookie & cookie ) {
absl : : Format ( & sink , " (Cookie: %s, value: %s, attributes: {%s}) " ,
@ -66,49 +70,45 @@ struct Cookie {
}
} ;
class GreeterClient {
protected :
static Cookie ParseCookie ( absl : : string_view header ) {
Cookie cookie ;
std : : pair < absl : : string_view , absl : : string_view > name_value =
absl : : StrSplit ( header , absl : : MaxSplits ( ' = ' , 1 ) ) ;
cookie . name = std : : string ( name_value . first ) ;
std : : pair < absl : : string_view , absl : : string_view > value_attrs =
absl : : StrSplit ( name_value . second , absl : : MaxSplits ( ' ; ' , 1 ) ) ;
cookie . value = std : : string ( value_attrs . first ) ;
for ( absl : : string_view segment : absl : : StrSplit ( value_attrs . second , ' ; ' ) ) {
cookie . attributes . emplace ( absl : : StripAsciiWhitespace ( segment ) ) ;
}
return cookie ;
Cookie ParseCookie ( absl : : string_view header ) {
Cookie cookie ;
std : : pair < absl : : string_view , absl : : string_view > name_value =
absl : : StrSplit ( header , absl : : MaxSplits ( ' = ' , 1 ) ) ;
cookie . name = std : : string ( name_value . first ) ;
std : : pair < absl : : string_view , absl : : string_view > value_attrs =
absl : : StrSplit ( name_value . second , absl : : MaxSplits ( ' ; ' , 1 ) ) ;
cookie . value = std : : string ( value_attrs . first ) ;
for ( absl : : string_view segment : absl : : StrSplit ( value_attrs . second , ' ; ' ) ) {
cookie . attributes . emplace ( absl : : StripAsciiWhitespace ( segment ) ) ;
}
return cookie ;
}
static std : : vector < Cookie > GetCookies (
const std : : multimap < grpc : : string_ref , grpc : : string_ref > &
server_initial_metadata ,
absl : : string_view cookie_name ) {
std : : vector < Cookie > values ;
auto pair = server_initial_metadata . equal_range ( " set-cookie " ) ;
for ( auto it = pair . first ; it ! = pair . second ; + + it ) {
gpr_log ( GPR_INFO , " set-cookie header: %s " , it - > second . data ( ) ) ;
const auto cookie = ParseCookie ( it - > second . data ( ) ) ;
if ( cookie . name = = cookie_name ) {
values . emplace_back ( cookie ) ;
}
std : : vector < Cookie > GetCookies (
const std : : multimap < grpc : : string_ref , grpc : : string_ref > & initial_metadata ,
absl : : string_view cookie_name ) {
std : : vector < Cookie > values ;
auto pair = initial_metadata . equal_range ( " set-cookie " ) ;
for ( auto it = pair . first ; it ! = pair . second ; + + it ) {
const auto cookie = ParseCookie ( it - > second . data ( ) ) ;
if ( cookie . name = = cookie_name ) {
values . emplace_back ( std : : move ( cookie ) ) ;
}
return values ;
}
return values ;
}
class GreeterClient {
public :
GreeterClient ( std : : shared_ptr < Channel > channel )
: stub_ ( Greeter : : NewStub ( channel ) ) { }
GreeterClient ( std : : shared_ptr < Channel > channel , absl : : string_view cookie_name )
: stub_ ( Greeter : : NewStub ( channel ) ) , cookie_name_ ( cookie_name ) { }
// Assembles the client's payload, sends it and presents the response back
// from the server.
std : : string SayHello ( const std : : string & user , Cookie * cookieFromServer ,
const Cookie * cookieToServer ) {
void SayHello ( ) {
// Data we are sending to the server.
HelloRequest request ;
request . set_name ( user ) ;
request . set_name ( " world " ) ;
// Container for the data we expect from the server.
HelloReply reply ;
@ -120,59 +120,45 @@ class GreeterClient {
// The actual RPC.
std : : mutex mu ;
std : : condition_variable cv ;
bool done = false ;
Status status ;
if ( cookieToServer ! = NULL ) {
std : : pair < std : : string , std : : string > cookieHeader =
cookieToServer - > Header ( ) ;
context . AddMetadata ( cookieHeader . first , cookieHeader . second ) ;
absl : : optional < Status > status ;
// Set the cookie header if we already got a cookie from the server
if ( cookie_from_server_ . has_value ( ) ) {
context . AddMetadata ( " cookie " ,
absl : : StrFormat ( " %s=%s " , cookie_from_server_ - > name ,
cookie_from_server_ - > value ) ) ;
}
stub_ - > async ( ) - > SayHello ( & context , & request , & reply ,
[ & mu , & cv , & done , & status ] ( Status s ) {
status = std : : move ( s ) ;
std : : lock_guard < std : : mutex > lock ( mu ) ;
done = true ;
cv . notify_one ( ) ;
} ) ;
std : : unique_lock < std : : mutex > lock ( mu ) ;
while ( ! done ) {
stub_ - > async ( ) - > SayHello ( & context , & request , & reply , [ & ] ( Status s ) {
std : : lock_guard < std : : mutex > lock ( mu ) ;
status = std : : move ( s ) ;
cv . notify_one ( ) ;
} ) ;
while ( ! status . has_value ( ) ) {
cv . wait ( lock ) ;
}
// Act upon its status.
if ( status . ok ( ) ) {
if ( cookieFromServer ! = NULL ) {
const std : : multimap < grpc : : string_ref , grpc : : string_ref > &
server_initial_metadata = context . GetServerInitialMetadata ( ) ;
std : : vector < Cookie > cookies =
GetCookies ( server_initial_metadata , " GSSA " ) ;
if ( ! cookies . empty ( ) ) {
* cookieFromServer = cookies . front ( ) ;
}
}
return reply . message ( ) ;
} else {
std : : cout < < status . error_code ( ) < < " : " < < status . error_message ( )
< < std : : endl ;
return " RPC failed " ;
if ( ! status - > ok ( ) ) {
std : : cout < < " RPC failed " < < status - > error_code ( ) < < " : "
< < status - > error_message ( ) < < std : : endl ;
return ;
}
const std : : multimap < grpc : : string_ref , grpc : : string_ref > &
server_initial_metadata = context . GetServerInitialMetadata ( ) ;
// Update a cookie after a successful request
std : : vector < Cookie > cookies =
GetCookies ( server_initial_metadata , cookie_name_ ) ;
if ( ! cookies . empty ( ) ) {
cookie_from_server_ . emplace ( std : : move ( cookies . front ( ) ) ) ;
}
std : : cout < < " Greeter received: " < < reply . message ( ) < < std : : endl ;
}
private :
std : : unique_ptr < Greeter : : Stub > stub_ ;
std : : string cookie_name_ ;
absl : : optional < Cookie > cookie_from_server_ ;
} ;
static void sayHello ( GreeterClient & greeter , Cookie * cookieFromServer ,
const Cookie * cookieToServer ) {
std : : string user ( " world " ) ;
std : : string reply = greeter . SayHello ( user , cookieFromServer , cookieToServer ) ;
std : : cout < < " Greeter received: " < < reply < < std : : endl ;
std : : this_thread : : sleep_for ( std : : chrono : : seconds ( 5 ) ) ;
}
int main ( int argc , char * * argv ) {
absl : : ParseCommandLine ( argc , argv ) ;
absl : : StatusOr < grpc : : experimental : : CsmObservability > InitializeObservability ( ) {
opentelemetry : : exporter : : metrics : : PrometheusExporterOptions opts ;
// default was "localhost:9464" which causes connection issue across GKE pods
opts . url = " 0.0.0.0:9464 " ;
@ -181,21 +167,29 @@ int main(int argc, char** argv) {
auto meter_provider =
std : : make_shared < opentelemetry : : sdk : : metrics : : MeterProvider > ( ) ;
meter_provider - > AddMetricReader ( std : : move ( prometheus_exporter ) ) ;
auto observability = grpc : : experimental : : CsmObservabilityBuilder ( )
. SetMeterProvider ( std : : move ( meter_provider ) )
. BuildAndRegister ( ) ;
return grpc : : experimental : : CsmObservabilityBuilder ( )
. SetMeterProvider ( std : : move ( meter_provider ) )
. BuildAndRegister ( ) ;
}
} // namespace
int main ( int argc , char * * argv ) {
absl : : ParseCommandLine ( argc , argv ) ;
// Setup the observability
auto observability = InitializeObservability ( ) ;
if ( ! observability . ok ( ) ) {
std : : cerr < < " CsmObservability::Init() failed: "
< < observability . status ( ) . ToString ( ) < < std : : endl ;
return static_cast < int > ( observability . status ( ) . code ( ) ) ;
}
GreeterClient greeter ( grpc : : CreateChannel (
absl : : GetFlag ( FLAGS_target ) , grpc : : InsecureChannelCredentials ( ) ) ) ;
Cookie session_cookie ;
sayHello ( greeter , & session_cookie , NULL ) ;
GreeterClient greeter ( grpc : : CreateChannel ( absl : : GetFlag ( FLAGS_target ) ,
grpc : : InsecureChannelCredentials ( ) ) ,
absl : : GetFlag ( FLAGS_cookie_name ) ) ;
while ( true ) {
sayHello ( greeter , NULL , & session_cookie ) ;
greeter . SayHello ( ) ;
std : : this_thread : : sleep_for (
std : : chrono : : seconds ( absl : : GetFlag ( FLAGS_delay_s ) ) ) ;
}
return 0 ;
}