@ -23,6 +23,7 @@
# include <string_view>
# include <string_view>
# include <utility>
# include <utility>
# include "absl/log/check.h"
# include "absl/strings/str_cat.h"
# include "absl/strings/str_cat.h"
# include <grpc/byte_buffer.h>
# include <grpc/byte_buffer.h>
@ -78,7 +79,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
factory_ - > interested_parties ( ) , Slice : : FromStaticString ( method ) ,
factory_ - > interested_parties ( ) , Slice : : FromStaticString ( method ) ,
/*authority=*/ absl : : nullopt , Timestamp : : InfFuture ( ) ,
/*authority=*/ absl : : nullopt , Timestamp : : InfFuture ( ) ,
/*registered_method=*/ true ) ;
/*registered_method=*/ true ) ;
GPR_ASSERT ( call_ ! = nullptr ) ;
CHECK_NE ( call_ , nullptr ) ;
// Init data associated with the call.
// Init data associated with the call.
grpc_metadata_array_init ( & initial_metadata_recv_ ) ;
grpc_metadata_array_init ( & initial_metadata_recv_ ) ;
grpc_metadata_array_init ( & trailing_metadata_recv_ ) ;
grpc_metadata_array_init ( & trailing_metadata_recv_ ) ;
@ -108,7 +109,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
this - > Ref ( DEBUG_LOCATION , " OnRecvInitialMetadata " ) . release ( ) , nullptr ) ;
this - > Ref ( DEBUG_LOCATION , " OnRecvInitialMetadata " ) . release ( ) , nullptr ) ;
call_error = grpc_call_start_batch_and_execute (
call_error = grpc_call_start_batch_and_execute (
call_ , ops , static_cast < size_t > ( op - ops ) , & on_recv_initial_metadata_ ) ;
call_ , ops , static_cast < size_t > ( op - ops ) , & on_recv_initial_metadata_ ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
CHECK_EQ ( call_error , GRPC_CALL_OK ) ;
// Start a batch for recv_trailing_metadata.
// Start a batch for recv_trailing_metadata.
memset ( ops , 0 , sizeof ( ops ) ) ;
memset ( ops , 0 , sizeof ( ops ) ) ;
op = ops ;
op = ops ;
@ -125,7 +126,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
GRPC_CLOSURE_INIT ( & on_status_received_ , OnStatusReceived , this , nullptr ) ;
GRPC_CLOSURE_INIT ( & on_status_received_ , OnStatusReceived , this , nullptr ) ;
call_error = grpc_call_start_batch_and_execute (
call_error = grpc_call_start_batch_and_execute (
call_ , ops , static_cast < size_t > ( op - ops ) , & on_status_received_ ) ;
call_ , ops , static_cast < size_t > ( op - ops ) , & on_status_received_ ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
CHECK_EQ ( call_error , GRPC_CALL_OK ) ;
GRPC_CLOSURE_INIT ( & on_response_received_ , OnResponseReceived , this , nullptr ) ;
GRPC_CLOSURE_INIT ( & on_response_received_ , OnResponseReceived , this , nullptr ) ;
}
}
@ -135,12 +136,12 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
grpc_byte_buffer_destroy ( send_message_payload_ ) ;
grpc_byte_buffer_destroy ( send_message_payload_ ) ;
grpc_byte_buffer_destroy ( recv_message_payload_ ) ;
grpc_byte_buffer_destroy ( recv_message_payload_ ) ;
CSliceUnref ( status_details_ ) ;
CSliceUnref ( status_details_ ) ;
GPR_ASSERT ( call_ ! = nullptr ) ;
CHECK_NE ( call_ , nullptr ) ;
grpc_call_unref ( call_ ) ;
grpc_call_unref ( call_ ) ;
}
}
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : : Orphan ( ) {
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : : Orphan ( ) {
GPR_ASSERT ( call_ ! = nullptr ) ;
CHECK_NE ( call_ , nullptr ) ;
// If we are here because xds_client wants to cancel the call,
// If we are here because xds_client wants to cancel the call,
// OnStatusReceived() will complete the cancellation and clean up.
// OnStatusReceived() will complete the cancellation and clean up.
// Otherwise, we are here because xds_client has to orphan a failed call,
// Otherwise, we are here because xds_client has to orphan a failed call,
@ -164,7 +165,7 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
Ref ( DEBUG_LOCATION , " OnRequestSent " ) . release ( ) ;
Ref ( DEBUG_LOCATION , " OnRequestSent " ) . release ( ) ;
grpc_call_error call_error =
grpc_call_error call_error =
grpc_call_start_batch_and_execute ( call_ , & op , 1 , & on_request_sent_ ) ;
grpc_call_start_batch_and_execute ( call_ , & op , 1 , & on_request_sent_ ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
CHECK_EQ ( call_error , GRPC_CALL_OK ) ;
}
}
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : :
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : :
@ -174,10 +175,10 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
memset ( & op , 0 , sizeof ( op ) ) ;
memset ( & op , 0 , sizeof ( op ) ) ;
op . op = GRPC_OP_RECV_MESSAGE ;
op . op = GRPC_OP_RECV_MESSAGE ;
op . data . recv_message . recv_message = & recv_message_payload_ ;
op . data . recv_message . recv_message = & recv_message_payload_ ;
GPR_ASSERT ( call_ ! = nullptr ) ;
CHECK_NE ( call_ , nullptr ) ;
const grpc_call_error call_error =
const grpc_call_error call_error =
grpc_call_start_batch_and_execute ( call_ , & op , 1 , & on_response_received_ ) ;
grpc_call_start_batch_and_execute ( call_ , & op , 1 , & on_response_received_ ) ;
GPR_ASSERT ( GRPC_CALL_OK = = call_error ) ;
CHECK_EQ ( call_error , GRPC_CALL_OK ) ;
}
}
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : :
void GrpcXdsTransportFactory : : GrpcXdsTransport : : GrpcStreamingCall : :
@ -271,7 +272,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
channel_ = CreateXdsChannel (
channel_ = CreateXdsChannel (
factory - > args_ ,
factory - > args_ ,
static_cast < const GrpcXdsBootstrap : : GrpcXdsServer & > ( server ) ) ;
static_cast < const GrpcXdsBootstrap : : GrpcXdsServer & > ( server ) ) ;
GPR_ASSERT ( channel_ ! = nullptr ) ;
CHECK ( channel_ ! = nullptr ) ;
if ( channel_ - > IsLame ( ) ) {
if ( channel_ - > IsLame ( ) ) {
* status = absl : : UnavailableError ( " xds client has a lame channel " ) ;
* status = absl : : UnavailableError ( " xds client has a lame channel " ) ;
} else {
} else {