@ -54,7 +54,6 @@
# include "src/core/lib/channel/channel_fwd.h"
# include "src/core/lib/channel/channel_stack.h"
# include "src/core/lib/channel/context.h"
# include "src/core/lib/channel/promise_based_filter.h"
# include "src/core/lib/config/core_configuration.h"
# include "src/core/lib/gprpp/host_port.h"
# include "src/core/lib/gprpp/time.h"
@ -71,7 +70,6 @@
# include "src/core/lib/transport/metadata_batch.h"
# include "src/core/lib/transport/transport.h"
# include "src/core/lib/uri/uri_parser.h"
# include "src/cpp/ext/filters/census/client_filter.h"
namespace grpc_core {
@ -342,112 +340,104 @@ class CallData {
LoggingSink : : Config config_ ;
} ;
class ClientLoggingFilter final : public ChannelFilter {
public :
static const grpc_channel_filter kFilter ;
static absl : : StatusOr < ClientLoggingFilter > Create (
const ChannelArgs & args , ChannelFilter : : Args /*filter_args*/ ) {
absl : : optional < absl : : string_view > default_authority =
args . GetString ( GRPC_ARG_DEFAULT_AUTHORITY ) ;
if ( default_authority . has_value ( ) ) {
return ClientLoggingFilter ( std : : string ( default_authority . value ( ) ) ) ;
}
absl : : optional < std : : string > server_uri =
args . GetOwnedString ( GRPC_ARG_SERVER_URI ) ;
if ( server_uri . has_value ( ) ) {
return ClientLoggingFilter (
CoreConfiguration : : Get ( ) . resolver_registry ( ) . GetDefaultAuthority (
* server_uri ) ) ;
}
return ClientLoggingFilter ( " " ) ;
}
} // namespace
// Construct a promise for one call.
ArenaPromise < ServerMetadataHandle > MakeCallPromise (
CallArgs call_args , NextPromiseFactory next_promise_factory ) override {
CallData * calld = GetContext < Arena > ( ) - > ManagedNew < CallData > (
true , call_args , default_authority_ ) ;
if ( ! calld - > ShouldLog ( ) ) {
return next_promise_factory ( std : : move ( call_args ) ) ;
}
calld - > LogClientHeader (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
call_args . client_initial_metadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ calld ] ( ServerMetadataHandle metadata ) {
calld - > LogServerHeader (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
metadata . get ( ) ) ;
return metadata ;
} ) ;
call_args . client_to_server_messages - > InterceptAndMapWithHalfClose (
[ calld ] ( MessageHandle message ) {
calld - > LogClientMessage (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ,
[ calld ] {
calld - > LogClientHalfClose (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ) ;
} ) ;
call_args . server_to_client_messages - > InterceptAndMap (
[ calld ] ( MessageHandle message ) {
calld - > LogServerMessage (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ) ;
return OnCancel (
Map ( next_promise_factory ( std : : move ( call_args ) ) ,
[ calld ] ( ServerMetadataHandle md ) {
calld - > LogServerTrailer (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
md . get ( ) ) ;
return md ;
} ) ,
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[ calld , ctx = GetContext < grpc_call_context_element > ( ) ] ( ) {
calld - > LogCancel (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
ctx [ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ] . value ) ) ;
} ) ;
absl : : StatusOr < ClientLoggingFilter > ClientLoggingFilter : : Create (
const ChannelArgs & args , ChannelFilter : : Args /*filter_args*/ ) {
absl : : optional < absl : : string_view > default_authority =
args . GetString ( GRPC_ARG_DEFAULT_AUTHORITY ) ;
if ( default_authority . has_value ( ) ) {
return ClientLoggingFilter ( std : : string ( default_authority . value ( ) ) ) ;
}
absl : : optional < std : : string > server_uri =
args . GetOwnedString ( GRPC_ARG_SERVER_URI ) ;
if ( server_uri . has_value ( ) ) {
return ClientLoggingFilter (
CoreConfiguration : : Get ( ) . resolver_registry ( ) . GetDefaultAuthority (
* server_uri ) ) ;
}
return ClientLoggingFilter ( " " ) ;
}
private :
explicit ClientLoggingFilter ( std : : string default_authority )
: default_authority_ ( std : : move ( default_authority ) ) { }
std : : string default_authority_ ;
} ;
// Construct a promise for one call.
ArenaPromise < ServerMetadataHandle > ClientLoggingFilter : : MakeCallPromise (
CallArgs call_args , NextPromiseFactory next_promise_factory ) {
CallData * calld = GetContext < Arena > ( ) - > ManagedNew < CallData > (
true , call_args , default_authority_ ) ;
if ( ! calld - > ShouldLog ( ) ) {
return next_promise_factory ( std : : move ( call_args ) ) ;
}
calld - > LogClientHeader (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
call_args . client_initial_metadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ calld ] ( ServerMetadataHandle metadata ) {
calld - > LogServerHeader (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
metadata . get ( ) ) ;
return metadata ;
} ) ;
call_args . client_to_server_messages - > InterceptAndMapWithHalfClose (
[ calld ] ( MessageHandle message ) {
calld - > LogClientMessage (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ,
[ calld ] {
calld - > LogClientHalfClose (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ) ;
} ) ;
call_args . server_to_client_messages - > InterceptAndMap (
[ calld ] ( MessageHandle message ) {
calld - > LogServerMessage (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ) ;
return OnCancel (
Map ( next_promise_factory ( std : : move ( call_args ) ) ,
[ calld ] ( ServerMetadataHandle md ) {
calld - > LogServerTrailer (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
md . get ( ) ) ;
return md ;
} ) ,
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture it here.
// This ought to be easy to fix once client side promises are completely
// rolled out.
[ calld , ctx = GetContext < grpc_call_context_element > ( ) ] ( ) {
calld - > LogCancel (
/*is_client=*/ true ,
static_cast < CallTracerAnnotationInterface * > (
ctx [ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ] . value ) ) ;
} ) ;
}
const grpc_channel_filter ClientLoggingFilter : : kFilter =
MakePromiseBasedFilter < ClientLoggingFilter , FilterEndpoint : : kClient ,
@ -455,91 +445,86 @@ const grpc_channel_filter ClientLoggingFilter::kFilter =
kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages > ( " logging " ) ;
class ServerLoggingFilter final : public ChannelFilter {
public :
static const grpc_channel_filter kFilter ;
static absl : : StatusOr < ServerLoggingFilter > Create (
const ChannelArgs & /*args*/ , ChannelFilter : : Args /*filter_args*/ ) {
return ServerLoggingFilter ( ) ;
}
absl : : StatusOr < ServerLoggingFilter > ServerLoggingFilter : : Create (
const ChannelArgs & /*args*/ , ChannelFilter : : Args /*filter_args*/ ) {
return ServerLoggingFilter ( ) ;
}
// Construct a promise for one call.
ArenaPromise < ServerMetadataHandle > MakeCallPromise (
CallArgs call_args , NextPromiseFactory next_promise_factory ) override {
CallData * calld = GetContext < Arena > ( ) - > ManagedNew < CallData > (
false , call_args , /*default_authority=*/ " " ) ;
if ( ! calld - > ShouldLog ( ) ) {
return next_promise_factory ( std : : move ( call_args ) ) ;
}
auto * call_tracer = static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ;
calld - > LogClientHeader (
/*is_client=*/ false , call_tracer , call_args . client_initial_metadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ calld ] ( ServerMetadataHandle metadata ) {
calld - > LogServerHeader (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
metadata . get ( ) ) ;
return metadata ;
} ) ;
call_args . client_to_server_messages - > InterceptAndMapWithHalfClose (
[ calld ] ( MessageHandle message ) {
calld - > LogClientMessage (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ,
[ calld ] {
calld - > LogClientHalfClose (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ) ;
} ) ;
call_args . server_to_client_messages - > InterceptAndMap (
[ calld ] ( MessageHandle message ) {
calld - > LogServerMessage (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ) ;
return OnCancel (
Map ( next_promise_factory ( std : : move ( call_args ) ) ,
[ calld ] ( ServerMetadataHandle md ) {
calld - > LogServerTrailer (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
md . get ( ) ) ;
return md ;
} ) ,
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture
// call_tracer.
[ calld , call_tracer ] ( ) {
calld - > LogCancel (
/*is_client=*/ false , call_tracer ) ;
} ) ;
// Construct a promise for one call.
ArenaPromise < ServerMetadataHandle > ServerLoggingFilter : : MakeCallPromise (
CallArgs call_args , NextPromiseFactory next_promise_factory ) {
CallData * calld = GetContext < Arena > ( ) - > ManagedNew < CallData > (
false , call_args , /*default_authority=*/ " " ) ;
if ( ! calld - > ShouldLog ( ) ) {
return next_promise_factory ( std : : move ( call_args ) ) ;
}
} ;
auto * call_tracer = static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ;
calld - > LogClientHeader (
/*is_client=*/ false , call_tracer , call_args . client_initial_metadata ) ;
call_args . server_initial_metadata - > InterceptAndMap (
[ calld ] ( ServerMetadataHandle metadata ) {
calld - > LogServerHeader (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
metadata . get ( ) ) ;
return metadata ;
} ) ;
call_args . client_to_server_messages - > InterceptAndMapWithHalfClose (
[ calld ] ( MessageHandle message ) {
calld - > LogClientMessage (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ,
[ calld ] {
calld - > LogClientHalfClose (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ) ;
} ) ;
call_args . server_to_client_messages - > InterceptAndMap (
[ calld ] ( MessageHandle message ) {
calld - > LogServerMessage (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
message - > payload ( ) ) ;
return message ;
} ) ;
return OnCancel (
Map ( next_promise_factory ( std : : move ( call_args ) ) ,
[ calld ] ( ServerMetadataHandle md ) {
calld - > LogServerTrailer (
/*is_client=*/ false ,
static_cast < CallTracerAnnotationInterface * > (
GetContext < grpc_call_context_element > ( )
[ GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE ]
. value ) ,
md . get ( ) ) ;
return md ;
} ) ,
// TODO(yashykt/ctiller): GetContext<grpc_call_context_element> is not
// valid for the cancellation function requiring us to capture
// call_tracer.
[ calld , call_tracer ] ( ) {
calld - > LogCancel (
/*is_client=*/ false , call_tracer ) ;
} ) ;
}
const grpc_channel_filter ServerLoggingFilter : : kFilter =
MakePromiseBasedFilter < ServerLoggingFilter , FilterEndpoint : : kServer ,
@ -547,8 +532,6 @@ const grpc_channel_filter ServerLoggingFilter::kFilter =
kFilterExaminesInboundMessages |
kFilterExaminesOutboundMessages > ( " logging " ) ;
} // namespace
void RegisterLoggingFilter ( LoggingSink * sink ) {
g_logging_sink = sink ;
CoreConfiguration : : RegisterBuilder ( [ ] ( CoreConfiguration : : Builder * builder ) {
@ -559,8 +542,7 @@ void RegisterLoggingFilter(LoggingSink* sink) {
builder - > channel_init ( )
- > RegisterFilter ( GRPC_CLIENT_CHANNEL , & ClientLoggingFilter : : kFilter )
// TODO(yashykt) : Figure out a good place to place this channel arg
. IfChannelArg ( " grpc.experimental.enable_observability " , true )
. After ( { & grpc : : internal : : OpenCensusClientFilter : : kFilter } ) ;
. IfChannelArg ( " grpc.experimental.enable_observability " , true ) ;
} ) ;
}