mirror of https://github.com/grpc/grpc.git
[cpp] Reland removal of channel_filter code (#34598)
Co-authored-by: ctiller <ctiller@users.noreply.github.com>pull/32852/head
parent
0496589405
commit
c9df0ca470
13 changed files with 0 additions and 1080 deletions
@ -1,96 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include "src/cpp/common/channel_filter.h" |
||||
|
||||
#include "absl/strings/str_cat.h" |
||||
#include "absl/strings/string_view.h" |
||||
|
||||
#include <grpc/support/log.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/channel_stack_builder.h" |
||||
#include "src/core/lib/config/core_configuration.h" |
||||
#include "src/core/lib/slice/slice.h" |
||||
|
||||
namespace grpc { |
||||
|
||||
// MetadataBatch
|
||||
|
||||
void MetadataBatch::AddMetadata(const string& key, const string& value) { |
||||
batch_->Append(key, grpc_core::Slice::FromCopiedString(value), |
||||
[&](absl::string_view error, const grpc_core::Slice&) { |
||||
gpr_log(GPR_INFO, "%s", |
||||
absl::StrCat("MetadataBatch::AddMetadata error:", |
||||
error, " key=", key, " value=", value) |
||||
.c_str()); |
||||
}); |
||||
} |
||||
|
||||
// ChannelData
|
||||
|
||||
void ChannelData::StartTransportOp(grpc_channel_element* elem, |
||||
TransportOp* op) { |
||||
grpc_channel_next_op(elem, op->op()); |
||||
} |
||||
|
||||
void ChannelData::GetInfo(grpc_channel_element* elem, |
||||
const grpc_channel_info* channel_info) { |
||||
grpc_channel_next_get_info(elem, channel_info); |
||||
} |
||||
|
||||
// CallData
|
||||
|
||||
void CallData::StartTransportStreamOpBatch(grpc_call_element* elem, |
||||
TransportStreamOpBatch* op) { |
||||
grpc_call_next_op(elem, op->op()); |
||||
} |
||||
|
||||
void CallData::SetPollsetOrPollsetSet(grpc_call_element* elem, |
||||
grpc_polling_entity* pollent) { |
||||
grpc_call_stack_ignore_set_pollset_or_pollset_set(elem, pollent); |
||||
} |
||||
|
||||
namespace internal { |
||||
|
||||
void RegisterChannelFilter( |
||||
grpc_channel_stack_type stack_type, int priority, |
||||
std::function<bool(const grpc_core::ChannelArgs&)> include_filter, |
||||
const grpc_channel_filter* filter) { |
||||
auto maybe_add_filter = [include_filter, |
||||
filter](grpc_core::ChannelStackBuilder* builder) { |
||||
if (include_filter != nullptr) { |
||||
if (!include_filter(builder->channel_args())) { |
||||
return true; |
||||
} |
||||
} |
||||
builder->PrependFilter(filter); |
||||
return true; |
||||
}; |
||||
grpc_core::CoreConfiguration::RegisterBuilder( |
||||
[stack_type, priority, |
||||
maybe_add_filter](grpc_core::CoreConfiguration::Builder* builder) { |
||||
builder->channel_init()->RegisterStage(stack_type, priority, |
||||
maybe_add_filter); |
||||
}); |
||||
} |
||||
|
||||
} // namespace internal
|
||||
|
||||
} // namespace grpc
|
@ -1,344 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#ifndef GRPC_SRC_CPP_COMMON_CHANNEL_FILTER_H |
||||
#define GRPC_SRC_CPP_COMMON_CHANNEL_FILTER_H |
||||
|
||||
#include <stddef.h> |
||||
|
||||
#include <functional> |
||||
#include <new> |
||||
#include <string> |
||||
#include <utility> |
||||
|
||||
#include "absl/status/status.h" |
||||
#include "absl/types/optional.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpcpp/support/config.h> |
||||
|
||||
#include "src/core/lib/channel/channel_args.h" |
||||
#include "src/core/lib/channel/channel_fwd.h" |
||||
#include "src/core/lib/channel/channel_stack.h" |
||||
#include "src/core/lib/channel/context.h" |
||||
#include "src/core/lib/iomgr/closure.h" |
||||
#include "src/core/lib/iomgr/error.h" |
||||
#include "src/core/lib/iomgr/polling_entity.h" |
||||
#include "src/core/lib/slice/slice_buffer.h" |
||||
#include "src/core/lib/surface/channel_stack_type.h" |
||||
#include "src/core/lib/transport/metadata_batch.h" |
||||
#include "src/core/lib/transport/transport.h" |
||||
|
||||
/// An interface to define filters.
|
||||
///
|
||||
/// To define a filter, implement a subclass of each of \c CallData and
|
||||
/// \c ChannelData. Then register the filter using something like this:
|
||||
/// \code{.cpp}
|
||||
/// RegisterChannelFilter<MyChannelDataSubclass, MyCallDataSubclass>(
|
||||
/// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
|
||||
/// \endcode
|
||||
|
||||
namespace grpc { |
||||
|
||||
/// A C++ wrapper for the \c grpc_metadata_batch struct.
|
||||
class MetadataBatch { |
||||
public: |
||||
/// Borrows a pointer to \a batch, but does NOT take ownership.
|
||||
/// The caller must ensure that \a batch continues to exist for as
|
||||
/// long as the MetadataBatch object does.
|
||||
explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {} |
||||
|
||||
grpc_metadata_batch* batch() const { return batch_; } |
||||
|
||||
/// Adds metadata.
|
||||
void AddMetadata(const string& key, const string& value); |
||||
|
||||
private: |
||||
grpc_metadata_batch* batch_; // Not owned.
|
||||
}; |
||||
|
||||
/// A C++ wrapper for the \c grpc_transport_op struct.
|
||||
class TransportOp { |
||||
public: |
||||
/// Borrows a pointer to \a op, but does NOT take ownership.
|
||||
/// The caller must ensure that \a op continues to exist for as
|
||||
/// long as the TransportOp object does.
|
||||
explicit TransportOp(grpc_transport_op* op) : op_(op) {} |
||||
|
||||
grpc_transport_op* op() const { return op_; } |
||||
|
||||
// TODO(roth): Add a C++ wrapper for grpc_error?
|
||||
grpc_error_handle disconnect_with_error() const { |
||||
return op_->disconnect_with_error; |
||||
} |
||||
bool send_goaway() const { return !op_->goaway_error.ok(); } |
||||
|
||||
// TODO(roth): Add methods for additional fields as needed.
|
||||
|
||||
private: |
||||
grpc_transport_op* op_; // Not owned.
|
||||
}; |
||||
|
||||
/// A C++ wrapper for the \c grpc_transport_stream_op_batch struct.
|
||||
class TransportStreamOpBatch { |
||||
public: |
||||
/// Borrows a pointer to \a op, but does NOT take ownership.
|
||||
/// The caller must ensure that \a op continues to exist for as
|
||||
/// long as the TransportStreamOpBatch object does.
|
||||
explicit TransportStreamOpBatch(grpc_transport_stream_op_batch* op) |
||||
: op_(op), |
||||
send_initial_metadata_( |
||||
op->send_initial_metadata |
||||
? op->payload->send_initial_metadata.send_initial_metadata |
||||
: nullptr), |
||||
send_trailing_metadata_( |
||||
op->send_trailing_metadata |
||||
? op->payload->send_trailing_metadata.send_trailing_metadata |
||||
: nullptr), |
||||
recv_initial_metadata_( |
||||
op->recv_initial_metadata |
||||
? op->payload->recv_initial_metadata.recv_initial_metadata |
||||
: nullptr), |
||||
recv_trailing_metadata_( |
||||
op->recv_trailing_metadata |
||||
? op->payload->recv_trailing_metadata.recv_trailing_metadata |
||||
: nullptr) {} |
||||
|
||||
grpc_transport_stream_op_batch* op() const { return op_; } |
||||
|
||||
grpc_closure* on_complete() const { return op_->on_complete; } |
||||
void set_on_complete(grpc_closure* closure) { op_->on_complete = closure; } |
||||
|
||||
MetadataBatch* send_initial_metadata() { |
||||
return op_->send_initial_metadata ? &send_initial_metadata_ : nullptr; |
||||
} |
||||
MetadataBatch* send_trailing_metadata() { |
||||
return op_->send_trailing_metadata ? &send_trailing_metadata_ : nullptr; |
||||
} |
||||
MetadataBatch* recv_initial_metadata() { |
||||
return op_->recv_initial_metadata ? &recv_initial_metadata_ : nullptr; |
||||
} |
||||
MetadataBatch* recv_trailing_metadata() { |
||||
return op_->recv_trailing_metadata ? &recv_trailing_metadata_ : nullptr; |
||||
} |
||||
|
||||
grpc_closure* recv_initial_metadata_ready() const { |
||||
return op_->recv_initial_metadata |
||||
? op_->payload->recv_initial_metadata.recv_initial_metadata_ready |
||||
: nullptr; |
||||
} |
||||
void set_recv_initial_metadata_ready(grpc_closure* closure) { |
||||
op_->payload->recv_initial_metadata.recv_initial_metadata_ready = closure; |
||||
} |
||||
|
||||
grpc_core::SliceBuffer* send_message() const { |
||||
return op_->send_message ? op_->payload->send_message.send_message |
||||
: nullptr; |
||||
} |
||||
|
||||
void set_send_message(grpc_core::SliceBuffer* send_message) { |
||||
op_->send_message = true; |
||||
op_->payload->send_message.send_message = send_message; |
||||
} |
||||
|
||||
absl::optional<grpc_core::SliceBuffer>* recv_message() const { |
||||
return op_->recv_message ? op_->payload->recv_message.recv_message |
||||
: nullptr; |
||||
} |
||||
void set_recv_message(absl::optional<grpc_core::SliceBuffer>* recv_message) { |
||||
op_->recv_message = true; |
||||
op_->payload->recv_message.recv_message = recv_message; |
||||
} |
||||
|
||||
census_context* get_census_context() const { |
||||
return static_cast<census_context*>( |
||||
op_->payload->context[GRPC_CONTEXT_TRACING].value); |
||||
} |
||||
|
||||
private: |
||||
grpc_transport_stream_op_batch* op_; // Not owned.
|
||||
MetadataBatch send_initial_metadata_; |
||||
MetadataBatch send_trailing_metadata_; |
||||
MetadataBatch recv_initial_metadata_; |
||||
MetadataBatch recv_trailing_metadata_; |
||||
}; |
||||
|
||||
/// Represents channel data.
|
||||
class ChannelData { |
||||
public: |
||||
ChannelData() {} |
||||
virtual ~ChannelData() {} |
||||
|
||||
// TODO(roth): Come up with a more C++-like API for the channel element.
|
||||
|
||||
/// Initializes the channel data.
|
||||
virtual grpc_error_handle Init(grpc_channel_element* /*elem*/, |
||||
grpc_channel_element_args* /*args*/) { |
||||
return absl::OkStatus(); |
||||
} |
||||
|
||||
// Called before destruction.
|
||||
virtual void Destroy(grpc_channel_element* /*elem*/) {} |
||||
|
||||
virtual void StartTransportOp(grpc_channel_element* elem, TransportOp* op); |
||||
|
||||
virtual void GetInfo(grpc_channel_element* elem, |
||||
const grpc_channel_info* channel_info); |
||||
}; |
||||
|
||||
/// Represents call data.
|
||||
class CallData { |
||||
public: |
||||
CallData() {} |
||||
virtual ~CallData() {} |
||||
|
||||
// TODO(roth): Come up with a more C++-like API for the call element.
|
||||
|
||||
/// Initializes the call data.
|
||||
virtual grpc_error_handle Init(grpc_call_element* /*elem*/, |
||||
const grpc_call_element_args* /*args*/) { |
||||
return absl::OkStatus(); |
||||
} |
||||
|
||||
// Called before destruction.
|
||||
virtual void Destroy(grpc_call_element* /*elem*/, |
||||
const grpc_call_final_info* /*final_info*/, |
||||
grpc_closure* /*then_call_closure*/) {} |
||||
|
||||
/// Starts a new stream operation.
|
||||
virtual void StartTransportStreamOpBatch(grpc_call_element* elem, |
||||
TransportStreamOpBatch* op); |
||||
|
||||
/// Sets a pollset or pollset set.
|
||||
virtual void SetPollsetOrPollsetSet(grpc_call_element* elem, |
||||
grpc_polling_entity* pollent); |
||||
}; |
||||
|
||||
namespace internal { |
||||
|
||||
// Defines static members for passing to C core.
|
||||
// Members of this class correspond to the members of the C
|
||||
// grpc_channel_filter struct.
|
||||
template <typename ChannelDataType, typename CallDataType> |
||||
class ChannelFilter final { |
||||
public: |
||||
static const size_t channel_data_size = sizeof(ChannelDataType); |
||||
|
||||
static grpc_error_handle InitChannelElement(grpc_channel_element* elem, |
||||
grpc_channel_element_args* args) { |
||||
// Construct the object in the already-allocated memory.
|
||||
ChannelDataType* channel_data = new (elem->channel_data) ChannelDataType(); |
||||
return channel_data->Init(elem, args); |
||||
} |
||||
|
||||
static void DestroyChannelElement(grpc_channel_element* elem) { |
||||
ChannelDataType* channel_data = |
||||
static_cast<ChannelDataType*>(elem->channel_data); |
||||
channel_data->Destroy(elem); |
||||
channel_data->~ChannelDataType(); |
||||
} |
||||
|
||||
static void StartTransportOp(grpc_channel_element* elem, |
||||
grpc_transport_op* op) { |
||||
ChannelDataType* channel_data = |
||||
static_cast<ChannelDataType*>(elem->channel_data); |
||||
TransportOp op_wrapper(op); |
||||
channel_data->StartTransportOp(elem, &op_wrapper); |
||||
} |
||||
|
||||
static void GetChannelInfo(grpc_channel_element* elem, |
||||
const grpc_channel_info* channel_info) { |
||||
ChannelDataType* channel_data = |
||||
static_cast<ChannelDataType*>(elem->channel_data); |
||||
channel_data->GetInfo(elem, channel_info); |
||||
} |
||||
|
||||
static const size_t call_data_size = sizeof(CallDataType); |
||||
|
||||
static grpc_error_handle InitCallElement(grpc_call_element* elem, |
||||
const grpc_call_element_args* args) { |
||||
// Construct the object in the already-allocated memory.
|
||||
CallDataType* call_data = new (elem->call_data) CallDataType(); |
||||
return call_data->Init(elem, args); |
||||
} |
||||
|
||||
static void DestroyCallElement(grpc_call_element* elem, |
||||
const grpc_call_final_info* final_info, |
||||
grpc_closure* then_call_closure) { |
||||
CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); |
||||
call_data->Destroy(elem, final_info, then_call_closure); |
||||
call_data->~CallDataType(); |
||||
} |
||||
|
||||
static void StartTransportStreamOpBatch(grpc_call_element* elem, |
||||
grpc_transport_stream_op_batch* op) { |
||||
CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); |
||||
TransportStreamOpBatch op_wrapper(op); |
||||
call_data->StartTransportStreamOpBatch(elem, &op_wrapper); |
||||
} |
||||
|
||||
static void SetPollsetOrPollsetSet(grpc_call_element* elem, |
||||
grpc_polling_entity* pollent) { |
||||
CallDataType* call_data = static_cast<CallDataType*>(elem->call_data); |
||||
call_data->SetPollsetOrPollsetSet(elem, pollent); |
||||
} |
||||
}; |
||||
|
||||
void RegisterChannelFilter( |
||||
grpc_channel_stack_type stack_type, int priority, |
||||
std::function<bool(const grpc_core::ChannelArgs&)> include_filter, |
||||
const grpc_channel_filter* filter); |
||||
|
||||
} // namespace internal
|
||||
|
||||
/// Registers a new filter.
|
||||
/// Must be called by only one thread at a time.
|
||||
/// The \a include_filter argument specifies a function that will be called
|
||||
/// to determine at run-time whether or not to add the filter. If the
|
||||
/// value is nullptr, the filter will be added unconditionally.
|
||||
/// If the channel stack type is GRPC_CLIENT_SUBCHANNEL, the caller should
|
||||
/// ensure that subchannels with different filter lists will always have
|
||||
/// different channel args. This requires setting a channel arg in case the
|
||||
/// registration function relies on some condition other than channel args to
|
||||
/// decide whether to add a filter or not.
|
||||
template <typename ChannelDataType, typename CallDataType> |
||||
void RegisterChannelFilter( |
||||
const char* name, grpc_channel_stack_type stack_type, int priority, |
||||
std::function<bool(const grpc_core::ChannelArgs&)> include_filter) { |
||||
using FilterType = internal::ChannelFilter<ChannelDataType, CallDataType>; |
||||
static const grpc_channel_filter filter = { |
||||
FilterType::StartTransportStreamOpBatch, |
||||
nullptr, |
||||
FilterType::StartTransportOp, |
||||
FilterType::call_data_size, |
||||
FilterType::InitCallElement, |
||||
FilterType::SetPollsetOrPollsetSet, |
||||
FilterType::DestroyCallElement, |
||||
FilterType::channel_data_size, |
||||
FilterType::InitChannelElement, |
||||
grpc_channel_stack_no_post_init, |
||||
FilterType::DestroyChannelElement, |
||||
FilterType::GetChannelInfo, |
||||
name}; |
||||
grpc::internal::RegisterChannelFilter(stack_type, priority, |
||||
std::move(include_filter), &filter); |
||||
} |
||||
|
||||
} // namespace grpc
|
||||
|
||||
#endif // GRPC_SRC_CPP_COMMON_CHANNEL_FILTER_H
|
@ -1,66 +0,0 @@ |
||||
//
|
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
#include "src/cpp/common/channel_filter.h" |
||||
|
||||
#include <limits.h> |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
|
||||
class MyChannelData : public ChannelData { |
||||
public: |
||||
MyChannelData() {} |
||||
|
||||
grpc_error_handle Init(grpc_channel_element* /*elem*/, |
||||
grpc_channel_element_args* args) override { |
||||
(void)args->channel_args; // Make sure field is available.
|
||||
return absl::OkStatus(); |
||||
} |
||||
}; |
||||
|
||||
class MyCallData : public CallData { |
||||
public: |
||||
MyCallData() {} |
||||
|
||||
grpc_error_handle Init(grpc_call_element* /*elem*/, |
||||
const grpc_call_element_args* args) override { |
||||
(void)args->path; // Make sure field is available.
|
||||
return absl::OkStatus(); |
||||
} |
||||
}; |
||||
|
||||
// This test ensures that when we make changes to the filter API in
|
||||
// C-core, we don't accidentally break the C++ filter API.
|
||||
TEST(ChannelFilterTest, RegisterChannelFilter) { |
||||
grpc::RegisterChannelFilter<MyChannelData, MyCallData>( |
||||
"myfilter", GRPC_CLIENT_CHANNEL, INT_MAX, nullptr); |
||||
} |
||||
|
||||
// TODO(roth): When we have time, add tests for all methods of the
|
||||
// filter API.
|
||||
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
@ -1,344 +0,0 @@ |
||||
//
|
||||
//
|
||||
// Copyright 2016 gRPC authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
//
|
||||
|
||||
#include <memory> |
||||
#include <mutex> |
||||
#include <thread> |
||||
|
||||
#include <gtest/gtest.h> |
||||
|
||||
#include "absl/memory/memory.h" |
||||
|
||||
#include <grpc/grpc.h> |
||||
#include <grpc/support/time.h> |
||||
#include <grpcpp/channel.h> |
||||
#include <grpcpp/client_context.h> |
||||
#include <grpcpp/create_channel.h> |
||||
#include <grpcpp/generic/async_generic_service.h> |
||||
#include <grpcpp/generic/generic_stub.h> |
||||
#include <grpcpp/impl/proto_utils.h> |
||||
#include <grpcpp/server.h> |
||||
#include <grpcpp/server_builder.h> |
||||
#include <grpcpp/server_context.h> |
||||
#include <grpcpp/support/config.h> |
||||
#include <grpcpp/support/slice.h> |
||||
|
||||
#include "src/cpp/common/channel_filter.h" |
||||
#include "src/proto/grpc/testing/echo.grpc.pb.h" |
||||
#include "test/core/util/port.h" |
||||
#include "test/core/util/test_config.h" |
||||
#include "test/cpp/util/byte_buffer_proto_helper.h" |
||||
|
||||
namespace grpc { |
||||
namespace testing { |
||||
namespace { |
||||
|
||||
void* tag(int i) { return reinterpret_cast<void*>(i); } |
||||
|
||||
void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { |
||||
bool ok; |
||||
void* got_tag; |
||||
EXPECT_TRUE(cq->Next(&got_tag, &ok)); |
||||
EXPECT_EQ(expect_ok, ok); |
||||
EXPECT_EQ(tag(i), got_tag); |
||||
} |
||||
|
||||
namespace { |
||||
|
||||
int global_num_connections = 0; |
||||
int global_num_calls = 0; |
||||
std::mutex global_mu; |
||||
|
||||
void IncrementConnectionCounter() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
++global_num_connections; |
||||
} |
||||
|
||||
void ResetConnectionCounter() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
global_num_connections = 0; |
||||
} |
||||
|
||||
int GetConnectionCounterValue() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
return global_num_connections; |
||||
} |
||||
|
||||
void IncrementCallCounter() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
++global_num_calls; |
||||
} |
||||
|
||||
void ResetCallCounter() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
global_num_calls = 0; |
||||
} |
||||
|
||||
int GetCallCounterValue() { |
||||
std::unique_lock<std::mutex> lock(global_mu); |
||||
return global_num_calls; |
||||
} |
||||
|
||||
} // namespace
|
||||
|
||||
class ChannelDataImpl : public ChannelData { |
||||
public: |
||||
grpc_error_handle Init(grpc_channel_element* /*elem*/, |
||||
grpc_channel_element_args* /*args*/) override { |
||||
IncrementConnectionCounter(); |
||||
return absl::OkStatus(); |
||||
} |
||||
}; |
||||
|
||||
class CallDataImpl : public CallData { |
||||
public: |
||||
void StartTransportStreamOpBatch(grpc_call_element* elem, |
||||
TransportStreamOpBatch* op) override { |
||||
// Incrementing the counter could be done from Init(), but we want
|
||||
// to test that the individual methods are actually called correctly.
|
||||
if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); |
||||
grpc_call_next_op(elem, op->op()); |
||||
} |
||||
}; |
||||
|
||||
class FilterEnd2endTest : public ::testing::Test { |
||||
protected: |
||||
FilterEnd2endTest() : server_host_("localhost") {} |
||||
|
||||
static void SetUpTestSuite() { |
||||
// Workaround for
|
||||
// https://github.com/google/google-toolbox-for-mac/issues/242
|
||||
static bool setup_done = false; |
||||
if (!setup_done) { |
||||
setup_done = true; |
||||
grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>( |
||||
"test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); |
||||
} |
||||
} |
||||
|
||||
void SetUp() override { |
||||
int port = grpc_pick_unused_port_or_die(); |
||||
server_address_ << server_host_ << ":" << port; |
||||
// Setup server
|
||||
ServerBuilder builder; |
||||
builder.AddListeningPort(server_address_.str(), |
||||
InsecureServerCredentials()); |
||||
builder.RegisterAsyncGenericService(&generic_service_); |
||||
srv_cq_ = builder.AddCompletionQueue(); |
||||
server_ = builder.BuildAndStart(); |
||||
} |
||||
|
||||
void TearDown() override { |
||||
server_->Shutdown(); |
||||
void* ignored_tag; |
||||
bool ignored_ok; |
||||
cli_cq_.Shutdown(); |
||||
srv_cq_->Shutdown(); |
||||
while (cli_cq_.Next(&ignored_tag, &ignored_ok)) { |
||||
} |
||||
while (srv_cq_->Next(&ignored_tag, &ignored_ok)) { |
||||
} |
||||
} |
||||
|
||||
void ResetStub() { |
||||
std::shared_ptr<Channel> channel = grpc::CreateChannel( |
||||
server_address_.str(), InsecureChannelCredentials()); |
||||
generic_stub_ = std::make_unique<GenericStub>(channel); |
||||
ResetConnectionCounter(); |
||||
ResetCallCounter(); |
||||
} |
||||
|
||||
void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } |
||||
void client_ok(int i) { verify_ok(&cli_cq_, i, true); } |
||||
void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } |
||||
void client_fail(int i) { verify_ok(&cli_cq_, i, false); } |
||||
|
||||
void SendRpc(int num_rpcs) { |
||||
const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo"); |
||||
for (int i = 0; i < num_rpcs; i++) { |
||||
EchoRequest send_request; |
||||
EchoRequest recv_request; |
||||
EchoResponse send_response; |
||||
EchoResponse recv_response; |
||||
Status recv_status; |
||||
|
||||
ClientContext cli_ctx; |
||||
GenericServerContext srv_ctx; |
||||
GenericServerAsyncReaderWriter stream(&srv_ctx); |
||||
|
||||
// The string needs to be long enough to test heap-based slice.
|
||||
send_request.set_message("Hello world. Hello world. Hello world."); |
||||
std::thread request_call([this]() { server_ok(4); }); |
||||
std::unique_ptr<GenericClientAsyncReaderWriter> call = |
||||
generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); |
||||
call->StartCall(tag(1)); |
||||
client_ok(1); |
||||
std::unique_ptr<ByteBuffer> send_buffer = |
||||
SerializeToByteBuffer(&send_request); |
||||
call->Write(*send_buffer, tag(2)); |
||||
// Send ByteBuffer can be destroyed after calling Write.
|
||||
send_buffer.reset(); |
||||
client_ok(2); |
||||
call->WritesDone(tag(3)); |
||||
client_ok(3); |
||||
|
||||
generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), |
||||
srv_cq_.get(), tag(4)); |
||||
|
||||
request_call.join(); |
||||
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); |
||||
EXPECT_EQ(kMethodName, srv_ctx.method()); |
||||
ByteBuffer recv_buffer; |
||||
stream.Read(&recv_buffer, tag(5)); |
||||
server_ok(5); |
||||
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); |
||||
EXPECT_EQ(send_request.message(), recv_request.message()); |
||||
|
||||
send_response.set_message(recv_request.message()); |
||||
send_buffer = SerializeToByteBuffer(&send_response); |
||||
stream.Write(*send_buffer, tag(6)); |
||||
send_buffer.reset(); |
||||
server_ok(6); |
||||
|
||||
stream.Finish(Status::OK, tag(7)); |
||||
server_ok(7); |
||||
|
||||
recv_buffer.Clear(); |
||||
call->Read(&recv_buffer, tag(8)); |
||||
client_ok(8); |
||||
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); |
||||
|
||||
call->Finish(&recv_status, tag(9)); |
||||
client_ok(9); |
||||
|
||||
EXPECT_EQ(send_response.message(), recv_response.message()); |
||||
EXPECT_TRUE(recv_status.ok()); |
||||
} |
||||
} |
||||
|
||||
CompletionQueue cli_cq_; |
||||
std::unique_ptr<ServerCompletionQueue> srv_cq_; |
||||
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; |
||||
std::unique_ptr<grpc::GenericStub> generic_stub_; |
||||
std::unique_ptr<Server> server_; |
||||
AsyncGenericService generic_service_; |
||||
const std::string server_host_; |
||||
std::ostringstream server_address_; |
||||
}; |
||||
|
||||
TEST_F(FilterEnd2endTest, SimpleRpc) { |
||||
ResetStub(); |
||||
EXPECT_EQ(0, GetConnectionCounterValue()); |
||||
EXPECT_EQ(0, GetCallCounterValue()); |
||||
SendRpc(1); |
||||
EXPECT_EQ(1, GetConnectionCounterValue()); |
||||
EXPECT_EQ(1, GetCallCounterValue()); |
||||
} |
||||
|
||||
TEST_F(FilterEnd2endTest, SequentialRpcs) { |
||||
ResetStub(); |
||||
EXPECT_EQ(0, GetConnectionCounterValue()); |
||||
EXPECT_EQ(0, GetCallCounterValue()); |
||||
SendRpc(10); |
||||
EXPECT_EQ(1, GetConnectionCounterValue()); |
||||
EXPECT_EQ(10, GetCallCounterValue()); |
||||
} |
||||
|
||||
// One ping, one pong.
|
||||
TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { |
||||
ResetStub(); |
||||
EXPECT_EQ(0, GetConnectionCounterValue()); |
||||
EXPECT_EQ(0, GetCallCounterValue()); |
||||
|
||||
const std::string kMethodName( |
||||
"/grpc.cpp.test.util.EchoTestService/BidiStream"); |
||||
EchoRequest send_request; |
||||
EchoRequest recv_request; |
||||
EchoResponse send_response; |
||||
EchoResponse recv_response; |
||||
Status recv_status; |
||||
ClientContext cli_ctx; |
||||
GenericServerContext srv_ctx; |
||||
GenericServerAsyncReaderWriter srv_stream(&srv_ctx); |
||||
|
||||
cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); |
||||
send_request.set_message("Hello"); |
||||
std::thread request_call([this]() { server_ok(2); }); |
||||
std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream = |
||||
generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); |
||||
cli_stream->StartCall(tag(1)); |
||||
client_ok(1); |
||||
|
||||
generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), |
||||
srv_cq_.get(), tag(2)); |
||||
|
||||
request_call.join(); |
||||
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); |
||||
EXPECT_EQ(kMethodName, srv_ctx.method()); |
||||
|
||||
std::unique_ptr<ByteBuffer> send_buffer = |
||||
SerializeToByteBuffer(&send_request); |
||||
cli_stream->Write(*send_buffer, tag(3)); |
||||
send_buffer.reset(); |
||||
client_ok(3); |
||||
|
||||
ByteBuffer recv_buffer; |
||||
srv_stream.Read(&recv_buffer, tag(4)); |
||||
server_ok(4); |
||||
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); |
||||
EXPECT_EQ(send_request.message(), recv_request.message()); |
||||
|
||||
send_response.set_message(recv_request.message()); |
||||
send_buffer = SerializeToByteBuffer(&send_response); |
||||
srv_stream.Write(*send_buffer, tag(5)); |
||||
send_buffer.reset(); |
||||
server_ok(5); |
||||
|
||||
cli_stream->Read(&recv_buffer, tag(6)); |
||||
client_ok(6); |
||||
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); |
||||
EXPECT_EQ(send_response.message(), recv_response.message()); |
||||
|
||||
cli_stream->WritesDone(tag(7)); |
||||
client_ok(7); |
||||
|
||||
srv_stream.Read(&recv_buffer, tag(8)); |
||||
server_fail(8); |
||||
|
||||
srv_stream.Finish(Status::OK, tag(9)); |
||||
server_ok(9); |
||||
|
||||
cli_stream->Finish(&recv_status, tag(10)); |
||||
client_ok(10); |
||||
|
||||
EXPECT_EQ(send_response.message(), recv_response.message()); |
||||
EXPECT_TRUE(recv_status.ok()); |
||||
|
||||
EXPECT_EQ(1, GetCallCounterValue()); |
||||
EXPECT_EQ(1, GetConnectionCounterValue()); |
||||
} |
||||
|
||||
} // namespace
|
||||
} // namespace testing
|
||||
} // namespace grpc
|
||||
|
||||
int main(int argc, char** argv) { |
||||
grpc::testing::TestEnvironment env(&argc, argv); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
Loading…
Reference in new issue