mirror of https://github.com/grpc/grpc.git
Merge https://github.com/grpc/grpc into scsf
commit
789b365616
104 changed files with 3567 additions and 1452 deletions
@ -0,0 +1,19 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" |
||||
|
||||
namespace grpc_core {} // namespace grpc_core
|
@ -0,0 +1,111 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "absl/random/random.h" |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/frame.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
||||
#include "src/core/lib/promise/if.h" |
||||
#include "src/core/lib/promise/promise.h" |
||||
#include "src/core/lib/promise/try_join.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
class ChaoticGoodTransport { |
||||
public: |
||||
ChaoticGoodTransport(std::unique_ptr<PromiseEndpoint> control_endpoint, |
||||
std::unique_ptr<PromiseEndpoint> data_endpoint) |
||||
: control_endpoint_(std::move(control_endpoint)), |
||||
data_endpoint_(std::move(data_endpoint)) {} |
||||
|
||||
auto WriteFrame(const FrameInterface& frame) { |
||||
auto buffers = frame.Serialize(&encoder_); |
||||
return TryJoin<absl::StatusOr>( |
||||
control_endpoint_->Write(std::move(buffers.control)), |
||||
data_endpoint_->Write(std::move(buffers.data))); |
||||
} |
||||
|
||||
// Read frame header and payloads for control and data portions of one frame.
|
||||
// Resolves to StatusOr<tuple<FrameHeader, BufferPair>>.
|
||||
auto ReadFrameBytes() { |
||||
return TrySeq( |
||||
control_endpoint_->ReadSlice(FrameHeader::frame_header_size_), |
||||
[this](Slice read_buffer) { |
||||
auto frame_header = |
||||
FrameHeader::Parse(reinterpret_cast<const uint8_t*>( |
||||
GRPC_SLICE_START_PTR(read_buffer.c_slice()))); |
||||
// Read header and trailers from control endpoint.
|
||||
// Read message padding and message from data endpoint.
|
||||
return If( |
||||
frame_header.ok(), |
||||
[this, &frame_header] { |
||||
const uint32_t message_padding = std::exchange( |
||||
last_message_padding_, frame_header->message_padding); |
||||
const uint32_t message_length = frame_header->message_length; |
||||
return Map( |
||||
TryJoin<absl::StatusOr>( |
||||
control_endpoint_->Read(frame_header->GetFrameLength()), |
||||
TrySeq(data_endpoint_->Read(message_padding), |
||||
[this, message_length]() { |
||||
return data_endpoint_->Read(message_length); |
||||
})), |
||||
[frame_header = *frame_header]( |
||||
absl::StatusOr<std::tuple<SliceBuffer, SliceBuffer>> |
||||
buffers) |
||||
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> { |
||||
if (!buffers.ok()) return buffers.status(); |
||||
return std::tuple<FrameHeader, BufferPair>( |
||||
frame_header, |
||||
BufferPair{std::move(std::get<0>(*buffers)), |
||||
std::move(std::get<1>(*buffers))}); |
||||
}); |
||||
}, |
||||
[&frame_header]() |
||||
-> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> { |
||||
return frame_header.status(); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
absl::Status DeserializeFrame(FrameHeader header, BufferPair buffers, |
||||
Arena* arena, FrameInterface& frame) { |
||||
return frame.Deserialize(&parser_, header, bitgen_, arena, |
||||
std::move(buffers)); |
||||
} |
||||
|
||||
// Skip a frame, but correctly handle any hpack state updates.
|
||||
void SkipFrame(FrameHeader, BufferPair) { Crash("not implemented"); } |
||||
|
||||
private: |
||||
const std::unique_ptr<PromiseEndpoint> control_endpoint_; |
||||
const std::unique_ptr<PromiseEndpoint> data_endpoint_; |
||||
uint32_t last_message_padding_ = 0; |
||||
HPackCompressor encoder_; |
||||
HPackParser parser_; |
||||
absl::BitGen bitgen_; |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H
|
@ -0,0 +1,332 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/server_transport.h" |
||||
|
||||
#include <memory> |
||||
#include <string> |
||||
#include <tuple> |
||||
|
||||
#include "absl/random/bit_gen_ref.h" |
||||
#include "absl/random/random.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/frame.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" |
||||
#include "src/core/lib/promise/for_each.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/switch.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
auto ChaoticGoodServerTransport::TransportWriteLoop() { |
||||
return Loop([this] { |
||||
return TrySeq( |
||||
// Get next outgoing frame.
|
||||
outgoing_frames_.Next(), |
||||
// Serialize and write it out.
|
||||
[this](ServerFrame client_frame) { |
||||
return transport_.WriteFrame(GetFrameInterface(client_frame)); |
||||
}, |
||||
[]() -> LoopCtl<absl::Status> { |
||||
// The write failures will be caught in TrySeq and exit loop.
|
||||
// Therefore, only need to return Continue() in the last lambda
|
||||
// function.
|
||||
return Continue(); |
||||
}); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::PushFragmentIntoCall( |
||||
CallInitiator call_initiator, ClientFragmentFrame frame) { |
||||
auto& headers = frame.headers; |
||||
return TrySeq( |
||||
If( |
||||
headers != nullptr, |
||||
[call_initiator, &headers]() mutable { |
||||
return call_initiator.PushClientInitialMetadata(std::move(headers)); |
||||
}, |
||||
[]() -> StatusFlag { return Success{}; }), |
||||
[call_initiator, message = std::move(frame.message)]() mutable { |
||||
return If( |
||||
message.has_value(), |
||||
[&call_initiator, &message]() mutable { |
||||
return call_initiator.PushMessage(std::move(message->message)); |
||||
}, |
||||
[]() -> StatusFlag { return Success{}; }); |
||||
}, |
||||
[call_initiator, |
||||
end_of_stream = frame.end_of_stream]() mutable -> StatusFlag { |
||||
if (end_of_stream) call_initiator.FinishSends(); |
||||
return Success{}; |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall( |
||||
absl::optional<CallInitiator> call_initiator, absl::Status error, |
||||
ClientFragmentFrame frame) { |
||||
return If( |
||||
call_initiator.has_value() && error.ok(), |
||||
[this, &call_initiator, &frame]() { |
||||
return Map( |
||||
call_initiator->SpawnWaitable( |
||||
"push-fragment", |
||||
[call_initiator, frame = std::move(frame), this]() mutable { |
||||
return call_initiator->CancelIfFails( |
||||
PushFragmentIntoCall(*call_initiator, std::move(frame))); |
||||
}), |
||||
[](StatusFlag status) { return StatusCast<absl::Status>(status); }); |
||||
}, |
||||
[error = std::move(error)]() { return error; }); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::CallOutboundLoop( |
||||
uint32_t stream_id, CallInitiator call_initiator) { |
||||
auto send_fragment = [stream_id, |
||||
outgoing_frames = outgoing_frames_.MakeSender()]( |
||||
ServerFragmentFrame frame) mutable { |
||||
frame.stream_id = stream_id; |
||||
return Map(outgoing_frames.Send(std::move(frame)), |
||||
[](bool success) -> absl::Status { |
||||
if (!success) { |
||||
// Failed to send outgoing frame.
|
||||
return absl::UnavailableError("Transport closed."); |
||||
} |
||||
return absl::OkStatus(); |
||||
}); |
||||
}; |
||||
return Seq( |
||||
TrySeq( |
||||
// Wait for initial metadata then send it out.
|
||||
call_initiator.PullServerInitialMetadata(), |
||||
[send_fragment](ServerMetadataHandle md) mutable { |
||||
ServerFragmentFrame frame; |
||||
frame.headers = std::move(md); |
||||
return send_fragment(std::move(frame)); |
||||
}, |
||||
// Continuously send client frame with client to server messages.
|
||||
ForEach(OutgoingMessages(call_initiator), |
||||
[send_fragment, aligned_bytes = aligned_bytes_]( |
||||
MessageHandle message) mutable { |
||||
ServerFragmentFrame frame; |
||||
// Construct frame header (flags, header_length and
|
||||
// trailer_length will be added in serialization).
|
||||
const uint32_t message_length = |
||||
message->payload()->Length(); |
||||
const uint32_t padding = |
||||
message_length % aligned_bytes == 0 |
||||
? 0 |
||||
: aligned_bytes - message_length % aligned_bytes; |
||||
GPR_ASSERT((message_length + padding) % aligned_bytes == 0); |
||||
frame.message = FragmentMessage(std::move(message), padding, |
||||
message_length); |
||||
return send_fragment(std::move(frame)); |
||||
})), |
||||
call_initiator.PullServerTrailingMetadata(), |
||||
[send_fragment](ServerMetadataHandle md) mutable { |
||||
ServerFragmentFrame frame; |
||||
frame.trailers = std::move(md); |
||||
return send_fragment(std::move(frame)); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall( |
||||
FrameHeader frame_header, BufferPair buffers) { |
||||
ClientFragmentFrame fragment_frame; |
||||
ScopedArenaPtr arena(acceptor_->CreateArena()); |
||||
absl::Status status = transport_.DeserializeFrame( |
||||
frame_header, std::move(buffers), arena.get(), fragment_frame); |
||||
absl::optional<CallInitiator> call_initiator; |
||||
if (status.ok()) { |
||||
auto create_call_result = |
||||
acceptor_->CreateCall(*fragment_frame.headers, arena.release()); |
||||
if (create_call_result.ok()) { |
||||
call_initiator.emplace(std::move(*create_call_result)); |
||||
call_initiator->SpawnGuarded( |
||||
"server-write", [this, stream_id = frame_header.stream_id, |
||||
call_initiator = *call_initiator]() { |
||||
return CallOutboundLoop(stream_id, call_initiator); |
||||
}); |
||||
} else { |
||||
status = create_call_result.status(); |
||||
} |
||||
} |
||||
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), |
||||
std::move(fragment_frame)); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall( |
||||
FrameHeader frame_header, BufferPair buffers) { |
||||
absl::optional<CallInitiator> call_initiator = |
||||
LookupStream(frame_header.stream_id); |
||||
Arena* arena = nullptr; |
||||
if (call_initiator.has_value()) arena = call_initiator->arena(); |
||||
ClientFragmentFrame fragment_frame; |
||||
absl::Status status = transport_.DeserializeFrame( |
||||
frame_header, std::move(buffers), arena, fragment_frame); |
||||
return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status), |
||||
std::move(fragment_frame)); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::TransportReadLoop() { |
||||
return Loop([this] { |
||||
return TrySeq( |
||||
transport_.ReadFrameBytes(), |
||||
[this](std::tuple<FrameHeader, BufferPair> frame_bytes) { |
||||
const auto& frame_header = std::get<0>(frame_bytes); |
||||
auto& buffers = std::get<1>(frame_bytes); |
||||
return Switch( |
||||
frame_header.type, |
||||
Case(FrameType::kSettings, |
||||
[]() -> absl::Status { |
||||
return absl::InternalError("Unexpected settings frame"); |
||||
}), |
||||
Case(FrameType::kFragment, |
||||
[this, &frame_header, &buffers]() { |
||||
return If( |
||||
frame_header.flags.is_set(0), |
||||
[this, &frame_header, &buffers]() { |
||||
return DeserializeAndPushFragmentToNewCall( |
||||
frame_header, std::move(buffers)); |
||||
}, |
||||
[this, &frame_header, &buffers]() { |
||||
return DeserializeAndPushFragmentToExistingCall( |
||||
frame_header, std::move(buffers)); |
||||
}); |
||||
}), |
||||
Case(FrameType::kCancel, |
||||
[this, &frame_header]() { |
||||
absl::optional<CallInitiator> call_initiator = |
||||
ExtractStream(frame_header.stream_id); |
||||
return If( |
||||
call_initiator.has_value(), |
||||
[&call_initiator]() { |
||||
auto c = std::move(*call_initiator); |
||||
return c.SpawnWaitable("cancel", [c]() mutable { |
||||
c.Cancel(); |
||||
return absl::OkStatus(); |
||||
}); |
||||
}, |
||||
[]() -> absl::Status { |
||||
return absl::InternalError( |
||||
"Unexpected cancel frame"); |
||||
}); |
||||
}), |
||||
Default([frame_header]() { |
||||
return absl::InternalError( |
||||
absl::StrCat("Unexpected frame type: ", |
||||
static_cast<uint8_t>(frame_header.type))); |
||||
})); |
||||
}, |
||||
[]() -> LoopCtl<absl::Status> { return Continue{}; }); |
||||
}); |
||||
} |
||||
|
||||
auto ChaoticGoodServerTransport::OnTransportActivityDone() { |
||||
return [this](absl::Status status) { |
||||
if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { |
||||
this->AbortWithError(); |
||||
} |
||||
}; |
||||
} |
||||
|
||||
ChaoticGoodServerTransport::ChaoticGoodServerTransport( |
||||
const ChannelArgs& args, std::unique_ptr<PromiseEndpoint> control_endpoint, |
||||
std::unique_ptr<PromiseEndpoint> data_endpoint, |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine) |
||||
: outgoing_frames_(4), |
||||
transport_(std::move(control_endpoint), std::move(data_endpoint)), |
||||
allocator_(args.GetObject<ResourceQuota>() |
||||
->memory_quota() |
||||
->CreateMemoryAllocator("chaotic-good")), |
||||
event_engine_(event_engine), |
||||
writer_{MakeActivity(TransportWriteLoop(), |
||||
EventEngineWakeupScheduler(event_engine), |
||||
OnTransportActivityDone())}, |
||||
reader_{nullptr} {} |
||||
|
||||
void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) { |
||||
GPR_ASSERT(acceptor_ == nullptr); |
||||
GPR_ASSERT(acceptor != nullptr); |
||||
acceptor_ = acceptor; |
||||
reader_ = MakeActivity(TransportReadLoop(), |
||||
EventEngineWakeupScheduler(event_engine_), |
||||
OnTransportActivityDone()); |
||||
} |
||||
|
||||
ChaoticGoodServerTransport::~ChaoticGoodServerTransport() { |
||||
if (writer_ != nullptr) { |
||||
writer_.reset(); |
||||
} |
||||
if (reader_ != nullptr) { |
||||
reader_.reset(); |
||||
} |
||||
} |
||||
|
||||
void ChaoticGoodServerTransport::AbortWithError() { |
||||
// Mark transport as unavailable when the endpoint write/read failed.
|
||||
// Close all the available pipes.
|
||||
outgoing_frames_.MarkClosed(); |
||||
ReleasableMutexLock lock(&mu_); |
||||
StreamMap stream_map = std::move(stream_map_); |
||||
stream_map_.clear(); |
||||
lock.Release(); |
||||
for (const auto& pair : stream_map) { |
||||
auto call_initiator = pair.second; |
||||
call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable { |
||||
call_initiator.Cancel(); |
||||
return Empty{}; |
||||
}); |
||||
} |
||||
} |
||||
|
||||
absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream( |
||||
uint32_t stream_id) { |
||||
MutexLock lock(&mu_); |
||||
auto it = stream_map_.find(stream_id); |
||||
if (it == stream_map_.end()) return absl::nullopt; |
||||
return it->second; |
||||
} |
||||
|
||||
absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream( |
||||
uint32_t stream_id) { |
||||
MutexLock lock(&mu_); |
||||
auto it = stream_map_.find(stream_id); |
||||
if (it == stream_map_.end()) return absl::nullopt; |
||||
auto r = std::move(it->second); |
||||
stream_map_.erase(it); |
||||
return std::move(r); |
||||
} |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,145 @@ |
||||
// Copyright 2022 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stdint.h> |
||||
#include <stdio.h> |
||||
|
||||
#include <cstdint> |
||||
#include <initializer_list> // IWYU pragma: keep |
||||
#include <iostream> |
||||
#include <map> |
||||
#include <memory> |
||||
#include <optional> |
||||
#include <string> |
||||
#include <tuple> |
||||
#include <type_traits> |
||||
#include <utility> |
||||
|
||||
#include "absl/base/thread_annotations.h" |
||||
#include "absl/container/flat_hash_map.h" |
||||
#include "absl/functional/any_invocable.h" |
||||
#include "absl/random/random.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/types/optional.h" |
||||
#include "absl/types/variant.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame.h" |
||||
#include "src/core/ext/transport/chaotic_good/frame_header.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" |
||||
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h" |
||||
#include "src/core/lib/event_engine/default_event_engine.h" // IWYU pragma: keep |
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/gprpp/sync.h" |
||||
#include "src/core/lib/promise/activity.h" |
||||
#include "src/core/lib/promise/context.h" |
||||
#include "src/core/lib/promise/if.h" |
||||
#include "src/core/lib/promise/inter_activity_pipe.h" |
||||
#include "src/core/lib/promise/loop.h" |
||||
#include "src/core/lib/promise/mpsc.h" |
||||
#include "src/core/lib/promise/party.h" |
||||
#include "src/core/lib/promise/pipe.h" |
||||
#include "src/core/lib/promise/poll.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "src/core/lib/promise/try_join.h" |
||||
#include "src/core/lib/promise/try_seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
|
||||
class ChaoticGoodServerTransport final : public Transport, |
||||
public ServerTransport { |
||||
public: |
||||
ChaoticGoodServerTransport( |
||||
const ChannelArgs& args, |
||||
std::unique_ptr<PromiseEndpoint> control_endpoint, |
||||
std::unique_ptr<PromiseEndpoint> data_endpoint, |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> |
||||
event_engine); |
||||
~ChaoticGoodServerTransport() override; |
||||
|
||||
FilterStackTransport* filter_stack_transport() override { return nullptr; } |
||||
ClientTransport* client_transport() override { return nullptr; } |
||||
ServerTransport* server_transport() override { return this; } |
||||
absl::string_view GetTransportName() const override { return "chaotic_good"; } |
||||
void SetPollset(grpc_stream*, grpc_pollset*) override {} |
||||
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {} |
||||
void PerformOp(grpc_transport_op*) override { Crash("unimplemented"); } |
||||
grpc_endpoint* GetEndpoint() override { return nullptr; } |
||||
void Orphan() override { delete this; } |
||||
|
||||
void SetAcceptor(Acceptor* acceptor) override; |
||||
void AbortWithError(); |
||||
|
||||
private: |
||||
using StreamMap = absl::flat_hash_map<uint32_t, CallInitiator>; |
||||
|
||||
absl::Status NewStream(uint32_t stream_id, CallInitiator call_initiator); |
||||
absl::optional<CallInitiator> LookupStream(uint32_t stream_id); |
||||
absl::optional<CallInitiator> ExtractStream(uint32_t stream_id); |
||||
auto CallOutboundLoop(uint32_t stream_id, CallInitiator call_initiator); |
||||
auto OnTransportActivityDone(); |
||||
auto TransportReadLoop(); |
||||
auto TransportWriteLoop(); |
||||
// Read different parts of the server frame from control/data endpoints
|
||||
// based on frame header.
|
||||
// Resolves to a StatusOr<tuple<SliceBuffer, SliceBuffer>>
|
||||
auto ReadFrameBody(Slice read_buffer); |
||||
void SendCancel(uint32_t stream_id, absl::Status why); |
||||
auto DeserializeAndPushFragmentToNewCall(FrameHeader frame_header, |
||||
BufferPair buffers); |
||||
auto DeserializeAndPushFragmentToExistingCall(FrameHeader frame_header, |
||||
BufferPair buffers); |
||||
auto MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator, |
||||
absl::Status error, ClientFragmentFrame frame); |
||||
auto PushFragmentIntoCall(CallInitiator call_initiator, |
||||
ClientFragmentFrame frame); |
||||
|
||||
Acceptor* acceptor_ = nullptr; |
||||
MpscReceiver<ServerFrame> outgoing_frames_; |
||||
ChaoticGoodTransport transport_; |
||||
// Assigned aligned bytes from setting frame.
|
||||
size_t aligned_bytes_ = 64; |
||||
Mutex mu_; |
||||
// Map of stream incoming server frames, key is stream_id.
|
||||
StreamMap stream_map_ ABSL_GUARDED_BY(mu_); |
||||
grpc_event_engine::experimental::MemoryAllocator allocator_; |
||||
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; |
||||
ActivityPtr writer_; |
||||
ActivityPtr reader_; |
||||
}; |
||||
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_SERVER_TRANSPORT_H
|
@ -0,0 +1,23 @@ |
||||
// Copyright 2021 gRPC authors. |
||||
// |
||||
// Licensed under the Apache License, Version 2.0 (the "License"); |
||||
// you may not use this file except in compliance with the License. |
||||
// You may obtain a copy of the License at |
||||
// |
||||
// http://www.apache.org/licenses/LICENSE-2.0 |
||||
// |
||||
// Unless required by applicable law or agreed to in writing, software |
||||
// distributed under the License is distributed on an "AS IS" BASIS, |
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
// See the License for the specific language governing permissions and |
||||
// limitations under the License. |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package frame_fuzzer; |
||||
|
||||
message Test { |
||||
bool is_server = 1; |
||||
bytes control = 2; |
||||
bytes data = 3; |
||||
} |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,89 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "test/core/transport/chaotic_good/mock_promise_endpoint.h" |
||||
|
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
using EventEngineSlice = grpc_event_engine::experimental::Slice; |
||||
using grpc_event_engine::experimental::EventEngine; |
||||
|
||||
using testing::WithArgs; |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
namespace testing { |
||||
|
||||
void MockPromiseEndpoint::ExpectRead( |
||||
std::initializer_list<EventEngineSlice> slices_init, |
||||
EventEngine* schedule_on_event_engine) { |
||||
std::vector<EventEngineSlice> slices; |
||||
for (auto&& slice : slices_init) slices.emplace_back(slice.Copy()); |
||||
EXPECT_CALL(*endpoint, Read) |
||||
.InSequence(read_sequence) |
||||
.WillOnce(WithArgs<0, 1>( |
||||
[slices = std::move(slices), schedule_on_event_engine]( |
||||
absl::AnyInvocable<void(absl::Status)> on_read, |
||||
grpc_event_engine::experimental::SliceBuffer* buffer) mutable { |
||||
for (auto& slice : slices) { |
||||
buffer->Append(std::move(slice)); |
||||
} |
||||
if (schedule_on_event_engine != nullptr) { |
||||
schedule_on_event_engine->Run( |
||||
[on_read = std::move(on_read)]() mutable { |
||||
on_read(absl::OkStatus()); |
||||
}); |
||||
return false; |
||||
} else { |
||||
return true; |
||||
} |
||||
})); |
||||
} |
||||
|
||||
void MockPromiseEndpoint::ExpectWrite( |
||||
std::initializer_list<EventEngineSlice> slices, |
||||
EventEngine* schedule_on_event_engine) { |
||||
SliceBuffer expect; |
||||
for (auto&& slice : slices) { |
||||
expect.Append(grpc_event_engine::experimental::internal::SliceCast<Slice>( |
||||
slice.Copy())); |
||||
} |
||||
EXPECT_CALL(*endpoint, Write) |
||||
.InSequence(write_sequence) |
||||
.WillOnce(WithArgs<0, 1>( |
||||
[expect = expect.JoinIntoString(), schedule_on_event_engine]( |
||||
absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
grpc_event_engine::experimental::SliceBuffer* buffer) mutable { |
||||
SliceBuffer tmp; |
||||
grpc_slice_buffer_swap(buffer->c_slice_buffer(), |
||||
tmp.c_slice_buffer()); |
||||
EXPECT_EQ(tmp.JoinIntoString(), expect); |
||||
if (schedule_on_event_engine != nullptr) { |
||||
schedule_on_event_engine->Run( |
||||
[on_writable = std::move(on_writable)]() mutable { |
||||
on_writable(absl::OkStatus()); |
||||
}); |
||||
return false; |
||||
} else { |
||||
return true; |
||||
} |
||||
})); |
||||
} |
||||
|
||||
} // namespace testing
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
@ -0,0 +1,77 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#ifndef GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_PROMISE_ENDPOINT_H |
||||
#define GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_PROMISE_ENDPOINT_H |
||||
|
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
|
||||
#include "src/core/lib/transport/promise_endpoint.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
namespace testing { |
||||
|
||||
class MockEndpoint |
||||
: public grpc_event_engine::experimental::EventEngine::Endpoint { |
||||
public: |
||||
MOCK_METHOD( |
||||
bool, Read, |
||||
(absl::AnyInvocable<void(absl::Status)> on_read, |
||||
grpc_event_engine::experimental::SliceBuffer* buffer, |
||||
const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs* |
||||
args), |
||||
(override)); |
||||
|
||||
MOCK_METHOD( |
||||
bool, Write, |
||||
(absl::AnyInvocable<void(absl::Status)> on_writable, |
||||
grpc_event_engine::experimental::SliceBuffer* data, |
||||
const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs* |
||||
args), |
||||
(override)); |
||||
|
||||
MOCK_METHOD( |
||||
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, |
||||
GetPeerAddress, (), (const, override)); |
||||
MOCK_METHOD( |
||||
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, |
||||
GetLocalAddress, (), (const, override)); |
||||
}; |
||||
|
||||
struct MockPromiseEndpoint { |
||||
::testing::StrictMock<MockEndpoint>* endpoint = |
||||
new ::testing::StrictMock<MockEndpoint>(); |
||||
std::unique_ptr<PromiseEndpoint> promise_endpoint = |
||||
std::make_unique<PromiseEndpoint>( |
||||
std::unique_ptr<::testing::StrictMock<MockEndpoint>>(endpoint), |
||||
SliceBuffer()); |
||||
::testing::Sequence read_sequence; |
||||
::testing::Sequence write_sequence; |
||||
void ExpectRead( |
||||
std::initializer_list<grpc_event_engine::experimental::Slice> slices_init, |
||||
grpc_event_engine::experimental::EventEngine* schedule_on_event_engine); |
||||
void ExpectWrite( |
||||
std::initializer_list<grpc_event_engine::experimental::Slice> slices, |
||||
grpc_event_engine::experimental::EventEngine* schedule_on_event_engine); |
||||
}; |
||||
|
||||
} // namespace testing
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_TEST_CORE_TRANSPORT_CHAOTIC_GOOD_MOCK_PROMISE_ENDPOINT_H
|
@ -0,0 +1,198 @@ |
||||
// Copyright 2023 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "src/core/ext/transport/chaotic_good/server_transport.h" |
||||
|
||||
#include <algorithm> |
||||
#include <memory> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "absl/functional/any_invocable.h" |
||||
#include "absl/status/status.h" |
||||
#include "absl/status/statusor.h" |
||||
#include "absl/strings/str_format.h" |
||||
#include "absl/types/optional.h" |
||||
#include "gmock/gmock.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/event_engine/event_engine.h> |
||||
#include <grpc/event_engine/memory_allocator.h> |
||||
#include <grpc/event_engine/slice.h> |
||||
#include <grpc/event_engine/slice_buffer.h> |
||||
#include <grpc/grpc.h> |
||||
#include <grpc/status.h> |
||||
|
||||
#include "src/core/lib/gprpp/ref_counted_ptr.h" |
||||
#include "src/core/lib/iomgr/timer_manager.h" |
||||
#include "src/core/lib/promise/seq.h" |
||||
#include "src/core/lib/resource_quota/arena.h" |
||||
#include "src/core/lib/resource_quota/memory_quota.h" |
||||
#include "src/core/lib/resource_quota/resource_quota.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/slice/slice_internal.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" |
||||
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" |
||||
#include "test/core/transport/chaotic_good/mock_promise_endpoint.h" |
||||
#include "test/core/transport/chaotic_good/transport_test.h" |
||||
|
||||
using testing::_; |
||||
using testing::MockFunction; |
||||
using testing::Return; |
||||
using testing::StrictMock; |
||||
using testing::WithArgs; |
||||
|
||||
using EventEngineSlice = grpc_event_engine::experimental::Slice; |
||||
|
||||
namespace grpc_core { |
||||
namespace chaotic_good { |
||||
namespace testing { |
||||
|
||||
// Encoded string of header ":path: /demo.Service/Step".
|
||||
const uint8_t kPathDemoServiceStep[] = { |
||||
0x40, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f, |
||||
0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76, |
||||
0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70}; |
||||
|
||||
// Encoded string of trailer "grpc-status: 0".
|
||||
const uint8_t kGrpcStatus0[] = {0x40, 0x0b, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x73, |
||||
0x74, 0x61, 0x74, 0x75, 0x73, 0x01, 0x30}; |
||||
|
||||
ServerMetadataHandle TestInitialMetadata() { |
||||
auto md = |
||||
GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>()); |
||||
md->Set(HttpPathMetadata(), Slice::FromStaticString("/demo.Service/Step")); |
||||
return md; |
||||
} |
||||
|
||||
ServerMetadataHandle TestTrailingMetadata() { |
||||
auto md = |
||||
GetContext<Arena>()->MakePooled<ServerMetadata>(GetContext<Arena>()); |
||||
md->Set(GrpcStatusMetadata(), GRPC_STATUS_OK); |
||||
return md; |
||||
} |
||||
|
||||
class MockAcceptor : public ServerTransport::Acceptor { |
||||
public: |
||||
virtual ~MockAcceptor() = default; |
||||
MOCK_METHOD(Arena*, CreateArena, (), (override)); |
||||
MOCK_METHOD(absl::StatusOr<CallInitiator>, CreateCall, |
||||
(ClientMetadata & client_initial_metadata, Arena* arena), |
||||
(override)); |
||||
}; |
||||
|
||||
TEST_F(TransportTest, ReadAndWriteOneMessage) { |
||||
MockPromiseEndpoint control_endpoint; |
||||
MockPromiseEndpoint data_endpoint; |
||||
StrictMock<MockAcceptor> acceptor; |
||||
auto transport = MakeOrphanable<ChaoticGoodServerTransport>( |
||||
CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(nullptr), |
||||
std::move(control_endpoint.promise_endpoint), |
||||
std::move(data_endpoint.promise_endpoint), event_engine()); |
||||
// Once we set the acceptor, expect to read some frames.
|
||||
// We'll return a new request with a payload of "12345678".
|
||||
control_endpoint.ExpectRead( |
||||
{SerializedFrameHeader(FrameType::kFragment, 7, 1, 26, 8, 56, 0), |
||||
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, |
||||
sizeof(kPathDemoServiceStep))}, |
||||
event_engine().get()); |
||||
data_endpoint.ExpectRead( |
||||
{EventEngineSlice::FromCopiedString("12345678"), Zeros(56)}, nullptr); |
||||
// Once that's read we'll create a new call
|
||||
auto* call_arena = Arena::Create(1024, memory_allocator()); |
||||
CallInitiatorAndHandler call = MakeCall(event_engine().get(), call_arena); |
||||
EXPECT_CALL(acceptor, CreateArena).WillOnce(Return(call_arena)); |
||||
EXPECT_CALL(acceptor, CreateCall(_, call_arena)) |
||||
.WillOnce(WithArgs<0>([call_initiator = std::move(call.initiator)]( |
||||
ClientMetadata& client_initial_metadata) { |
||||
EXPECT_EQ(client_initial_metadata.get_pointer(HttpPathMetadata()) |
||||
->as_string_view(), |
||||
"/demo.Service/Step"); |
||||
return call_initiator; |
||||
})); |
||||
transport->SetAcceptor(&acceptor); |
||||
StrictMock<MockFunction<void()>> on_done; |
||||
EXPECT_CALL(on_done, Call()); |
||||
EXPECT_CALL(*control_endpoint.endpoint, Read) |
||||
.InSequence(control_endpoint.read_sequence) |
||||
.WillOnce(Return(false)); |
||||
control_endpoint.ExpectWrite( |
||||
{SerializedFrameHeader(FrameType::kFragment, 1, 1, |
||||
sizeof(kPathDemoServiceStep), 0, 0, 0), |
||||
EventEngineSlice::FromCopiedBuffer(kPathDemoServiceStep, |
||||
sizeof(kPathDemoServiceStep))}, |
||||
nullptr); |
||||
control_endpoint.ExpectWrite( |
||||
{SerializedFrameHeader(FrameType::kFragment, 2, 1, 0, 8, 56, 0)}, |
||||
nullptr); |
||||
data_endpoint.ExpectWrite( |
||||
{EventEngineSlice::FromCopiedString("87654321"), Zeros(56)}, nullptr); |
||||
control_endpoint.ExpectWrite( |
||||
{SerializedFrameHeader(FrameType::kFragment, 4, 1, 0, 0, 0, |
||||
sizeof(kGrpcStatus0)), |
||||
EventEngineSlice::FromCopiedBuffer(kGrpcStatus0, sizeof(kGrpcStatus0))}, |
||||
nullptr); |
||||
call.handler.SpawnInfallible( |
||||
"test-io", [&on_done, handler = call.handler]() mutable { |
||||
return Seq( |
||||
handler.PullClientInitialMetadata(), |
||||
[](ValueOrFailure<ServerMetadataHandle> md) { |
||||
EXPECT_TRUE(md.ok()); |
||||
EXPECT_EQ( |
||||
md.value()->get_pointer(HttpPathMetadata())->as_string_view(), |
||||
"/demo.Service/Step"); |
||||
return Empty{}; |
||||
}, |
||||
handler.PullMessage(), |
||||
[](NextResult<MessageHandle> msg) { |
||||
EXPECT_TRUE(msg.has_value()); |
||||
EXPECT_EQ(msg.value()->payload()->JoinIntoString(), "12345678"); |
||||
return Empty{}; |
||||
}, |
||||
handler.PullMessage(), |
||||
[](NextResult<MessageHandle> msg) { |
||||
EXPECT_FALSE(msg.has_value()); |
||||
return Empty{}; |
||||
}, |
||||
handler.PushServerInitialMetadata(TestInitialMetadata()), |
||||
handler.PushMessage(Arena::MakePooled<Message>( |
||||
SliceBuffer(Slice::FromCopiedString("87654321")), 0)), |
||||
[handler]() mutable { |
||||
return handler.PushServerTrailingMetadata(TestTrailingMetadata()); |
||||
}, |
||||
[&on_done]() mutable { |
||||
on_done.Call(); |
||||
return Empty{}; |
||||
}); |
||||
}); |
||||
// Wait until ClientTransport's internal activities to finish.
|
||||
event_engine()->TickUntilIdle(); |
||||
event_engine()->UnsetGlobalHooks(); |
||||
} |
||||
|
||||
} // namespace testing
|
||||
} // namespace chaotic_good
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
// Must call to create default EventEngine.
|
||||
grpc_init(); |
||||
int ret = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return ret; |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue