[chaotic-good] Connection setup & test suites (#35650)
Closes #35650
COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/35650 from ctiller:shush-tsan fd55ea1be3
PiperOrigin-RevId: 601221780
pull/35630/head^2
parent
7d5b53e2a7
commit
870a66d9a0
71 changed files with 2548 additions and 33 deletions
@ -0,0 +1,328 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/client/chaotic_good_connector.h" |
||||
|
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <utility> |
||||
|
||||
#include "absl/random/bit_gen_ref.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/frame.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
||||
#include "src/core/ext/transport/chaotic_good/settings_metadata.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/gprpp/debug_location.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/race.h" |
||||
#include "src/core/lib/promise/sleep.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/promise/wait_for_callback.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/transport/handshaker.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
using grpc_event_engine::experimental::EventEngine; |
||||
namespace { |
||||
void MaybeNotify(const DebugLocation& location, grpc_closure*& notify, |
||||
grpc_error_handle error) { |
||||
if (notify != nullptr) { |
||||
ExecCtx exec_ctx; |
||||
ExecCtx::Run(location, std::exchange(notify, nullptr), error); |
||||
} |
||||
} |
||||
const int32_t kDataAlignmentBytes = 64; |
||||
const int32_t kTimeoutSecs = 5; |
||||
} // namespace
|
||||
|
||||
ChaoticGoodConnector::ChaoticGoodConnector( |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine) |
||||
: event_engine_(std::move(event_engine)), |
||||
handshake_mgr_(std::make_shared<HandshakeManager>()), |
||||
data_endpoint_latch_( |
||||
std::make_shared<Latch<std::shared_ptr<PromiseEndpoint>>>()), |
||||
wait_for_data_endpoint_callback_(std::make_shared<WaitForCallback>()) { |
||||
channel_args_ = channel_args_.SetObject(event_engine_); |
||||
channel_args_ = |
||||
channel_args_.Set(GRPC_ARG_RESOURCE_QUOTA, ResourceQuota::Default()); |
||||
} |
||||
|
||||
ChaoticGoodConnector::~ChaoticGoodConnector() { |
||||
if (connect_activity_ != nullptr) { |
||||
connect_activity_.reset(); |
||||
} |
||||
} |
||||
|
||||
auto ChaoticGoodConnector::DataEndpointReadSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self) { |
||||
GPR_ASSERT(self->data_endpoint_ != nullptr); |
||||
return TrySeq( |
||||
self->data_endpoint_->ReadSlice(FrameHeader::kFrameHeaderSize), |
||||
[self](Slice slice) mutable { |
||||
// Read setting frame;
|
||||
// Parse frame header
|
||||
GPR_ASSERT(self->data_endpoint_ != nullptr); |
||||
auto frame_header_ = |
||||
FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
||||
GRPC_SLICE_START_PTR(slice.c_slice()))); |
||||
return If( |
||||
frame_header_.ok(), |
||||
[frame_header_ = *frame_header_, self]() { |
||||
auto frame_header_length = frame_header_.GetFrameLength(); |
||||
return TrySeq(self->data_endpoint_->Read(frame_header_length), |
||||
[]() { return absl::OkStatus(); }); |
||||
}, |
||||
[status = frame_header_.status()]() { return status; }); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodConnector::DataEndpointWriteSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self) { |
||||
GPR_ASSERT(self->data_endpoint_ != nullptr); |
||||
return [self]() { |
||||
// Serialize setting frame.
|
||||
SettingsFrame frame; |
||||
// frame.header set connectiion_type: control
|
||||
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kData, |
||||
self->connection_id_, kDataAlignmentBytes} |
||||
.ToMetadataBatch(GetContext<Arena>()); |
||||
auto write_buffer = frame.Serialize(&self->hpack_compressor_); |
||||
return self->data_endpoint_->Write(std::move(write_buffer.control)); |
||||
}; |
||||
} |
||||
|
||||
auto ChaoticGoodConnector::WaitForDataEndpointSetup( |
||||
RefCountedPtr<ChaoticGoodConnector> self) { |
||||
// Data endpoint on_connect callback.
|
||||
grpc_event_engine::experimental::EventEngine::OnConnectCallback |
||||
on_data_endpoint_connect = |
||||
[self](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> |
||||
endpoint) mutable { |
||||
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { |
||||
auto error = GRPC_ERROR_CREATE("connect endpoint failed"); |
||||
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
||||
return; |
||||
} |
||||
self->data_endpoint_latch_->Set(std::make_shared<PromiseEndpoint>( |
||||
std::move(endpoint.value()), SliceBuffer())); |
||||
auto cb = self->wait_for_data_endpoint_callback_->MakeCallback(); |
||||
// Wake up wait_for_data_endpoint_callback_.
|
||||
cb(); |
||||
}; |
||||
self->event_engine_->Connect( |
||||
std::move(on_data_endpoint_connect), *self->resolved_addr_, |
||||
grpc_event_engine::experimental::ChannelArgsEndpointConfig( |
||||
self->channel_args_), |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"data_endpoint_connection"), |
||||
EventEngine::Duration(kTimeoutSecs)); |
||||
|
||||
return TrySeq( |
||||
self->wait_for_data_endpoint_callback_->MakeWaitPromise(), |
||||
Race(TrySeq( |
||||
self->data_endpoint_latch_->Wait(), |
||||
[self](std::shared_ptr<PromiseEndpoint> data_endpoint) mutable { |
||||
self->data_endpoint_.swap(data_endpoint); |
||||
return TrySeq( |
||||
DataEndpointWriteSettingsFrame(self), |
||||
DataEndpointReadSettingsFrame(self), |
||||
[]() -> absl::Status { return absl::OkStatus(); }); |
||||
}), |
||||
TrySeq(Sleep(Timestamp::Now() + Duration::Seconds(kTimeoutSecs)), |
||||
[]() -> absl::Status { |
||||
return absl::DeadlineExceededError( |
||||
"Data endpoint connect deadline exceeded."); |
||||
}))); |
||||
} |
||||
|
||||
auto ChaoticGoodConnector::ControlEndpointReadSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self) { |
||||
GPR_ASSERT(self->control_endpoint_ != nullptr); |
||||
return TrySeq( |
||||
self->control_endpoint_->ReadSlice(FrameHeader::kFrameHeaderSize), |
||||
[self](Slice slice) { |
||||
// Parse frame header
|
||||
auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
||||
GRPC_SLICE_START_PTR(slice.c_slice()))); |
||||
return If( |
||||
frame_header.ok(), |
||||
TrySeq( |
||||
self->control_endpoint_->Read(frame_header->GetFrameLength()), |
||||
[frame_header = *frame_header, self](SliceBuffer buffer) { |
||||
// Deserialize setting frame.
|
||||
SettingsFrame frame; |
||||
BufferPair buffer_pair{std::move(buffer), SliceBuffer()}; |
||||
auto status = frame.Deserialize( |
||||
&self->hpack_parser_, frame_header, |
||||
absl::BitGenRef(self->bitgen_), GetContext<Arena>(), |
||||
std::move(buffer_pair), FrameLimits{}); |
||||
if (!status.ok()) return status; |
||||
if (frame.headers == nullptr) { |
||||
return absl::UnavailableError("no settings headers"); |
||||
} |
||||
auto settings_metadata = |
||||
SettingsMetadata::FromMetadataBatch(*frame.headers); |
||||
if (!settings_metadata.ok()) { |
||||
return settings_metadata.status(); |
||||
} |
||||
if (!settings_metadata->connection_id.has_value()) { |
||||
return absl::UnavailableError( |
||||
"no connection id in settings frame"); |
||||
} |
||||
self->connection_id_ = *settings_metadata->connection_id; |
||||
return absl::OkStatus(); |
||||
}, |
||||
WaitForDataEndpointSetup(self)), |
||||
[status = frame_header.status()]() { return status; }); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodConnector::ControlEndpointWriteSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self) { |
||||
return [self]() { |
||||
GPR_ASSERT(self->control_endpoint_ != nullptr); |
||||
// Serialize setting frame.
|
||||
SettingsFrame frame; |
||||
// frame.header set connectiion_type: control
|
||||
frame.headers = SettingsMetadata{SettingsMetadata::ConnectionType::kControl, |
||||
absl::nullopt, absl::nullopt} |
||||
.ToMetadataBatch(GetContext<Arena>()); |
||||
auto write_buffer = frame.Serialize(&self->hpack_compressor_); |
||||
return self->control_endpoint_->Write(std::move(write_buffer.control)); |
||||
}; |
||||
} |
||||
|
||||
void ChaoticGoodConnector::Connect(const Args& args, Result* result, |
||||
grpc_closure* notify) { |
||||
{ |
||||
MutexLock lock(&mu_); |
||||
result_ = result; |
||||
if (is_shutdown_) { |
||||
auto error = GRPC_ERROR_CREATE("connector shutdown"); |
||||
MaybeNotify(DEBUG_LOCATION, notify, error); |
||||
return; |
||||
} |
||||
} |
||||
args_ = args; |
||||
notify_ = notify; |
||||
resolved_addr_ = EventEngine::ResolvedAddress( |
||||
reinterpret_cast<const sockaddr*>(args_.address->addr), |
||||
args_.address->len); |
||||
GPR_ASSERT(resolved_addr_.value().address() != nullptr); |
||||
grpc_event_engine::experimental::EventEngine::OnConnectCallback on_connect = |
||||
[self = RefAsSubclass<ChaoticGoodConnector>()]( |
||||
absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> |
||||
endpoint) mutable { |
||||
if (!endpoint.ok() || self->handshake_mgr_ == nullptr) { |
||||
auto error = GRPC_ERROR_CREATE("connect endpoint failed"); |
||||
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
||||
return; |
||||
} |
||||
ExecCtx exec_ctx; |
||||
auto* p = self.release(); |
||||
p->handshake_mgr_->DoHandshake( |
||||
grpc_event_engine_endpoint_create(std::move(endpoint.value())), |
||||
p->channel_args_, p->args_.deadline, nullptr /* acceptor */, |
||||
OnHandshakeDone, p); |
||||
}; |
||||
event_engine_->Connect( |
||||
std::move(on_connect), *resolved_addr_, |
||||
grpc_event_engine::experimental::ChannelArgsEndpointConfig(channel_args_), |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"data_endpoint_connection"), |
||||
EventEngine::Duration(kTimeoutSecs)); |
||||
} |
||||
|
||||
void ChaoticGoodConnector::OnHandshakeDone(void* arg, grpc_error_handle error) { |
||||
auto* args = static_cast<HandshakerArgs*>(arg); |
||||
RefCountedPtr<ChaoticGoodConnector> self( |
||||
static_cast<ChaoticGoodConnector*>(args->user_data)); |
||||
gpr_log(GPR_ERROR, "SubchannelConnector::OnHandshakeDone:%p", |
||||
static_cast<SubchannelConnector*>(self.get())); |
||||
grpc_slice_buffer_destroy(args->read_buffer); |
||||
gpr_free(args->read_buffer); |
||||
// Start receiving setting frames;
|
||||
{ |
||||
MutexLock lock(&self->mu_); |
||||
if (!error.ok() || self->is_shutdown_) { |
||||
if (error.ok()) { |
||||
error = GRPC_ERROR_CREATE("connector shutdown"); |
||||
// We were shut down after handshaking completed successfully, so
|
||||
// destroy the endpoint here.
|
||||
if (args->endpoint != nullptr) { |
||||
grpc_endpoint_shutdown(args->endpoint, error); |
||||
grpc_endpoint_destroy(args->endpoint); |
||||
} |
||||
} |
||||
self->result_->Reset(); |
||||
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
||||
return; |
||||
} |
||||
} |
||||
if (args->endpoint != nullptr) { |
||||
GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint( |
||||
args->endpoint)); |
||||
self->control_endpoint_ = std::make_shared<PromiseEndpoint>( |
||||
grpc_event_engine::experimental:: |
||||
grpc_take_wrapped_event_engine_endpoint(args->endpoint), |
||||
SliceBuffer()); |
||||
auto activity = MakeActivity( |
||||
[self] { |
||||
return TrySeq(ControlEndpointWriteSettingsFrame(self), |
||||
ControlEndpointReadSettingsFrame(self), |
||||
[]() { return absl::OkStatus(); }); |
||||
}, |
||||
EventEngineWakeupScheduler(self->event_engine_), |
||||
[self](absl::Status status) { |
||||
MaybeNotify(DEBUG_LOCATION, self->notify_, status); |
||||
}, |
||||
self->arena_.get(), self->event_engine_.get()); |
||||
MutexLock lock(&self->mu_); |
||||
if (!self->is_shutdown_) { |
||||
self->connect_activity_ = std::move(activity); |
||||
} |
||||
} else { |
||||
// Handshaking succeeded but there is no endpoint.
|
||||
MutexLock lock(&self->mu_); |
||||
self->result_->Reset(); |
||||
auto error = GRPC_ERROR_CREATE("handshake complete with empty endpoint."); |
||||
MaybeNotify(DEBUG_LOCATION, self->notify_, error); |
||||
} |
||||
} |
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,114 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_CHAOTIC_GOOD_CONNECTOR_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_CHAOTIC_GOOD_CONNECTOR_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <cstddef> |
||||
#include <cstdint> |
||||
#include <memory> |
||||
|
||||
#include "absl/random/random.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/ext/filters/client_channel/connector.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/wait_for_callback.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/transport/handshaker.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
class ChaoticGoodConnector : public SubchannelConnector { |
||||
public: |
||||
explicit ChaoticGoodConnector( |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> |
||||
event_engine); |
||||
~ChaoticGoodConnector() override; |
||||
void Connect(const Args& args, Result* result, grpc_closure* notify) override; |
||||
void Shutdown(grpc_error_handle error) override { |
||||
gpr_log(GPR_ERROR, "SubchannelConnector::Shutdown: %s; mgr=%p", |
||||
error.ToString().c_str(), handshake_mgr_.get()); |
||||
ActivityPtr connect_activity; |
||||
MutexLock lock(&mu_); |
||||
if (is_shutdown_) return; |
||||
is_shutdown_ = true; |
||||
if (handshake_mgr_ != nullptr) { |
||||
handshake_mgr_->Shutdown(error); |
||||
} |
||||
connect_activity = std::move(connect_activity_); |
||||
}; |
||||
|
||||
private: |
||||
static auto DataEndpointReadSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self); |
||||
static auto DataEndpointWriteSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self); |
||||
static auto ControlEndpointReadSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self); |
||||
static auto ControlEndpointWriteSettingsFrame( |
||||
RefCountedPtr<ChaoticGoodConnector> self); |
||||
static auto WaitForDataEndpointSetup( |
||||
RefCountedPtr<ChaoticGoodConnector> self); |
||||
static void OnHandshakeDone(void* arg, grpc_error_handle error); |
||||
|
||||
grpc_event_engine::experimental::MemoryAllocator memory_allocator_ = |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"connect_activity"); |
||||
ScopedArenaPtr arena_ = MakeScopedArena(1024, &memory_allocator_); |
||||
Mutex mu_; |
||||
Args args_; |
||||
Result* result_ ABSL_GUARDED_BY(mu_); |
||||
grpc_closure* notify_; |
||||
bool is_shutdown_ ABSL_GUARDED_BY(mu_) = false; |
||||
ChannelArgs channel_args_; |
||||
absl::StatusOr<grpc_event_engine::experimental::EventEngine::ResolvedAddress> |
||||
resolved_addr_; |
||||
|
||||
std::shared_ptr<PromiseEndpoint> control_endpoint_; |
||||
std::shared_ptr<PromiseEndpoint> data_endpoint_; |
||||
ActivityPtr connect_activity_ ABSL_GUARDED_BY(mu_); |
||||
const std::shared_ptr<grpc_event_engine::experimental::EventEngine> |
||||
event_engine_; |
||||
std::shared_ptr<HandshakeManager> handshake_mgr_; |
||||
HPackCompressor hpack_compressor_; |
||||
HPackParser hpack_parser_; |
||||
absl::BitGen bitgen_; |
||||
std::shared_ptr<Latch<std::shared_ptr<PromiseEndpoint>>> data_endpoint_latch_; |
||||
std::shared_ptr<WaitForCallback> wait_for_data_endpoint_callback_; |
||||
std::string connection_id_; |
||||
}; |
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CLIENT_CHAOTIC_GOOD_CONNECTOR_H
|
@ -0,0 +1,410 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/server/chaotic_good_server.h" |
||||
|
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <random> |
||||
#include <string> |
||||
#include <utility> |
||||
#include <vector> |
||||
|
||||
#include "absl/random/bit_gen_ref.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/frame.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
||||
#include "src/core/ext/transport/chaotic_good/settings_metadata.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/event_engine/channel_args_endpoint_config.h" |
||||
#include "src/core/lib/event_engine/default_event_engine.h" |
||||
#include "src/core/lib/event_engine/tcp_socket_utils.h" |
||||
#include "src/core/lib/gprpp/orphanable.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/status_helper.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/event_engine_shims/endpoint.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/if.h" |
||||
#include "src/core/lib/promise/latch.h" |
||||
#include "src/core/lib/promise/race.h" |
||||
#include "src/core/lib/promise/sleep.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "src/core/lib/transport/handshaker.h" |
||||
#include "src/core/lib/transport/metadata.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
namespace { |
||||
const Duration kConnectionDeadline = Duration::Seconds(5); |
||||
} // namespace
|
||||
|
||||
using grpc_event_engine::experimental::EventEngine; |
||||
ChaoticGoodServerListener::ChaoticGoodServerListener( |
||||
Server* server, const ChannelArgs& args, |
||||
absl::AnyInvocable<std::string()> connection_id_generator) |
||||
: server_(server), |
||||
args_(args), |
||||
event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()), |
||||
connection_id_generator_(std::move(connection_id_generator)) {} |
||||
|
||||
ChaoticGoodServerListener::~ChaoticGoodServerListener() { |
||||
event_engine_->Run([on_destroy_done = on_destroy_done_]() { |
||||
ExecCtx exec_ctx; |
||||
if (on_destroy_done != nullptr) { |
||||
ExecCtx::Run(DEBUG_LOCATION, on_destroy_done, absl::OkStatus()); |
||||
ExecCtx::Get()->Flush(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
absl::StatusOr<int> ChaoticGoodServerListener::Bind(const char* addr) { |
||||
EventEngine::Listener::AcceptCallback accept_cb = |
||||
[self = Ref()](std::unique_ptr<EventEngine::Endpoint> ep, |
||||
MemoryAllocator) { |
||||
ExecCtx exec_ctx; |
||||
MutexLock lock(&self->mu_); |
||||
self->connection_list_.emplace( |
||||
MakeOrphanable<ActiveConnection>(self, std::move(ep))); |
||||
}; |
||||
auto shutdown_cb = [](absl::Status status) { |
||||
if (!status.ok()) { |
||||
gpr_log(GPR_ERROR, "Server accept connection failed: %s", |
||||
StatusToString(status).c_str()); |
||||
} |
||||
}; |
||||
GPR_ASSERT(event_engine_ != nullptr); |
||||
auto ee_listener = event_engine_->CreateListener( |
||||
std::move(accept_cb), std::move(shutdown_cb), |
||||
grpc_event_engine::experimental::ChannelArgsEndpointConfig(args_), |
||||
std::make_unique<MemoryQuota>("chaotic_good_server_listener")); |
||||
if (!ee_listener.ok()) { |
||||
return ee_listener.status(); |
||||
} |
||||
ee_listener_ = std::move(ee_listener.value()); |
||||
auto resolved_addr = |
||||
grpc_event_engine::experimental::URIToResolvedAddress(addr); |
||||
GPR_ASSERT(resolved_addr.ok()); |
||||
if (!resolved_addr.ok()) { |
||||
return resolved_addr.status(); |
||||
} |
||||
auto port_num = ee_listener_->Bind(resolved_addr.value()); |
||||
if (!port_num.ok()) { |
||||
return port_num.status(); |
||||
} |
||||
server_->AddListener(OrphanablePtr<Server::ListenerInterface>(this)); |
||||
return port_num; |
||||
} |
||||
|
||||
absl::Status ChaoticGoodServerListener::StartListening() { |
||||
GPR_ASSERT(ee_listener_ != nullptr); |
||||
auto status = ee_listener_->Start(); |
||||
return status; |
||||
} |
||||
|
||||
ChaoticGoodServerListener::ActiveConnection::ActiveConnection( |
||||
RefCountedPtr<ChaoticGoodServerListener> listener, |
||||
std::unique_ptr<EventEngine::Endpoint> endpoint) |
||||
: InternallyRefCounted("ActiveConnection"), |
||||
memory_allocator_(listener->memory_allocator_), |
||||
listener_(listener) { |
||||
handshaking_state_ = MakeRefCounted<HandshakingState>(Ref()); |
||||
handshaking_state_->Start(std::move(endpoint)); |
||||
} |
||||
|
||||
ChaoticGoodServerListener::ActiveConnection::~ActiveConnection() { |
||||
if (receive_settings_activity_ != nullptr) receive_settings_activity_.reset(); |
||||
} |
||||
|
||||
void ChaoticGoodServerListener::ActiveConnection::NewConnectionID() { |
||||
bool has_new_id = false; |
||||
MutexLock lock(&listener_->mu_); |
||||
while (!has_new_id) { |
||||
connection_id_ = listener_->connection_id_generator_(); |
||||
if (!listener_->connectivity_map_.contains(connection_id_)) { |
||||
has_new_id = true; |
||||
} |
||||
} |
||||
listener_->connectivity_map_.emplace( |
||||
connection_id_, |
||||
std::make_shared<InterActivityLatch<std::shared_ptr<PromiseEndpoint>>>()); |
||||
} |
||||
|
||||
void ChaoticGoodServerListener::ActiveConnection::Fail( |
||||
absl::string_view error) { |
||||
gpr_log(GPR_ERROR, "ActiveConnection::Fail:%p %s", this, |
||||
std::string(error).c_str()); |
||||
// Can easily be holding various locks here: bounce through EE to ensure no
|
||||
// deadlocks.
|
||||
listener_->event_engine_->Run([self = Ref()]() { |
||||
ExecCtx exec_ctx; |
||||
OrphanablePtr<ActiveConnection> con; |
||||
MutexLock lock(&self->listener_->mu_); |
||||
auto v = self->listener_->connection_list_.extract(self.get()); |
||||
if (!v.empty()) con = std::move(v.value()); |
||||
}); |
||||
} |
||||
|
||||
ChaoticGoodServerListener::ActiveConnection::HandshakingState::HandshakingState( |
||||
RefCountedPtr<ActiveConnection> connection) |
||||
: memory_allocator_(connection->memory_allocator_), |
||||
connection_(std::move(connection)), |
||||
handshake_mgr_(MakeRefCounted<HandshakeManager>()) {} |
||||
|
||||
void ChaoticGoodServerListener::ActiveConnection::HandshakingState::Start( |
||||
std::unique_ptr<EventEngine::Endpoint> endpoint) { |
||||
handshake_mgr_->DoHandshake( |
||||
grpc_event_engine_endpoint_create(std::move(endpoint)), |
||||
connection_->args(), GetConnectionDeadline(), nullptr, OnHandshakeDone, |
||||
Ref().release()); |
||||
} |
||||
|
||||
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
EndpointReadSettingsFrame(RefCountedPtr<HandshakingState> self) { |
||||
return TrySeq( |
||||
self->connection_->endpoint_->ReadSlice(FrameHeader::kFrameHeaderSize), |
||||
[self](Slice slice) { |
||||
// Parse frame header
|
||||
auto frame_header = FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
||||
GRPC_SLICE_START_PTR(slice.c_slice()))); |
||||
return If( |
||||
frame_header.ok(), |
||||
[self, &frame_header]() { |
||||
return TrySeq( |
||||
self->connection_->endpoint_->Read( |
||||
frame_header->GetFrameLength()), |
||||
[frame_header = *frame_header, |
||||
self](SliceBuffer buffer) -> absl::StatusOr<bool> { |
||||
// Read Setting frame.
|
||||
SettingsFrame frame; |
||||
// Deserialize frame from read buffer.
|
||||
BufferPair buffer_pair{std::move(buffer), SliceBuffer()}; |
||||
auto status = frame.Deserialize( |
||||
&self->connection_->hpack_parser_, frame_header, |
||||
absl::BitGenRef(self->connection_->bitgen_), |
||||
GetContext<Arena>(), std::move(buffer_pair), |
||||
FrameLimits{}); |
||||
if (!status.ok()) return status; |
||||
if (frame.headers == nullptr) { |
||||
return absl::UnavailableError("no settings headers"); |
||||
} |
||||
auto settings_metadata = |
||||
SettingsMetadata::FromMetadataBatch(*frame.headers); |
||||
if (!settings_metadata.ok()) { |
||||
return settings_metadata.status(); |
||||
} |
||||
const bool is_control_endpoint = |
||||
settings_metadata->connection_type == |
||||
SettingsMetadata::ConnectionType::kControl; |
||||
if (!is_control_endpoint) { |
||||
if (!settings_metadata->connection_id.has_value()) { |
||||
return absl::UnavailableError( |
||||
"no connection id in data endpoint settings frame"); |
||||
} |
||||
if (!settings_metadata->alignment.has_value()) { |
||||
return absl::UnavailableError( |
||||
"no alignment in data endpoint settings frame"); |
||||
} |
||||
// Get connection-id and data-alignment for data endpoint.
|
||||
self->connection_->connection_id_ = |
||||
*settings_metadata->connection_id; |
||||
self->connection_->data_alignment_ = |
||||
*settings_metadata->alignment; |
||||
} |
||||
return is_control_endpoint; |
||||
}); |
||||
}, |
||||
[&frame_header]() { |
||||
return [r = frame_header.status()]() -> absl::StatusOr<bool> { |
||||
return r; |
||||
}; |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
WaitForDataEndpointSetup(RefCountedPtr<HandshakingState> self) { |
||||
return Race(TrySeq( |
||||
[]() { |
||||
// TODO(ladynana): find a way to resolve SeqState to actual
|
||||
// value.
|
||||
return absl::OkStatus(); |
||||
}, |
||||
[self]() { |
||||
MutexLock lock(&self->connection_->listener_->mu_); |
||||
auto latch = self->connection_->listener_->connectivity_map_ |
||||
.find(self->connection_->connection_id_) |
||||
->second; |
||||
return latch->Wait(); |
||||
}, |
||||
[](std::shared_ptr<PromiseEndpoint> ret) -> absl::Status { |
||||
if (ret == nullptr) { |
||||
return absl::UnavailableError("no data endpoint"); |
||||
} |
||||
// TODO(ladynana): initialize server transport.
|
||||
return absl::OkStatus(); |
||||
}), |
||||
// Set timeout for waiting data endpoint connect.
|
||||
TrySeq( |
||||
// []() {
|
||||
Sleep(Timestamp::Now() + kConnectionDeadline), |
||||
[self]() mutable -> absl::Status { |
||||
MutexLock lock(&self->connection_->listener_->mu_); |
||||
// Delete connection id from map when timeout;
|
||||
self->connection_->listener_->connectivity_map_.erase( |
||||
self->connection_->connection_id_); |
||||
return absl::DeadlineExceededError("Deadline exceeded."); |
||||
})); |
||||
} |
||||
|
||||
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
ControlEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) { |
||||
return TrySeq( |
||||
[self]() { |
||||
self->connection_->NewConnectionID(); |
||||
SettingsFrame frame; |
||||
frame.headers = |
||||
SettingsMetadata{absl::nullopt, self->connection_->connection_id_, |
||||
absl::nullopt} |
||||
.ToMetadataBatch(GetContext<Arena>()); |
||||
auto write_buffer = |
||||
frame.Serialize(&self->connection_->hpack_compressor_); |
||||
return self->connection_->endpoint_->Write( |
||||
std::move(write_buffer.control)); |
||||
}, |
||||
WaitForDataEndpointSetup(self)); |
||||
} |
||||
|
||||
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
DataEndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self) { |
||||
return TrySeq( |
||||
[self]() { |
||||
// Send data endpoint setting frame
|
||||
SettingsFrame frame; |
||||
frame.headers = |
||||
SettingsMetadata{absl::nullopt, self->connection_->connection_id_, |
||||
self->connection_->data_alignment_} |
||||
.ToMetadataBatch(GetContext<Arena>()); |
||||
auto write_buffer = |
||||
frame.Serialize(&self->connection_->hpack_compressor_); |
||||
return self->connection_->endpoint_->Write( |
||||
std::move(write_buffer.control)); |
||||
}, |
||||
[self]() mutable { |
||||
MutexLock lock(&self->connection_->listener_->mu_); |
||||
// Set endpoint to latch
|
||||
auto it = self->connection_->listener_->connectivity_map_.find( |
||||
self->connection_->connection_id_); |
||||
if (it == self->connection_->listener_->connectivity_map_.end()) { |
||||
return absl::InternalError( |
||||
absl::StrCat("Connection not in map: ", |
||||
absl::CEscape(self->connection_->connection_id_))); |
||||
} |
||||
it->second->Set(std::move(self->connection_->endpoint_)); |
||||
return absl::OkStatus(); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
EndpointWriteSettingsFrame(RefCountedPtr<HandshakingState> self, |
||||
bool is_control_endpoint) { |
||||
return If(is_control_endpoint, ControlEndpointWriteSettingsFrame(self), |
||||
DataEndpointWriteSettingsFrame(self)); |
||||
} |
||||
|
||||
void ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
OnHandshakeDone(void* arg, grpc_error_handle error) { |
||||
auto* args = static_cast<HandshakerArgs*>(arg); |
||||
GPR_ASSERT(args != nullptr); |
||||
RefCountedPtr<HandshakingState> self( |
||||
static_cast<HandshakingState*>(args->user_data)); |
||||
grpc_slice_buffer_destroy(args->read_buffer); |
||||
gpr_free(args->read_buffer); |
||||
if (!error.ok()) { |
||||
self->connection_->Fail( |
||||
absl::StrCat("Handshake failed: ", StatusToString(error))); |
||||
return; |
||||
} |
||||
if (args->endpoint == nullptr) { |
||||
self->connection_->Fail("Server handshake done but has empty endpoint."); |
||||
return; |
||||
} |
||||
GPR_ASSERT(grpc_event_engine::experimental::grpc_is_event_engine_endpoint( |
||||
args->endpoint)); |
||||
self->connection_->endpoint_ = std::make_shared<PromiseEndpoint>( |
||||
grpc_event_engine::experimental::grpc_take_wrapped_event_engine_endpoint( |
||||
args->endpoint), |
||||
SliceBuffer()); |
||||
auto activity = MakeActivity( |
||||
[self]() { |
||||
return TrySeq(Race(EndpointReadSettingsFrame(self), |
||||
TrySeq(Sleep(Timestamp::Now() + kConnectionDeadline), |
||||
[]() -> absl::StatusOr<bool> { |
||||
return absl::DeadlineExceededError( |
||||
"Waiting for initial settings frame"); |
||||
})), |
||||
[self](bool is_control_endpoint) { |
||||
return EndpointWriteSettingsFrame(self, |
||||
is_control_endpoint); |
||||
}); |
||||
}, |
||||
EventEngineWakeupScheduler( |
||||
grpc_event_engine::experimental::GetDefaultEventEngine()), |
||||
[self](absl::Status status) { |
||||
if (!status.ok()) { |
||||
self->connection_->Fail( |
||||
absl::StrCat("Server setting frame handling failed: ", |
||||
StatusToString(status))); |
||||
} |
||||
}, |
||||
self->connection_->arena_.get(), |
||||
grpc_event_engine::experimental::GetDefaultEventEngine().get()); |
||||
MutexLock lock(&self->connection_->mu_); |
||||
if (self->connection_->orphaned_) return; |
||||
self->connection_->receive_settings_activity_ = std::move(activity); |
||||
} |
||||
|
||||
Timestamp ChaoticGoodServerListener::ActiveConnection::HandshakingState:: |
||||
GetConnectionDeadline() { |
||||
if (connection_->args().Contains(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS)) { |
||||
return Timestamp::Now() + |
||||
connection_->args() |
||||
.GetDurationFromIntMillis(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS) |
||||
.value(); |
||||
} |
||||
return Timestamp::Now() + kConnectionDeadline; |
||||
} |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,202 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <cstddef> |
||||
#include <cstdint> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/container/flat_hash_map.h" |
||||
#include "absl/random/random.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channelz.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/iomgr_fwd.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/inter_activity_latch.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "src/core/lib/transport/handshaker.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
class ChaoticGoodServerListener final |
||||
: public Server::ListenerInterface, |
||||
public RefCounted<ChaoticGoodServerListener> { |
||||
public: |
||||
static absl::AnyInvocable<std::string()> DefaultConnectionIDGenerator() { |
||||
return [bitgen = absl::BitGen()]() mutable { |
||||
return absl::StrCat(absl::Hex(absl::Uniform<uint64_t>(bitgen))); |
||||
}; |
||||
} |
||||
|
||||
ChaoticGoodServerListener( |
||||
Server* server, const ChannelArgs& args, |
||||
absl::AnyInvocable<std::string()> connection_id_generator = |
||||
DefaultConnectionIDGenerator()); |
||||
~ChaoticGoodServerListener() override; |
||||
// Bind address to EventEngine listener.
|
||||
absl::StatusOr<int> Bind(const char* addr); |
||||
absl::Status StartListening(); |
||||
const ChannelArgs& args() const { return args_; } |
||||
void Orphan() override { |
||||
gpr_log(GPR_INFO, "ORPHAN"); |
||||
{ |
||||
absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list; |
||||
MutexLock lock(&mu_); |
||||
connection_list = std::move(connection_list_); |
||||
} |
||||
ee_listener_.reset(); |
||||
Unref(); |
||||
gpr_log(GPR_INFO, "~ORPHAN"); |
||||
}; |
||||
|
||||
class ActiveConnection : public InternallyRefCounted<ActiveConnection> { |
||||
public: |
||||
ActiveConnection( |
||||
RefCountedPtr<ChaoticGoodServerListener> listener, |
||||
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> |
||||
endpoint); |
||||
~ActiveConnection() override; |
||||
const ChannelArgs& args() const { return listener_->args(); } |
||||
|
||||
void Orphan() override { |
||||
gpr_log(GPR_INFO, "ORPHAN ActiveConnection:%p", this); |
||||
if (handshaking_state_ != nullptr) { |
||||
handshaking_state_->Shutdown(); |
||||
handshaking_state_.reset(); |
||||
} |
||||
ActivityPtr activity; |
||||
{ |
||||
MutexLock lock(&mu_); |
||||
orphaned_ = true; |
||||
activity = std::move(receive_settings_activity_); |
||||
} |
||||
activity.reset(); |
||||
Unref(); |
||||
gpr_log(GPR_INFO, "~ORPHAN ActiveConnection"); |
||||
} |
||||
|
||||
class HandshakingState : public RefCounted<HandshakingState> { |
||||
public: |
||||
explicit HandshakingState(RefCountedPtr<ActiveConnection> connection); |
||||
~HandshakingState() override{}; |
||||
void Start(std::unique_ptr< |
||||
grpc_event_engine::experimental::EventEngine::Endpoint> |
||||
endpoint); |
||||
|
||||
void Shutdown() { |
||||
gpr_log(GPR_INFO, "Shutdown:%p", this); |
||||
handshake_mgr_->Shutdown(absl::CancelledError("Shutdown")); |
||||
} |
||||
|
||||
private: |
||||
static auto EndpointReadSettingsFrame( |
||||
RefCountedPtr<HandshakingState> self); |
||||
static auto EndpointWriteSettingsFrame( |
||||
RefCountedPtr<HandshakingState> self, bool is_control_endpoint); |
||||
static auto WaitForDataEndpointSetup( |
||||
RefCountedPtr<HandshakingState> self); |
||||
static auto ControlEndpointWriteSettingsFrame( |
||||
RefCountedPtr<HandshakingState> self); |
||||
static auto DataEndpointWriteSettingsFrame( |
||||
RefCountedPtr<HandshakingState> self); |
||||
|
||||
static void OnHandshakeDone(void* arg, grpc_error_handle error); |
||||
Timestamp GetConnectionDeadline(); |
||||
const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> |
||||
memory_allocator_; |
||||
const RefCountedPtr<ActiveConnection> connection_; |
||||
const RefCountedPtr<HandshakeManager> handshake_mgr_; |
||||
}; |
||||
|
||||
private: |
||||
void Fail(absl::string_view error); |
||||
void NewConnectionID(); |
||||
const std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> |
||||
memory_allocator_; |
||||
ScopedArenaPtr arena_ = MakeScopedArena(1024, memory_allocator_.get()); |
||||
const RefCountedPtr<ChaoticGoodServerListener> listener_; |
||||
RefCountedPtr<HandshakingState> handshaking_state_; |
||||
Mutex mu_; |
||||
ActivityPtr receive_settings_activity_ ABSL_GUARDED_BY(mu_); |
||||
bool orphaned_ ABSL_GUARDED_BY(mu_) = false; |
||||
std::shared_ptr<PromiseEndpoint> endpoint_; |
||||
HPackCompressor hpack_compressor_; |
||||
HPackParser hpack_parser_; |
||||
absl::BitGen bitgen_; |
||||
std::string connection_id_; |
||||
int32_t data_alignment_; |
||||
}; |
||||
|
||||
void Start(Server*, const std::vector<grpc_pollset*>*) override { |
||||
StartListening().IgnoreError(); |
||||
}; |
||||
|
||||
channelz::ListenSocketNode* channelz_listen_socket_node() const override { |
||||
return nullptr; |
||||
} |
||||
|
||||
void SetOnDestroyDone(grpc_closure* closure) override { |
||||
MutexLock lock(&mu_); |
||||
on_destroy_done_ = closure; |
||||
}; |
||||
|
||||
private: |
||||
Server* server_; |
||||
ChannelArgs args_; |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; |
||||
std::unique_ptr<grpc_event_engine::experimental::EventEngine::Listener> |
||||
ee_listener_; |
||||
Mutex mu_; |
||||
// Map of connection id to endpoints connectivity.
|
||||
absl::flat_hash_map< |
||||
std::string, |
||||
std::shared_ptr<InterActivityLatch<std::shared_ptr<PromiseEndpoint>>>> |
||||
connectivity_map_ ABSL_GUARDED_BY(mu_); |
||||
absl::flat_hash_set<OrphanablePtr<ActiveConnection>> connection_list_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
absl::AnyInvocable<std::string()> connection_id_generator_ |
||||
ABSL_GUARDED_BY(mu_); |
||||
grpc_closure* on_destroy_done_ ABSL_GUARDED_BY(mu_) = nullptr; |
||||
std::shared_ptr<grpc_event_engine::experimental::MemoryAllocator> |
||||
memory_allocator_ = |
||||
std::make_shared<grpc_event_engine::experimental::MemoryAllocator>( |
||||
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( |
||||
"server_connection")); |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_CHAOTIC_GOOD_SERVER_H
|
@ -0,0 +1,81 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/settings_metadata.h" |
||||
|
||||
#include "absl/status/status.h" |
||||
|
||||
#include "src/core/lib/gprpp/crash.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
Arena::PoolPtr<grpc_metadata_batch> SettingsMetadata::ToMetadataBatch( |
||||
Arena* arena) { |
||||
auto md = Arena::MakePooled<grpc_metadata_batch>(arena); |
||||
auto add = [&md](absl::string_view key, std::string value) { |
||||
md->Append(key, Slice::FromCopiedString(value), |
||||
[key, value](absl::string_view error, const Slice&) { |
||||
Crash(absl::StrCat("Failed to add metadata '", key, "' = '", |
||||
value, "': ", error)); |
||||
}); |
||||
}; |
||||
if (connection_type.has_value()) { |
||||
add("chaotic-good-connection-type", |
||||
connection_type.value() == ConnectionType::kControl ? "control" |
||||
: "data"); |
||||
} |
||||
if (connection_id.has_value()) { |
||||
add("chaotic-good-connection-id", connection_id.value()); |
||||
} |
||||
if (alignment.has_value()) { |
||||
add("chaotic-good-alignment", absl::StrCat(alignment.value())); |
||||
} |
||||
return md; |
||||
} |
||||
|
||||
absl::StatusOr<SettingsMetadata> SettingsMetadata::FromMetadataBatch( |
||||
const grpc_metadata_batch& batch) { |
||||
SettingsMetadata md; |
||||
std::string buffer; |
||||
auto v = batch.GetStringValue("chaotic-good-connection-type", &buffer); |
||||
if (v.has_value()) { |
||||
if (*v == "control") { |
||||
md.connection_type = ConnectionType::kControl; |
||||
} else if (*v == "data") { |
||||
md.connection_type = ConnectionType::kData; |
||||
} else { |
||||
return absl::UnavailableError( |
||||
absl::StrCat("Invalid connection type: ", *v)); |
||||
} |
||||
} |
||||
v = batch.GetStringValue("chaotic-good-connection-id", &buffer); |
||||
if (v.has_value()) { |
||||
md.connection_id = std::string(*v); |
||||
} |
||||
v = batch.GetStringValue("chaotic-good-alignment", &buffer); |
||||
if (v.has_value()) { |
||||
uint32_t alignment; |
||||
if (!absl::SimpleAtoi(*v, &alignment)) { |
||||
return absl::UnavailableError(absl::StrCat("Invalid alignment: ", *v)); |
||||
} |
||||
md.alignment = alignment; |
||||
} |
||||
return md; |
||||
} |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,46 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SETTINGS_METADATA_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SETTINGS_METADATA_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/types/optional.h" |
||||
|
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
// Captures metadata sent in a chaotic good settings frame.
|
||||
struct SettingsMetadata { |
||||
enum class ConnectionType { |
||||
kControl, |
||||
kData, |
||||
}; |
||||
absl::optional<ConnectionType> connection_type; |
||||
absl::optional<std::string> connection_id; |
||||
absl::optional<uint32_t> alignment; |
||||
|
||||
Arena::PoolPtr<grpc_metadata_batch> ToMetadataBatch(Arena* arena); |
||||
static absl::StatusOr<SettingsMetadata> FromMetadataBatch( |
||||
const grpc_metadata_batch& batch); |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SETTINGS_METADATA_H
|
@ -0,0 +1,34 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_TEST_CORE_END2END_FUZZERS_SERVER_FUZZER_H |
||||
#define GRPC_TEST_CORE_END2END_FUZZERS_SERVER_FUZZER_H |
||||
|
||||
#include "absl/functional/function_ref.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "test/core/end2end/fuzzers/fuzzer_input.pb.h" |
||||
|
||||
namespace grpc_core { |
||||
|
||||
void RunServerFuzzer( |
||||
const fuzzer_input::Msg& msg, |
||||
absl::FunctionRef<void(grpc_server*, int, const ChannelArgs&)> |
||||
server_setup); |
||||
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_TEST_CORE_END2END_FUZZERS_SERVER_FUZZER_H
|
@ -0,0 +1,36 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/grpc_security.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/server/chaotic_good_server.h" |
||||
#include "src/libfuzzer/libfuzzer_macro.h" |
||||
#include "test/core/end2end/fuzzers/server_fuzzer.h" |
||||
|
||||
DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) { |
||||
grpc_core::RunServerFuzzer( |
||||
msg, [](grpc_server* server, int port_num, |
||||
const grpc_core::ChannelArgs& channel_args) { |
||||
grpc_core::ExecCtx exec_ctx; |
||||
auto* listener = new grpc_core::chaotic_good::ChaoticGoodServerListener( |
||||
grpc_core::Server::FromC(server), channel_args, |
||||
[next = uint64_t(0)]() mutable { |
||||
return absl::StrCat(absl::Hex(next++)); |
||||
}); |
||||
auto port = |
||||
listener->Bind(absl::StrCat("ipv4:0.0.0.0:", port_num).c_str()); |
||||
GPR_ASSERT(port.ok()); |
||||
GPR_ASSERT(port.value() == port_num); |
||||
}); |
||||
} |
@ -0,0 +1,5 @@ |
||||
network_input { |
||||
} |
||||
config_vars { |
||||
trace: "" |
||||
} |
@ -0,0 +1,4 @@ |
||||
config_vars { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,6 @@ |
||||
channel_args { |
||||
args { |
||||
resource_quota { |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,6 @@ |
||||
api_actions { |
||||
} |
||||
event_engine_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,2 @@ |
||||
api_actions { |
||||
} |
@ -0,0 +1,4 @@ |
||||
api_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,2 @@ |
||||
config_vars { |
||||
} |
@ -0,0 +1,2 @@ |
||||
channel_args { |
||||
} |
@ -0,0 +1,6 @@ |
||||
api_actions { |
||||
} |
||||
api_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,6 @@ |
||||
api_actions { |
||||
create_server { |
||||
} |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,5 @@ |
||||
config_vars { |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,3 @@ |
||||
config_vars { |
||||
trace: "" |
||||
} |
@ -0,0 +1,25 @@ |
||||
network_input { |
||||
single_read_bytes: "\005\000" |
||||
connect_delay_ms: 4352 |
||||
endpoint_config { |
||||
args { |
||||
resource_quota { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
resize_resource_quota: 0 |
||||
} |
||||
api_actions { |
||||
create_call { |
||||
host { |
||||
value: "\001\000\000\024" |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
verbosity: "" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,5 @@ |
||||
network_input { |
||||
single_read_bytes: "\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357\357" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,20 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
header { |
||||
simple_header { |
||||
chaotic_good_alignment: "X" |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
stacktrace_minloglevel: "" |
||||
trace: "" |
||||
} |
||||
channel_args { |
||||
args { |
||||
i: 8 |
||||
} |
||||
} |
@ -0,0 +1,17 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
header { |
||||
raw_bytes: "\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305\305" |
||||
} |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
close_channel { |
||||
} |
||||
} |
||||
api_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,12 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
client_prefix { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,10 @@ |
||||
network_input { |
||||
single_read_bytes: "\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213\213" |
||||
connect_delay_ms: 96 |
||||
} |
||||
api_actions { |
||||
} |
||||
event_engine_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,14 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
goaway { |
||||
debug_data: "\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377" |
||||
} |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
verbosity: "" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,22 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
chaotic_good { |
||||
headers_simple_header { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
endpoint_config { |
||||
args { |
||||
i: 64 |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
} |
||||
config_vars { |
||||
dns_resolver: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,40 @@ |
||||
network_input { |
||||
single_read_bytes: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" |
||||
endpoint_config { |
||||
args { |
||||
key: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" |
||||
} |
||||
} |
||||
} |
||||
network_input { |
||||
connect_timeout_ms: 11776 |
||||
} |
||||
network_input { |
||||
connect_timeout_ms: -6 |
||||
} |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
chaotic_good { |
||||
headers_simple_header { |
||||
x_envoy_peer_metadata: "+" |
||||
path: "\000\000" |
||||
grpc_retry_pushback_ms: "7" |
||||
chaotic_good_connection_id: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" |
||||
chaotic_good_alignment: "7" |
||||
} |
||||
} |
||||
} |
||||
} |
||||
connect_timeout_ms: 768 |
||||
endpoint_config { |
||||
args { |
||||
i: 64 |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,17 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
chaotic_good { |
||||
headers_raw_bytes: "" |
||||
trailers_none { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
stacktrace_minloglevel: "" |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,13 @@ |
||||
network_input { |
||||
single_read_bytes: ";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;" |
||||
} |
||||
api_actions { |
||||
request_call { |
||||
} |
||||
} |
||||
api_actions { |
||||
cancel_all_calls_if_shutdown { |
||||
} |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,8 @@ |
||||
network_input { |
||||
single_read_bytes: "\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177\177" |
||||
} |
||||
config_vars { |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,5 @@ |
||||
network_input { |
||||
single_read_bytes: "\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,13 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
client_prefix { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
config_vars { |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,7 @@ |
||||
network_input { |
||||
single_read_bytes: "GGGGGGGGGGGGGGGGGGGGGGGGGGG" |
||||
} |
||||
api_actions { |
||||
} |
||||
config_vars { |
||||
} |
@ -0,0 +1,16 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
rst_stream { |
||||
error_code: 1694498816 |
||||
} |
||||
} |
||||
segments { |
||||
rst_stream { |
||||
error_code: 1694498816 |
||||
} |
||||
} |
||||
} |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,34 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
chaotic_good { |
||||
headers_simple_header { |
||||
headers { |
||||
} |
||||
user_agent: "" |
||||
chaotic_good_connection_id: "\001\000\000\001" |
||||
chaotic_good_alignment: "0" |
||||
} |
||||
} |
||||
} |
||||
} |
||||
endpoint_config { |
||||
args { |
||||
key: "\'" |
||||
i: 64 |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
} |
||||
api_actions { |
||||
sleep_ms: 4096 |
||||
} |
||||
config_vars { |
||||
dns_resolver: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" |
||||
} |
||||
channel_args { |
||||
args { |
||||
key: "\000\000\000\000\000\017B@" |
||||
} |
||||
} |
@ -0,0 +1,5 @@ |
||||
network_input { |
||||
single_read_bytes: "\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207\207" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,10 @@ |
||||
network_input { |
||||
single_read_bytes: "\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377\377" |
||||
} |
||||
network_input { |
||||
} |
||||
config_vars { |
||||
experiments: 0 |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,8 @@ |
||||
network_input { |
||||
} |
||||
api_actions { |
||||
} |
||||
event_engine_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,14 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
client_prefix { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
event_engine_actions { |
||||
} |
||||
config_vars { |
||||
stacktrace_minloglevel: "SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS" |
||||
trace: "" |
||||
} |
@ -0,0 +1,7 @@ |
||||
network_input { |
||||
} |
||||
network_input { |
||||
single_read_bytes: "\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035\035" |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,11 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
client_prefix { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
network_input { |
||||
connect_delay_ms: 32000 |
||||
} |
@ -0,0 +1,14 @@ |
||||
network_input { |
||||
input_segments { |
||||
segments { |
||||
client_prefix { |
||||
} |
||||
} |
||||
} |
||||
} |
||||
api_actions { |
||||
} |
||||
api_actions { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,11 @@ |
||||
network_input { |
||||
single_read_bytes: "222222222222222222222222222222222222222222222222222222222222222222222" |
||||
} |
||||
api_actions { |
||||
create_server { |
||||
channel_args { |
||||
} |
||||
} |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,2 @@ |
||||
event_engine_actions { |
||||
} |
@ -0,0 +1,4 @@ |
||||
network_input { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,6 @@ |
||||
network_input { |
||||
} |
||||
network_input { |
||||
} |
||||
channel_args { |
||||
} |
@ -0,0 +1,28 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/grpc_security.h> |
||||
|
||||
#include "src/libfuzzer/libfuzzer_macro.h" |
||||
#include "test/core/end2end/fuzzers/server_fuzzer.h" |
||||
|
||||
DEFINE_PROTO_FUZZER(const fuzzer_input::Msg& msg) { |
||||
grpc_core::RunServerFuzzer(msg, [](grpc_server* server, int port_num, |
||||
const grpc_core::ChannelArgs&) { |
||||
auto* creds = grpc_insecure_server_credentials_create(); |
||||
grpc_server_add_http2_port( |
||||
server, absl::StrCat("0.0.0.0:", port_num).c_str(), creds); |
||||
grpc_server_credentials_release(creds); |
||||
}); |
||||
} |
@ -0,0 +1,149 @@ |
||||
// Copyright 2024 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/server/chaotic_good_server.h" |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/time/time.h" |
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/status.h> |
||||
#include <grpcpp/server.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/client/chaotic_good_connector.h" |
||||
#include "src/core/lib/address_utils/parse_address.h" |
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/gprpp/notification.h" |
||||
#include "src/core/lib/gprpp/time.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/surface/server.h" |
||||
#include "src/core/lib/uri/uri_parser.h" |
||||
#include "test/core/event_engine/event_engine_test_utils.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
namespace testing { |
||||
using grpc_event_engine::experimental::EventEngine; |
||||
class ChaoticGoodServerTest : public ::testing::Test { |
||||
public: |
||||
ChaoticGoodServerTest() { |
||||
event_engine_ = std::shared_ptr<EventEngine>( |
||||
grpc_event_engine::experimental::CreateEventEngine()); |
||||
StartServer(); |
||||
ConstructConnector(); |
||||
} |
||||
~ChaoticGoodServerTest() override { |
||||
args_.channel_args = ChannelArgs(); |
||||
if (connector_ != nullptr) connector_->Shutdown(absl::CancelledError()); |
||||
connector_.reset(); |
||||
auto* shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr); |
||||
grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr); |
||||
auto ev = grpc_completion_queue_pluck( |
||||
shutdown_cq, nullptr, grpc_timeout_milliseconds_to_deadline(15000), |
||||
nullptr); |
||||
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); |
||||
GPR_ASSERT(ev.tag == nullptr); |
||||
grpc_completion_queue_destroy(shutdown_cq); |
||||
grpc_server_destroy(server_); |
||||
grpc_event_engine::experimental::WaitForSingleOwner( |
||||
std::move(event_engine_)); |
||||
} |
||||
|
||||
void StartServer() { |
||||
port_ = grpc_pick_unused_port_or_die(); |
||||
addr_ = absl::StrCat("ipv6:[::1]:", port_); |
||||
server_ = grpc_server_create(nullptr, nullptr); |
||||
core_server_ = Server::FromC(server_); |
||||
auto* listener = |
||||
new ChaoticGoodServerListener(core_server_, channel_args()); |
||||
auto port = listener->Bind(addr_.c_str()); |
||||
EXPECT_TRUE(port.ok()); |
||||
EXPECT_EQ(port.value(), port_); |
||||
grpc_server_start(server_); |
||||
} |
||||
|
||||
void ConstructConnector() { |
||||
auto uri = URI::Parse(addr_); |
||||
GPR_ASSERT(uri.ok()); |
||||
GPR_ASSERT(grpc_parse_uri(*uri, &resolved_addr_)); |
||||
args_.address = &resolved_addr_; |
||||
args_.deadline = Timestamp::Now() + Duration::Seconds(5); |
||||
args_.channel_args = channel_args(); |
||||
connector_ = MakeRefCounted<ChaoticGoodConnector>(event_engine_); |
||||
} |
||||
|
||||
protected: |
||||
static void OnConnectingFinished(void* arg, grpc_error_handle error) { |
||||
gpr_log(GPR_ERROR, "OnConnectingFinished: %p %s", arg, |
||||
error.ToString().c_str()); |
||||
Notification* connect_finished_ = static_cast<Notification*>(arg); |
||||
connect_finished_->Notify(); |
||||
} |
||||
|
||||
ChannelArgs channel_args() { |
||||
return ChannelArgs() |
||||
.SetObject(event_engine_) |
||||
.Set(GRPC_ARG_RESOURCE_QUOTA, ResourceQuota::Default()); |
||||
} |
||||
|
||||
grpc_server* server_; |
||||
Server* core_server_; |
||||
ChaoticGoodConnector::Args args_; |
||||
ChaoticGoodConnector::Result connecting_result_; |
||||
grpc_closure on_connecting_finished_; |
||||
int port_; |
||||
std::string addr_; |
||||
grpc_resolved_address resolved_addr_; |
||||
RefCountedPtr<ChaoticGoodConnector> connector_; |
||||
std::shared_ptr<EventEngine> event_engine_; |
||||
}; |
||||
|
||||
TEST_F(ChaoticGoodServerTest, Connect) { |
||||
Notification connect_finished; |
||||
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, |
||||
&connect_finished, grpc_schedule_on_exec_ctx); |
||||
connector_->Connect(args_, &connecting_result_, &on_connecting_finished_); |
||||
connect_finished.WaitForNotification(); |
||||
} |
||||
|
||||
TEST_F(ChaoticGoodServerTest, ConnectAndShutdown) { |
||||
Notification connect_finished; |
||||
GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, |
||||
&connect_finished, grpc_schedule_on_exec_ctx); |
||||
connector_->Connect(args_, &connecting_result_, &on_connecting_finished_); |
||||
connector_->Shutdown(absl::InternalError("shutdown")); |
||||
connect_finished.WaitForNotification(); |
||||
} |
||||
|
||||
} // namespace testing
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
// Must call to create default EventEngine.
|
||||
grpc_init(); |
||||
int ret = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return ret; |
||||
} |
Loading…
Reference in new issue