mirror of https://github.com/grpc/grpc.git
[Transport] Update Chttp2 context list to include relative offset of traced RPCs within outgoing buffer (#32825)
The PR also creates a separate BUILD target for: - chttp2 context list - iomgr buffer_list - iomgr internal errqueue This would allow the context list to be included as standalone dependencies for EventEngine implementations.pull/32907/head
parent
702958b3c2
commit
c515eba30b
26 changed files with 223 additions and 475 deletions
@ -1,71 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/context_list.h" |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/internal.h" |
||||
|
||||
namespace { |
||||
void (*write_timestamps_callback_g)(void*, grpc_core::Timestamps*, |
||||
grpc_error_handle error) = nullptr; |
||||
void* (*get_copied_context_fn_g)(void*) = nullptr; |
||||
} // namespace
|
||||
|
||||
namespace grpc_core { |
||||
void ContextList::Append(ContextList** head, grpc_chttp2_stream* s) { |
||||
if (get_copied_context_fn_g == nullptr || |
||||
write_timestamps_callback_g == nullptr) { |
||||
return; |
||||
} |
||||
// Create a new element in the list and add it at the front
|
||||
ContextList* elem = new ContextList(); |
||||
elem->trace_context_ = get_copied_context_fn_g(s->context); |
||||
elem->byte_offset_ = s->byte_counter; |
||||
elem->next_ = *head; |
||||
*head = elem; |
||||
} |
||||
|
||||
void ContextList::Execute(void* arg, Timestamps* ts, grpc_error_handle error) { |
||||
ContextList* head = static_cast<ContextList*>(arg); |
||||
ContextList* to_be_freed; |
||||
while (head != nullptr) { |
||||
if (write_timestamps_callback_g) { |
||||
if (ts) { |
||||
ts->byte_offset = static_cast<uint32_t>(head->byte_offset_); |
||||
} |
||||
write_timestamps_callback_g(head->trace_context_, ts, error); |
||||
} |
||||
to_be_freed = head; |
||||
head = head->next_; |
||||
delete to_be_freed; |
||||
} |
||||
} |
||||
|
||||
void grpc_http2_set_write_timestamps_callback( |
||||
void (*fn)(void*, Timestamps*, grpc_error_handle error)) { |
||||
write_timestamps_callback_g = fn; |
||||
} |
||||
|
||||
void grpc_http2_set_fn_get_copied_context(void* (*fn)(void*)) { |
||||
get_copied_context_fn_g = fn; |
||||
} |
||||
} // namespace grpc_core
|
@ -1,54 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/frame.h" |
||||
#include "src/core/lib/iomgr/buffer_list.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
|
||||
namespace grpc_core { |
||||
/// A list of RPC Contexts
|
||||
class ContextList { |
||||
public: |
||||
// Creates a new element with \a context as the value and appends it to the
|
||||
// list.
|
||||
static void Append(ContextList** head, grpc_chttp2_stream* s); |
||||
|
||||
// Executes a function \a fn with each context in the list and \a ts. It also
|
||||
// frees up the entire list after this operation. It is intended as a callback
|
||||
// and hence does not take a ref on \a error
|
||||
static void Execute(void* arg, Timestamps* ts, grpc_error_handle error); |
||||
|
||||
private: |
||||
void* trace_context_ = nullptr; |
||||
ContextList* next_ = nullptr; |
||||
size_t byte_offset_ = 0; |
||||
}; |
||||
|
||||
void grpc_http2_set_write_timestamps_callback( |
||||
void (*fn)(void*, Timestamps*, grpc_error_handle error)); |
||||
void grpc_http2_set_fn_get_copied_context(void* (*fn)(void*)); |
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_H
|
@ -0,0 +1,70 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_ENTRY_H |
||||
#define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_ENTRY_H |
||||
|
||||
#include <grpc/support/port_platform.h> |
||||
|
||||
#include <stddef.h> |
||||
#include <stdint.h> |
||||
|
||||
#include <vector> |
||||
|
||||
namespace grpc_core { |
||||
|
||||
/// An RPC trace context and associated information. Each RPC/stream is
|
||||
/// associated with a unique \a context. A new ContextList entry is created when
|
||||
/// a chunk of data stored in an outgoing buffer is going to be
|
||||
// sent over the wire. A data chunk being written over the wire is multiplexed
|
||||
// with bytes from multiple RPCs. If one such RPC is traced, we store the
|
||||
// following information about the traced RPC:
|
||||
// - byte_offset_in_stream: Number of bytes belonging to that traced RPC which
|
||||
// have been sent so far from the start of the RPC stream.
|
||||
// - relative_start_pos_in_chunk: Starting offset of the traced RPC within
|
||||
// the current chunk that is being sent.
|
||||
// - num_traced_bytes_in_chunk: Number of bytes belonging to the traced RPC
|
||||
// within the current chunk.
|
||||
class ContextListEntry { |
||||
public: |
||||
ContextListEntry(void* context, int64_t relative_start_pos, |
||||
int64_t num_traced_bytes, size_t byte_offset) |
||||
: trace_context_(context), |
||||
relative_start_pos_in_chunk_(relative_start_pos), |
||||
num_traced_bytes_in_chunk_(num_traced_bytes), |
||||
byte_offset_in_stream_(byte_offset) {} |
||||
|
||||
ContextListEntry() = delete; |
||||
|
||||
void* TraceContext() { return trace_context_; } |
||||
int64_t RelativeStartPosInChunk() { return relative_start_pos_in_chunk_; } |
||||
int64_t NumTracedBytesInChunk() { return num_traced_bytes_in_chunk_; } |
||||
size_t ByteOffsetInStream() { return byte_offset_in_stream_; } |
||||
|
||||
private: |
||||
void* trace_context_; |
||||
int64_t relative_start_pos_in_chunk_; |
||||
int64_t num_traced_bytes_in_chunk_; |
||||
size_t byte_offset_in_stream_; |
||||
}; |
||||
|
||||
/// A list of RPC Contexts
|
||||
typedef std::vector<ContextListEntry> ContextList; |
||||
} // namespace grpc_core
|
||||
|
||||
#endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CONTEXT_LIST_ENTRY_H
|
@ -1,181 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2018 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/context_list.h" |
||||
|
||||
#include <stdint.h> |
||||
|
||||
#include <algorithm> |
||||
#include <vector> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "gtest/gtest.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/slice.h> |
||||
#include <grpc/support/alloc.h> |
||||
#include <grpc/support/atm.h> |
||||
|
||||
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
||||
#include "src/core/ext/transport/chttp2/transport/internal.h" |
||||
#include "src/core/lib/channel/channel_args_preconditioning.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/iomgr/endpoint.h" |
||||
#include "src/core/lib/iomgr/exec_ctx.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
#include "src/core/lib/transport/transport_fwd.h" |
||||
#include "test/core/util/mock_endpoint.h" |
||||
#include "test/core/util/test_config.h" |
||||
|
||||
namespace grpc_core { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
const uint32_t kByteOffset = 123; |
||||
|
||||
void* PhonyArgsCopier(void* arg) { return arg; } |
||||
|
||||
void TestExecuteFlushesListVerifier(void* arg, Timestamps* ts, |
||||
grpc_error_handle error) { |
||||
ASSERT_NE(arg, nullptr); |
||||
EXPECT_EQ(error, absl::OkStatus()); |
||||
if (ts) { |
||||
EXPECT_EQ(ts->byte_offset, kByteOffset); |
||||
} |
||||
gpr_atm* done = reinterpret_cast<gpr_atm*>(arg); |
||||
gpr_atm_rel_store(done, gpr_atm{1}); |
||||
} |
||||
|
||||
void discard_write(grpc_slice /*slice*/) {} |
||||
|
||||
class ContextListTest : public ::testing::Test { |
||||
protected: |
||||
void SetUp() override { |
||||
grpc_http2_set_write_timestamps_callback(TestExecuteFlushesListVerifier); |
||||
grpc_http2_set_fn_get_copied_context(PhonyArgsCopier); |
||||
} |
||||
}; |
||||
|
||||
/// Tests that all ContextList elements in the list are flushed out on
|
||||
/// execute.
|
||||
/// Also tests that arg and byte_counter are passed correctly.
|
||||
///
|
||||
TEST_F(ContextListTest, ExecuteFlushesList) { |
||||
ContextList* list = nullptr; |
||||
const int kNumElems = 5; |
||||
ExecCtx exec_ctx; |
||||
grpc_stream_refcount ref; |
||||
GRPC_STREAM_REF_INIT(&ref, 1, nullptr, nullptr, "phony ref"); |
||||
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write); |
||||
auto args = CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(nullptr); |
||||
grpc_transport* t = grpc_create_chttp2_transport(args, mock_endpoint, true); |
||||
std::vector<grpc_chttp2_stream*> s; |
||||
s.reserve(kNumElems); |
||||
gpr_atm verifier_called[kNumElems]; |
||||
for (auto i = 0; i < kNumElems; i++) { |
||||
s.push_back(static_cast<grpc_chttp2_stream*>( |
||||
gpr_malloc(grpc_transport_stream_size(t)))); |
||||
grpc_transport_init_stream(reinterpret_cast<grpc_transport*>(t), |
||||
reinterpret_cast<grpc_stream*>(s[i]), &ref, |
||||
nullptr, nullptr); |
||||
s[i]->context = &verifier_called[i]; |
||||
s[i]->byte_counter = kByteOffset; |
||||
gpr_atm_rel_store(&verifier_called[i], gpr_atm{0}); |
||||
ContextList::Append(&list, s[i]); |
||||
} |
||||
Timestamps ts; |
||||
ContextList::Execute(list, &ts, absl::OkStatus()); |
||||
for (auto i = 0; i < kNumElems; i++) { |
||||
EXPECT_EQ(gpr_atm_acq_load(&verifier_called[i]), 1); |
||||
grpc_transport_destroy_stream(reinterpret_cast<grpc_transport*>(t), |
||||
reinterpret_cast<grpc_stream*>(s[i]), |
||||
nullptr); |
||||
exec_ctx.Flush(); |
||||
gpr_free(s[i]); |
||||
} |
||||
grpc_transport_destroy(t); |
||||
exec_ctx.Flush(); |
||||
} |
||||
|
||||
TEST_F(ContextListTest, EmptyList) { |
||||
ContextList* list = nullptr; |
||||
ExecCtx exec_ctx; |
||||
Timestamps ts; |
||||
ContextList::Execute(list, &ts, absl::OkStatus()); |
||||
exec_ctx.Flush(); |
||||
} |
||||
|
||||
TEST_F(ContextListTest, EmptyListEmptyTimestamp) { |
||||
ContextList* list = nullptr; |
||||
ExecCtx exec_ctx; |
||||
ContextList::Execute(list, nullptr, absl::OkStatus()); |
||||
exec_ctx.Flush(); |
||||
} |
||||
|
||||
TEST_F(ContextListTest, NonEmptyListEmptyTimestamp) { |
||||
ContextList* list = nullptr; |
||||
const int kNumElems = 5; |
||||
ExecCtx exec_ctx; |
||||
grpc_stream_refcount ref; |
||||
GRPC_STREAM_REF_INIT(&ref, 1, nullptr, nullptr, "phony ref"); |
||||
grpc_endpoint* mock_endpoint = grpc_mock_endpoint_create(discard_write); |
||||
auto args = CoreConfiguration::Get() |
||||
.channel_args_preconditioning() |
||||
.PreconditionChannelArgs(nullptr); |
||||
grpc_transport* t = grpc_create_chttp2_transport(args, mock_endpoint, true); |
||||
std::vector<grpc_chttp2_stream*> s; |
||||
s.reserve(kNumElems); |
||||
gpr_atm verifier_called[kNumElems]; |
||||
for (auto i = 0; i < kNumElems; i++) { |
||||
s.push_back(static_cast<grpc_chttp2_stream*>( |
||||
gpr_malloc(grpc_transport_stream_size(t)))); |
||||
grpc_transport_init_stream(reinterpret_cast<grpc_transport*>(t), |
||||
reinterpret_cast<grpc_stream*>(s[i]), &ref, |
||||
nullptr, nullptr); |
||||
s[i]->context = &verifier_called[i]; |
||||
s[i]->byte_counter = kByteOffset; |
||||
gpr_atm_rel_store(&verifier_called[i], gpr_atm{0}); |
||||
ContextList::Append(&list, s[i]); |
||||
} |
||||
ContextList::Execute(list, nullptr, absl::OkStatus()); |
||||
for (auto i = 0; i < kNumElems; i++) { |
||||
EXPECT_EQ(gpr_atm_acq_load(&verifier_called[i]), 1); |
||||
grpc_transport_destroy_stream(reinterpret_cast<grpc_transport*>(t), |
||||
reinterpret_cast<grpc_stream*>(s[i]), |
||||
nullptr); |
||||
exec_ctx.Flush(); |
||||
gpr_free(s[i]); |
||||
} |
||||
grpc_transport_destroy(t); |
||||
exec_ctx.Flush(); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc_core
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
grpc_init(); |
||||
int ret = RUN_ALL_TESTS(); |
||||
grpc_shutdown(); |
||||
return ret; |
||||
} |
Loading…
Reference in new issue