|
|
|
@ -33,6 +33,7 @@ |
|
|
|
|
#include <grpc/impl/connectivity_state.h> |
|
|
|
|
#include <grpc/impl/propagation_bits.h> |
|
|
|
|
#include <grpc/slice.h> |
|
|
|
|
#include "absl/log/check.h" |
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
#include <grpc/support/port_platform.h> |
|
|
|
|
|
|
|
|
@ -78,7 +79,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( |
|
|
|
|
factory_->interested_parties(), Slice::FromStaticString(method), |
|
|
|
|
/*authority=*/absl::nullopt, Timestamp::InfFuture(), |
|
|
|
|
/*registered_method=*/true); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
CHECK_NE(call_, nullptr); |
|
|
|
|
// Init data associated with the call.
|
|
|
|
|
grpc_metadata_array_init(&initial_metadata_recv_); |
|
|
|
|
grpc_metadata_array_init(&trailing_metadata_recv_); |
|
|
|
@ -108,7 +109,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( |
|
|
|
|
this->Ref(DEBUG_LOCATION, "OnRecvInitialMetadata").release(), nullptr); |
|
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
call_, ops, static_cast<size_t>(op - ops), &on_recv_initial_metadata_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
|
// Start a batch for recv_trailing_metadata.
|
|
|
|
|
memset(ops, 0, sizeof(ops)); |
|
|
|
|
op = ops; |
|
|
|
@ -125,7 +126,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( |
|
|
|
|
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); |
|
|
|
|
call_error = grpc_call_start_batch_and_execute( |
|
|
|
|
call_, ops, static_cast<size_t>(op - ops), &on_status_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
|
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -135,12 +136,12 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
|
|
|
grpc_byte_buffer_destroy(send_message_payload_); |
|
|
|
|
grpc_byte_buffer_destroy(recv_message_payload_); |
|
|
|
|
CSliceUnref(status_details_); |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
CHECK_NE(call_, nullptr); |
|
|
|
|
grpc_call_unref(call_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() { |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
CHECK_NE(call_, nullptr); |
|
|
|
|
// If we are here because xds_client wants to cancel the call,
|
|
|
|
|
// OnStatusReceived() will complete the cancellation and clean up.
|
|
|
|
|
// Otherwise, we are here because xds_client has to orphan a failed call,
|
|
|
|
@ -164,7 +165,7 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage( |
|
|
|
|
Ref(DEBUG_LOCATION, "OnRequestSent").release(); |
|
|
|
|
grpc_call_error call_error = |
|
|
|
|
grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
|
|
@ -174,10 +175,10 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
|
|
|
memset(&op, 0, sizeof(op)); |
|
|
|
|
op.op = GRPC_OP_RECV_MESSAGE; |
|
|
|
|
op.data.recv_message.recv_message = &recv_message_payload_; |
|
|
|
|
GPR_ASSERT(call_ != nullptr); |
|
|
|
|
CHECK_NE(call_, nullptr); |
|
|
|
|
const grpc_call_error call_error = |
|
|
|
|
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); |
|
|
|
|
GPR_ASSERT(GRPC_CALL_OK == call_error); |
|
|
|
|
CHECK_EQ(call_error, GRPC_CALL_OK); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: |
|
|
|
@ -271,7 +272,7 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport( |
|
|
|
|
channel_ = CreateXdsChannel( |
|
|
|
|
factory->args_, |
|
|
|
|
static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(server)); |
|
|
|
|
GPR_ASSERT(channel_ != nullptr); |
|
|
|
|
CHECK_NE(channel_, nullptr); |
|
|
|
|
if (channel_->IsLame()) { |
|
|
|
|
*status = absl::UnavailableError("xds client has a lame channel"); |
|
|
|
|
} else { |
|
|
|
|