|
|
|
@ -26,13 +26,18 @@ |
|
|
|
|
|
|
|
|
|
#include <grpc/event_engine/event_engine.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/client_channel/client_channel_factory.h" |
|
|
|
|
#include "src/core/client_channel/client_channel_filter.h" |
|
|
|
|
#include "src/core/ext/transport/chaotic_good/client_transport.h" |
|
|
|
|
#include "src/core/ext/transport/chaotic_good/frame.h" |
|
|
|
|
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
|
|
|
|
#include "src/core/ext/transport/chaotic_good/settings_metadata.h" |
|
|
|
|
#include "src/core/lib/channel/channel_args.h" |
|
|
|
|
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
|
|
|
|
#include "src/core/lib/event_engine/default_event_engine.h" |
|
|
|
|
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
|
|
|
|
#include "src/core/lib/gprpp/debug_location.h" |
|
|
|
|
#include "src/core/lib/gprpp/no_destruct.h" |
|
|
|
|
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
|
|
|
|
#include "src/core/lib/gprpp/time.h" |
|
|
|
|
#include "src/core/lib/iomgr/closure.h" |
|
|
|
@ -51,21 +56,16 @@ |
|
|
|
|
#include "src/core/lib/resource_quota/resource_quota.h" |
|
|
|
|
#include "src/core/lib/slice/slice.h" |
|
|
|
|
#include "src/core/lib/slice/slice_buffer.h" |
|
|
|
|
#include "src/core/lib/surface/api_trace.h" |
|
|
|
|
#include "src/core/lib/surface/channel.h" |
|
|
|
|
#include "src/core/lib/transport/error_utils.h" |
|
|
|
|
#include "src/core/lib/transport/handshaker.h" |
|
|
|
|
#include "src/core/lib/transport/metadata_batch.h" |
|
|
|
|
#include "src/core/lib/transport/promise_endpoint.h" |
|
|
|
|
|
|
|
|
|
namespace grpc_core { |
|
|
|
|
namespace chaotic_good { |
|
|
|
|
using grpc_event_engine::experimental::EventEngine; |
|
|
|
|
namespace { |
|
|
|
|
void MaybeNotify(const DebugLocation& location, grpc_closure*& notify, |
|
|
|
|
grpc_error_handle error) { |
|
|
|
|
if (notify != nullptr) { |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
ExecCtx::Run(location, std::exchange(notify, nullptr), error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
const int32_t kDataAlignmentBytes = 64; |
|
|
|
|
const int32_t kTimeoutSecs = 5; |
|
|
|
|
} // namespace
|
|
|
|
@ -73,16 +73,10 @@ const int32_t kTimeoutSecs = 5; |
|
|
|
|
ChaoticGoodConnector::ChaoticGoodConnector( |
|
|
|
|
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine) |
|
|
|
|
: event_engine_(std::move(event_engine)), |
|
|
|
|
handshake_mgr_(std::make_shared<HandshakeManager>()), |
|
|
|
|
data_endpoint_latch_( |
|
|
|
|
std::make_shared<Latch<std::shared_ptr<PromiseEndpoint>>>()), |
|
|
|
|
wait_for_data_endpoint_callback_(std::make_shared<WaitForCallback>()) { |
|
|
|
|
channel_args_ = channel_args_.SetObject(event_engine_); |
|
|
|
|
channel_args_ = |
|
|
|
|
channel_args_.Set(GRPC_ARG_RESOURCE_QUOTA, ResourceQuota::Default()); |
|
|
|
|
} |
|
|
|
|
handshake_mgr_(std::make_shared<HandshakeManager>()) {} |
|
|
|
|
|
|
|
|
|
ChaoticGoodConnector::~ChaoticGoodConnector() { |
|
|
|
|
GPR_ASSERT(notify_ == nullptr); |
|
|
|
|
if (connect_activity_ != nullptr) { |
|
|
|
|
connect_activity_.reset(); |
|
|
|
|
} |
|
|
|
@ -90,13 +84,11 @@ ChaoticGoodConnector::~ChaoticGoodConnector() { |
|
|
|
|
|
|
|
|
|
auto ChaoticGoodConnector::DataEndpointReadSettingsFrame( |
|
|
|
|
RefCountedPtr<ChaoticGoodConnector> self) { |
|
|
|
|
GPR_ASSERT(self->data_endpoint_ != nullptr); |
|
|
|
|
return TrySeq( |
|
|
|
|
self->data_endpoint_->ReadSlice(FrameHeader::kFrameHeaderSize), |
|
|
|
|
self->data_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize), |
|
|
|
|
[self](Slice slice) mutable { |
|
|
|
|
// Read setting frame;
|
|
|
|
|
// Parse frame header
|
|
|
|
|
GPR_ASSERT(self->data_endpoint_ != nullptr); |
|
|
|
|
auto frame_header_ = |
|
|
|
|
FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
|
|
|
|
GRPC_SLICE_START_PTR(slice.c_slice()))); |
|
|
|
@ -104,7 +96,7 @@ auto ChaoticGoodConnector::DataEndpointReadSettingsFrame( |
|
|
|
|
frame_header_.ok(), |
|
|
|
|
[frame_header_ = *frame_header_, self]() { |
|
|
|
|
auto frame_header_length = frame_header_.GetFrameLength(); |
|
|
|
|
return TrySeq(self->data_endpoint_->Read(frame_header_length), |
|
|
|
|
return TrySeq(self->data_endpoint_.Read(frame_header_length), |
|
|
|
|
[]() { return absl::OkStatus(); }); |
|
|
|
|
}, |
|
|
|
|
[status = frame_header_.status()]() { return status; }); |
|
|
|
@ -113,7 +105,6 @@ auto ChaoticGoodConnector::DataEndpointReadSettingsFrame( |
|
|
|
|
|
|
|
|
|
auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame( |
|
|
|
|
RefCountedPtr<ChaoticGoodConnector> self) { |
|
|
|
|
GPR_ASSERT(self->data_endpoint_ != nullptr); |
|
|
|
|
return [self]() { |
|
|
|
|
// Serialize setting frame.
|
|
|
|
|
SettingsFrame frame; |
|
|
|
@ -122,7 +113,7 @@ auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame( |
|
|
|
|
self->connection_id_, kDataAlignmentBytes} |
|
|
|
|
.ToMetadataBatch(GetContext<Arena>()); |
|
|
|
|
auto write_buffer = frame.Serialize(&self->hpack_compressor_); |
|
|
|
|
return self->data_endpoint_->Write(std::move(write_buffer.control)); |
|
|
|
|
return self->data_endpoint_.Write(std::move(write_buffer.control)); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -133,48 +124,43 @@ auto ChaoticGoodConnector::WaitForDataEndpointSetup( |
|
|
|
|
on_data_endpoint_connect = |
|
|
|
|
[self](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> |
|
|
|
|
endpoint) mutable { |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { |
|
|
|
|
auto error = GRPC_ERROR_CREATE("connect endpoint failed"); |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, |
|
|
|
|
std::exchange(self->notify_, nullptr), |
|
|
|
|
GRPC_ERROR_CREATE("connect endpoint failed")); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
self->data_endpoint_latch_->Set(std::make_shared<PromiseEndpoint>( |
|
|
|
|
std::move(endpoint.value()), SliceBuffer())); |
|
|
|
|
auto cb = self->wait_for_data_endpoint_callback_->MakeCallback(); |
|
|
|
|
// Wake up wait_for_data_endpoint_callback_.
|
|
|
|
|
cb(); |
|
|
|
|
self->data_endpoint_ = |
|
|
|
|
PromiseEndpoint(std::move(endpoint.value()), SliceBuffer()); |
|
|
|
|
self->data_endpoint_ready_.Set(); |
|
|
|
|
}; |
|
|
|
|
self->event_engine_->Connect( |
|
|
|
|
std::move(on_data_endpoint_connect), *self->resolved_addr_, |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig( |
|
|
|
|
self->channel_args_), |
|
|
|
|
self->args_.channel_args), |
|
|
|
|
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
|
|
|
|
"data_endpoint_connection"), |
|
|
|
|
EventEngine::Duration(kTimeoutSecs)); |
|
|
|
|
|
|
|
|
|
return TrySeq( |
|
|
|
|
self->wait_for_data_endpoint_callback_->MakeWaitPromise(), |
|
|
|
|
Race(TrySeq( |
|
|
|
|
self->data_endpoint_latch_->Wait(), |
|
|
|
|
[self](std::shared_ptr<PromiseEndpoint> data_endpoint) mutable { |
|
|
|
|
self->data_endpoint_.swap(data_endpoint); |
|
|
|
|
return TrySeq( |
|
|
|
|
DataEndpointWriteSettingsFrame(self), |
|
|
|
|
DataEndpointReadSettingsFrame(self), |
|
|
|
|
[]() -> absl::Status { return absl::OkStatus(); }); |
|
|
|
|
}), |
|
|
|
|
TrySeq(Sleep(Timestamp::Now() + Duration::Seconds(kTimeoutSecs)), |
|
|
|
|
[]() -> absl::Status { |
|
|
|
|
return absl::DeadlineExceededError( |
|
|
|
|
"Data endpoint connect deadline exceeded."); |
|
|
|
|
}))); |
|
|
|
|
return TrySeq(Race( |
|
|
|
|
TrySeq(self->data_endpoint_ready_.Wait(), |
|
|
|
|
[self]() mutable { |
|
|
|
|
return TrySeq(DataEndpointWriteSettingsFrame(self), |
|
|
|
|
DataEndpointReadSettingsFrame(self), |
|
|
|
|
[]() -> absl::Status { return absl::OkStatus(); }); |
|
|
|
|
}), |
|
|
|
|
TrySeq(Sleep(Timestamp::Now() + Duration::Seconds(kTimeoutSecs)), |
|
|
|
|
[]() -> absl::Status { |
|
|
|
|
return absl::DeadlineExceededError( |
|
|
|
|
"Data endpoint connect deadline exceeded."); |
|
|
|
|
}))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame( |
|
|
|
|
RefCountedPtr<ChaoticGoodConnector> self) { |
|
|
|
|
GPR_ASSERT(self->control_endpoint_ != nullptr); |
|
|
|
|
return TrySeq( |
|
|
|
|
self->control_endpoint_->ReadSlice(FrameHeader::kFrameHeaderSize), |
|
|
|
|
self->control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize), |
|
|
|
|
[self](Slice slice) { |
|
|
|
|
// Parse frame header
|
|
|
|
|
auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
|
|
|
@ -182,7 +168,7 @@ auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame( |
|
|
|
|
return If( |
|
|
|
|
frame_header.ok(), |
|
|
|
|
TrySeq( |
|
|
|
|
self->control_endpoint_->Read(frame_header->GetFrameLength()), |
|
|
|
|
self->control_endpoint_.Read(frame_header->GetFrameLength()), |
|
|
|
|
[frame_header = *frame_header, self](SliceBuffer buffer) { |
|
|
|
|
// Deserialize setting frame.
|
|
|
|
|
SettingsFrame frame; |
|
|
|
@ -215,7 +201,6 @@ auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame( |
|
|
|
|
auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame( |
|
|
|
|
RefCountedPtr<ChaoticGoodConnector> self) { |
|
|
|
|
return [self]() { |
|
|
|
|
GPR_ASSERT(self->control_endpoint_ != nullptr); |
|
|
|
|
// Serialize setting frame.
|
|
|
|
|
SettingsFrame frame; |
|
|
|
|
// frame.header set connectiion_type: control
|
|
|
|
@ -223,7 +208,7 @@ auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame( |
|
|
|
|
absl::nullopt, absl::nullopt} |
|
|
|
|
.ToMetadataBatch(GetContext<Arena>()); |
|
|
|
|
auto write_buffer = frame.Serialize(&self->hpack_compressor_); |
|
|
|
|
return self->control_endpoint_->Write(std::move(write_buffer.control)); |
|
|
|
|
return self->control_endpoint_.Write(std::move(write_buffer.control)); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -233,8 +218,9 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result, |
|
|
|
|
MutexLock lock(&mu_); |
|
|
|
|
result_ = result; |
|
|
|
|
if (is_shutdown_) { |
|
|
|
|
auto error = GRPC_ERROR_CREATE("connector shutdown"); |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, notify, error); |
|
|
|
|
GPR_ASSERT(notify_ == nullptr); |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, notify, |
|
|
|
|
GRPC_ERROR_CREATE("connector shutdown")); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -248,21 +234,25 @@ void ChaoticGoodConnector::Connect(const Args& args, Result* result, |
|
|
|
|
[self = RefAsSubclass<ChaoticGoodConnector>()]( |
|
|
|
|
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> |
|
|
|
|
endpoint) mutable { |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { |
|
|
|
|
auto error = GRPC_ERROR_CREATE("connect endpoint failed"); |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
|
|
|
|
auto endpoint_status = endpoint.status(); |
|
|
|
|
auto error = GRPC_ERROR_CREATE_REFERENCING("connect endpoint failed", |
|
|
|
|
&endpoint_status, 1); |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), |
|
|
|
|
error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
ExecCtx exec_ctx; |
|
|
|
|
auto* p = self.release(); |
|
|
|
|
p->handshake_mgr_->DoHandshake( |
|
|
|
|
grpc_event_engine_endpoint_create(std::move(endpoint.value())), |
|
|
|
|
p->channel_args_, p->args_.deadline, nullptr /* acceptor */, |
|
|
|
|
p->args_.channel_args, p->args_.deadline, nullptr /* acceptor */, |
|
|
|
|
OnHandshakeDone, p); |
|
|
|
|
}; |
|
|
|
|
event_engine_->Connect( |
|
|
|
|
std::move(on_connect), *resolved_addr_, |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args_), |
|
|
|
|
grpc_event_engine::experimental::ChannelArgsEndpointConfig( |
|
|
|
|
args_.channel_args), |
|
|
|
|
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
|
|
|
|
"data_endpoint_connection"), |
|
|
|
|
EventEngine::Duration(kTimeoutSecs)); |
|
|
|
@ -272,8 +262,6 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) { |
|
|
|
|
auto* args = static_cast<HandshakerArgs*>(arg); |
|
|
|
|
RefCountedPtr<ChaoticGoodConnector> self( |
|
|
|
|
static_cast<ChaoticGoodConnector*>(args->user_data)); |
|
|
|
|
gpr_log(GPR_ERROR, "SubchannelConnector::OnHandshakeDone:%p", |
|
|
|
|
static_cast<SubchannelConnector*>(self.get())); |
|
|
|
|
grpc_slice_buffer_destroy(args->read_buffer); |
|
|
|
|
gpr_free(args->read_buffer); |
|
|
|
|
// Start receiving setting frames;
|
|
|
|
@ -290,14 +278,15 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
self->result_->Reset(); |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), |
|
|
|
|
error); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if (args->endpoint != nullptr) { |
|
|
|
|
GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint( |
|
|
|
|
args->endpoint)); |
|
|
|
|
self->control_endpoint_ = std::make_shared<PromiseEndpoint>( |
|
|
|
|
self->control_endpoint_ = PromiseEndpoint( |
|
|
|
|
grpc_event_engine::experimental:: |
|
|
|
|
grpc_take_wrapped_event_engine_endpoint(args->endpoint), |
|
|
|
|
SliceBuffer()); |
|
|
|
@ -309,7 +298,24 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) { |
|
|
|
|
}, |
|
|
|
|
EventEngineWakeupScheduler(self->event_engine_), |
|
|
|
|
[self](absl::Status status) { |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, self->notify_, status); |
|
|
|
|
if (grpc_chaotic_good_trace.enabled()) { |
|
|
|
|
gpr_log(GPR_INFO, "ChaoticGoodConnector::OnHandshakeDone: %s", |
|
|
|
|
status.ToString().c_str()); |
|
|
|
|
} |
|
|
|
|
if (status.ok()) { |
|
|
|
|
MutexLock lock(&self->mu_); |
|
|
|
|
self->result_->transport = new ChaoticGoodClientTransport( |
|
|
|
|
std::move(self->control_endpoint_), |
|
|
|
|
std::move(self->data_endpoint_), self->args_.channel_args, |
|
|
|
|
self->event_engine_, std::move(self->hpack_parser_), |
|
|
|
|
std::move(self->hpack_compressor_)); |
|
|
|
|
self->result_->channel_args = self->args_.channel_args; |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), |
|
|
|
|
status); |
|
|
|
|
} else if (self->notify_ != nullptr) { |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), |
|
|
|
|
status); |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
self->arena_.get(), self->event_engine_.get()); |
|
|
|
|
MutexLock lock(&self->mu_); |
|
|
|
@ -321,8 +327,59 @@ void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) { |
|
|
|
|
MutexLock lock(&self->mu_); |
|
|
|
|
self->result_->Reset(); |
|
|
|
|
auto error = GRPC_ERROR_CREATE("handshake complete with empty endpoint."); |
|
|
|
|
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
|
|
|
|
ExecCtx::Run(DEBUG_LOCATION, std::exchange(self->notify_, nullptr), error); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
namespace { |
|
|
|
|
|
|
|
|
|
class ChaoticGoodChannelFactory final : public ClientChannelFactory { |
|
|
|
|
public: |
|
|
|
|
RefCountedPtr<Subchannel> CreateSubchannel( |
|
|
|
|
const grpc_resolved_address& address, const ChannelArgs& args) override { |
|
|
|
|
return Subchannel::Create( |
|
|
|
|
MakeOrphanable<ChaoticGoodConnector>( |
|
|
|
|
args.GetObjectRef<grpc_event_engine::experimental::EventEngine>()), |
|
|
|
|
address, args); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
} // namespace
|
|
|
|
|
} // namespace chaotic_good
|
|
|
|
|
} // namespace grpc_core
|
|
|
|
|
|
|
|
|
|
grpc_channel* grpc_chaotic_good_channel_create(const char* target, |
|
|
|
|
const grpc_channel_args* args) { |
|
|
|
|
grpc_core::ExecCtx exec_ctx; |
|
|
|
|
GRPC_API_TRACE("grpc_chaotic_good_channel_create(target=%s, args=%p)", 2, |
|
|
|
|
(target, (void*)args)); |
|
|
|
|
grpc_channel* channel = nullptr; |
|
|
|
|
grpc_error_handle error; |
|
|
|
|
// Create channel.
|
|
|
|
|
std::string canonical_target = grpc_core::CoreConfiguration::Get() |
|
|
|
|
.resolver_registry() |
|
|
|
|
.AddDefaultPrefixIfNeeded(target); |
|
|
|
|
auto r = grpc_core::Channel::Create( |
|
|
|
|
target, |
|
|
|
|
grpc_core::CoreConfiguration::Get() |
|
|
|
|
.channel_args_preconditioning() |
|
|
|
|
.PreconditionChannelArgs(args) |
|
|
|
|
.Set(GRPC_ARG_SERVER_URI, canonical_target) |
|
|
|
|
.SetObject( |
|
|
|
|
grpc_core::NoDestructSingleton< |
|
|
|
|
grpc_core::chaotic_good::ChaoticGoodChannelFactory>::Get()), |
|
|
|
|
GRPC_CLIENT_CHANNEL, nullptr); |
|
|
|
|
if (r.ok()) { |
|
|
|
|
return r->release()->c_ptr(); |
|
|
|
|
} |
|
|
|
|
error = absl_status_to_grpc_error(r.status()); |
|
|
|
|
intptr_t integer; |
|
|
|
|
grpc_status_code status = GRPC_STATUS_INTERNAL; |
|
|
|
|
if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus, |
|
|
|
|
&integer)) { |
|
|
|
|
status = static_cast<grpc_status_code>(integer); |
|
|
|
|
} |
|
|
|
|
channel = grpc_lame_client_channel_create( |
|
|
|
|
target, status, "Failed to create secure client channel"); |
|
|
|
|
return channel; |
|
|
|
|
} |
|
|
|
|