Add optional suffix to client-side stub objects (#25996)

* Adding suffix for stats handling to gRPC

* Fixing typo

* Fixing codegen, adding tests

* Fixing test failures

* Adding nullptr default in StubOptions

* Updating golden test file

* Replacing std::make_unique with absl::make_unique for pre-C++14 compatibility

* Fixing clang format  errors

* Reuse stub options for generic stubs

* Cleaning up compiler warnings in GenericStub
pull/26092/head
mkruskal-google 4 years ago committed by GitHub
parent d483841001
commit 2bf75aa68a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      include/grpcpp/generic/generic_stub.h
  2. 7
      include/grpcpp/impl/codegen/client_context.h
  3. 9
      include/grpcpp/impl/codegen/client_interceptor.h
  4. 21
      include/grpcpp/impl/codegen/rpc_method.h
  5. 15
      include/grpcpp/impl/codegen/stub_options.h
  6. 21
      src/compiler/cpp_generator.cc
  7. 6
      src/cpp/client/channel_cc.cc
  8. 4
      test/cpp/codegen/compiler_test_golden
  9. 73
      test/cpp/end2end/client_callback_end2end_test.cc
  10. 13
      test/cpp/end2end/client_interceptors_end2end_test.cc
  11. 5
      test/cpp/end2end/interceptors_util.cc
  12. 39
      test/cpp/end2end/interceptors_util.h

@ -22,6 +22,7 @@
#include <functional>
#include <grpcpp/client_context.h>
#include <grpcpp/impl/codegen/stub_options.h>
#include <grpcpp/impl/rpc_method.h>
#include <grpcpp/support/async_stream.h>
#include <grpcpp/support/async_unary_call.h>
@ -53,7 +54,8 @@ class TemplatedGenericStub final {
std::unique_ptr<ClientAsyncReaderWriter<RequestType, ResponseType>>
PrepareCall(ClientContext* context, const std::string& method,
::grpc::CompletionQueue* cq) {
return CallInternal(channel_.get(), context, method, cq, false, nullptr);
return CallInternal(channel_.get(), context, method, /*options=*/{}, cq,
false, nullptr);
}
/// Setup a unary call to a named method \a method using \a context, and don't
@ -67,6 +69,7 @@ class TemplatedGenericStub final {
internal::ClientAsyncResponseReaderHelper::Create<ResponseType>(
channel_.get(), cq,
grpc::internal::RpcMethod(method.c_str(),
/*suffix_for_stats=*/nullptr,
grpc::internal::RpcMethod::NORMAL_RPC),
context, request));
}
@ -80,7 +83,8 @@ class TemplatedGenericStub final {
std::unique_ptr<ClientAsyncReaderWriter<RequestType, ResponseType>> Call(
ClientContext* context, const std::string& method,
::grpc::CompletionQueue* cq, void* tag) {
return CallInternal(channel_.get(), context, method, cq, true, tag);
return CallInternal(channel_.get(), context, method, /*options=*/{}, cq,
true, tag);
}
#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
@ -89,7 +93,7 @@ class TemplatedGenericStub final {
void UnaryCall(ClientContext* context, const std::string& method,
const RequestType* request, ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
UnaryCallInternal(context, method, request, response,
UnaryCallInternal(context, method, /*options=*/{}, request, response,
std::move(on_completion));
}
@ -100,7 +104,8 @@ class TemplatedGenericStub final {
void PrepareUnaryCall(ClientContext* context, const std::string& method,
const RequestType* request, ResponseType* response,
ClientUnaryReactor* reactor) {
PrepareUnaryCallInternal(context, method, request, response, reactor);
PrepareUnaryCallInternal(context, method, /*options=*/{}, request, response,
reactor);
}
/// Setup a call to a named method \a method using \a context and tied to
@ -109,7 +114,7 @@ class TemplatedGenericStub final {
void PrepareBidiStreamingCall(
ClientContext* context, const std::string& method,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
PrepareBidiStreamingCallInternal(context, method, reactor);
PrepareBidiStreamingCallInternal(context, method, /*options=*/{}, reactor);
}
#endif
@ -123,9 +128,10 @@ class TemplatedGenericStub final {
/// Setup and start a unary call to a named method \a method using
/// \a context and specifying the \a request and \a response buffers.
void UnaryCall(ClientContext* context, const std::string& method,
const RequestType* request, ResponseType* response,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
stub_->UnaryCallInternal(context, method, request, response,
stub_->UnaryCallInternal(context, method, options, request, response,
std::move(on_completion));
}
@ -134,19 +140,20 @@ class TemplatedGenericStub final {
/// Like any other reactor-based RPC, it will not be activated until
/// StartCall is invoked on its reactor.
void PrepareUnaryCall(ClientContext* context, const std::string& method,
const RequestType* request, ResponseType* response,
ClientUnaryReactor* reactor) {
stub_->PrepareUnaryCallInternal(context, method, request, response,
reactor);
StubOptions options, const RequestType* request,
ResponseType* response, ClientUnaryReactor* reactor) {
stub_->PrepareUnaryCallInternal(context, method, options, request,
response, reactor);
}
/// Setup a call to a named method \a method using \a context and tied to
/// \a reactor . Like any other bidi streaming RPC, it will not be activated
/// until StartCall is invoked on its reactor.
void PrepareBidiStreamingCall(
ClientContext* context, const std::string& method,
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
stub_->PrepareBidiStreamingCallInternal(context, method, reactor);
stub_->PrepareBidiStreamingCallInternal(context, method, options,
reactor);
}
private:
@ -162,48 +169,50 @@ class TemplatedGenericStub final {
std::shared_ptr<grpc::ChannelInterface> channel_;
void UnaryCallInternal(ClientContext* context, const std::string& method,
const RequestType* request, ResponseType* response,
StubOptions options, const RequestType* request,
ResponseType* response,
std::function<void(grpc::Status)> on_completion) {
internal::CallbackUnaryCall(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, std::move(on_completion));
}
void PrepareUnaryCallInternal(ClientContext* context,
const std::string& method,
const std::string& method, StubOptions options,
const RequestType* request,
ResponseType* response,
ClientUnaryReactor* reactor) {
internal::ClientCallbackUnaryFactory::Create<RequestType, ResponseType>(
channel_.get(),
grpc::internal::RpcMethod(method.c_str(),
grpc::internal::RpcMethod(method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::NORMAL_RPC),
context, request, response, reactor);
}
void PrepareBidiStreamingCallInternal(
ClientContext* context, const std::string& method,
ClientContext* context, const std::string& method, StubOptions options,
ClientBidiReactor<RequestType, ResponseType>* reactor) {
internal::ClientCallbackReaderWriterFactory<RequestType, ResponseType>::
Create(channel_.get(),
grpc::internal::RpcMethod(
method.c_str(), grpc::internal::RpcMethod::BIDI_STREAMING),
method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::BIDI_STREAMING),
context, reactor);
}
std::unique_ptr<ClientAsyncReaderWriter<RequestType, ResponseType>>
CallInternal(grpc::ChannelInterface* channel, ClientContext* context,
const std::string& method, ::grpc::CompletionQueue* cq,
bool start, void* tag) {
const std::string& method, StubOptions options,
::grpc::CompletionQueue* cq, bool start, void* tag) {
return std::unique_ptr<ClientAsyncReaderWriter<RequestType, ResponseType>>(
internal::ClientAsyncReaderWriterFactory<RequestType, ResponseType>::
Create(
channel, cq,
grpc::internal::RpcMethod(
method.c_str(), grpc::internal::RpcMethod::BIDI_STREAMING),
context, start, tag));
Create(channel, cq,
grpc::internal::RpcMethod(
method.c_str(), options.suffix_for_stats(),
grpc::internal::RpcMethod::BIDI_STREAMING),
context, start, tag));
}
};

@ -464,12 +464,13 @@ class ClientContext {
const std::shared_ptr<::grpc::Channel>& channel);
grpc::experimental::ClientRpcInfo* set_client_rpc_info(
const char* method, grpc::internal::RpcMethod::RpcType type,
grpc::ChannelInterface* channel,
const char* method, const char* suffix_for_stats,
grpc::internal::RpcMethod::RpcType type, grpc::ChannelInterface* channel,
const std::vector<std::unique_ptr<
grpc::experimental::ClientInterceptorFactoryInterface>>& creators,
size_t interceptor_pos) {
rpc_info_ = grpc::experimental::ClientRpcInfo(this, type, method, channel);
rpc_info_ = grpc::experimental::ClientRpcInfo(this, type, method,
suffix_for_stats, channel);
rpc_info_.RegisterInterceptors(creators, interceptor_pos);
return &rpc_info_;
}

@ -87,6 +87,10 @@ class ClientRpcInfo {
/// Return the fully-specified method name
const char* method() const { return method_; }
/// Return an identifying suffix for the client stub, or nullptr if one wasn't
/// specified.
const char* suffix_for_stats() const { return suffix_for_stats_; }
/// Return a pointer to the channel on which the RPC is being sent
ChannelInterface* channel() { return channel_; }
@ -116,10 +120,12 @@ class ClientRpcInfo {
// Constructor will only be called from ClientContext
ClientRpcInfo(grpc::ClientContext* ctx, internal::RpcMethod::RpcType type,
const char* method, grpc::ChannelInterface* channel)
const char* method, const char* suffix_for_stats,
grpc::ChannelInterface* channel)
: ctx_(ctx),
type_(static_cast<Type>(type)),
method_(method),
suffix_for_stats_(suffix_for_stats),
channel_(channel) {}
// Move assignment should only be used by ClientContext
@ -162,6 +168,7 @@ class ClientRpcInfo {
// TODO(yashykt): make type_ const once move-assignment is deleted
Type type_{Type::UNKNOWN};
const char* method_ = nullptr;
const char* suffix_for_stats_ = nullptr;
grpc::ChannelInterface* channel_ = nullptr;
std::vector<std::unique_ptr<experimental::Interceptor>> interceptors_;
bool hijacked_ = false;

@ -36,21 +36,40 @@ class RpcMethod {
};
RpcMethod(const char* name, RpcType type)
: name_(name), method_type_(type), channel_tag_(nullptr) {}
: name_(name),
suffix_for_stats_(nullptr),
method_type_(type),
channel_tag_(nullptr) {}
RpcMethod(const char* name, const char* suffix_for_stats, RpcType type)
: name_(name),
suffix_for_stats_(suffix_for_stats),
method_type_(type),
channel_tag_(nullptr) {}
RpcMethod(const char* name, RpcType type,
const std::shared_ptr<ChannelInterface>& channel)
: name_(name),
suffix_for_stats_(nullptr),
method_type_(type),
channel_tag_(channel->RegisterMethod(name)) {}
RpcMethod(const char* name, const char* suffix_for_stats, RpcType type,
const std::shared_ptr<ChannelInterface>& channel)
: name_(name),
suffix_for_stats_(suffix_for_stats),
method_type_(type),
channel_tag_(channel->RegisterMethod(name)) {}
const char* name() const { return name_; }
const char* suffix_for_stats() const { return suffix_for_stats_; }
RpcType method_type() const { return method_type_; }
void SetMethodType(RpcType type) { method_type_ = type; }
void* channel_tag() const { return channel_tag_; }
private:
const char* const name_;
const char* const suffix_for_stats_;
RpcType method_type_;
void* const channel_tag_;
};

@ -22,7 +22,20 @@
namespace grpc {
/// Useful interface for generated stubs
class StubOptions {};
class StubOptions {
public:
StubOptions() = default;
explicit StubOptions(const char* suffix_for_stats)
: suffix_for_stats_(suffix_for_stats) {}
void set_suffix_for_stats(const char* suffix_for_stats) {
suffix_for_stats_ = suffix_for_stats;
}
const char* suffix_for_stats() const { return suffix_for_stats_; }
private:
const char* suffix_for_stats_ = nullptr;
};
} // namespace grpc

@ -1557,7 +1557,8 @@ void PrintHeaderService(grpc_generator::Printer* printer,
printer->Indent();
printer->Print(
"Stub(const std::shared_ptr< ::grpc::ChannelInterface>& "
"channel);\n");
"channel, const ::grpc::StubOptions& options = "
"::grpc::StubOptions());\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
@ -2163,12 +2164,13 @@ void PrintSourceService(grpc_generator::Printer* printer,
"const ::grpc::StubOptions& options) {\n"
" (void)options;\n"
" std::unique_ptr< $ns$$Service$::Stub> stub(new "
"$ns$$Service$::Stub(channel));\n"
"$ns$$Service$::Stub(channel, options));\n"
" return stub;\n"
"}\n\n");
printer->Print(*vars,
"$ns$$Service$::Stub::Stub(const std::shared_ptr< "
"::grpc::ChannelInterface>& channel)\n");
"::grpc::ChannelInterface>& channel, const "
"::grpc::StubOptions& options)\n");
printer->Indent();
printer->Print(": channel_(channel)");
for (int i = 0; i < service->method_count(); ++i) {
@ -2187,12 +2189,13 @@ void PrintSourceService(grpc_generator::Printer* printer,
} else {
(*vars)["StreamingType"] = "BIDI_STREAMING";
}
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
"::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
printer->Print(
*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], options.suffix_for_stats(),"
"::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
printer->Print("{}\n\n");
printer->Outdent();

@ -146,9 +146,9 @@ void ChannelResetConnectionBackoff(Channel* channel) {
// ClientRpcInfo should be set before call because set_call also checks
// whether the call has been cancelled, and if the call was cancelled, we
// should notify the interceptors too.
auto* info =
context->set_client_rpc_info(method.name(), method.method_type(), this,
interceptor_creators_, interceptor_pos);
auto* info = context->set_client_rpc_info(
method.name(), method.suffix_for_stats(), method.method_type(), this,
interceptor_creators_, interceptor_pos);
context->set_call(c_call, shared_from_this());
return ::grpc::internal::Call(c_call, this, cq, info);

@ -165,7 +165,7 @@ class ServiceA final {
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
::grpc::Status MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response) override;
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodA1Raw(context, request, cq));
@ -922,7 +922,7 @@ class ServiceB final {
};
class Stub final : public StubInterface {
public:
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);
Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
::grpc::Status MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::testing::Response* response) override;
std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> AsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) {
return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(AsyncMethodB1Raw(context, request, cq));

@ -121,10 +121,14 @@ class ClientCallbackEnd2endTest
is_server_started_ = true;
}
void ResetStub() {
void ResetStub(
std::unique_ptr<experimental::ClientInterceptorFactoryInterface>
interceptor = nullptr) {
ChannelArguments args;
auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &args);
auto interceptors = CreatePhonyClientInterceptors();
if (interceptor != nullptr) interceptors.push_back(std::move(interceptor));
switch (GetParam().protocol) {
case Protocol::TCP:
if (!GetParam().use_interceptors) {
@ -133,7 +137,7 @@ class ClientCallbackEnd2endTest
} else {
channel_ = CreateCustomChannelWithInterceptors(
server_address_.str(), channel_creds, args,
CreatePhonyClientInterceptors());
std::move(interceptors));
}
break;
case Protocol::INPROC:
@ -141,7 +145,7 @@ class ClientCallbackEnd2endTest
channel_ = server_->InProcessChannel(args);
} else {
channel_ = server_->experimental().InProcessChannelWithInterceptors(
args, CreatePhonyClientInterceptors());
args, std::move(interceptors));
}
break;
default:
@ -212,7 +216,8 @@ class ClientCallbackEnd2endTest
}
}
void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
void SendRpcsGeneric(int num_rpcs, bool maybe_except,
const char* suffix_for_stats) {
const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
std::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
@ -228,8 +233,9 @@ class ClientCallbackEnd2endTest
std::mutex mu;
std::condition_variable cv;
bool done = false;
StubOptions options(suffix_for_stats);
generic_stub_->experimental().UnaryCall(
&cli_ctx, kMethodName, send_buf.get(), &recv_buf,
&cli_ctx, kMethodName, options, send_buf.get(), &recv_buf,
[&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
GPR_ASSERT(s.ok());
@ -254,7 +260,8 @@ class ClientCallbackEnd2endTest
}
}
void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done,
const char* suffix_for_stats) {
const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
std::string test_string("");
for (int i = 0; i < num_rpcs; i++) {
@ -263,14 +270,16 @@ class ClientCallbackEnd2endTest
ByteBuffer> {
public:
Client(ClientCallbackEnd2endTest* test, const std::string& method_name,
const std::string& test_str, int reuses, bool do_writes_done)
const char* suffix_for_stats, const std::string& test_str,
int reuses, bool do_writes_done)
: reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
activate_ = [this, test, method_name, test_str] {
activate_ = [this, test, method_name, suffix_for_stats, test_str] {
if (reuses_remaining_ > 0) {
cli_ctx_ = absl::make_unique<ClientContext>();
reuses_remaining_--;
StubOptions options(suffix_for_stats);
test->generic_stub_->experimental().PrepareBidiStreamingCall(
cli_ctx_.get(), method_name, this);
cli_ctx_.get(), method_name, options, this);
request_.set_message(test_str);
send_buf_ = SerializeToByteBuffer(&request_);
StartWrite(send_buf_.get());
@ -317,7 +326,8 @@ class ClientCallbackEnd2endTest
const bool do_writes_done_;
};
Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
Client rpc(this, kMethodName, suffix_for_stats, test_string, reuses,
do_writes_done);
rpc.Await();
}
@ -501,29 +511,46 @@ TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
ResetStub();
SendRpcsGeneric(10, false);
ResetStub(absl::make_unique<TestInterceptorFactory>(
"/grpc.testing.EchoTestService/Echo", nullptr));
SendRpcsGeneric(10, false, /*suffix_for_stats=*/nullptr);
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsWithSuffix) {
ResetStub(absl::make_unique<TestInterceptorFactory>(
"/grpc.testing.EchoTestService/Echo", "TestSuffix"));
SendRpcsGeneric(10, false, "TestSuffix");
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
ResetStub();
SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
ResetStub(absl::make_unique<TestInterceptorFactory>(
"/grpc.testing.EchoTestService/Echo", nullptr));
SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true,
/*suffix_for_stats=*/nullptr);
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithSuffix) {
ResetStub(absl::make_unique<TestInterceptorFactory>(
"/grpc.testing.EchoTestService/Echo", "TestSuffix"));
SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true, "TestSuffix");
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
ResetStub();
SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true,
/*suffix_for_stats=*/nullptr);
}
TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
ResetStub();
SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false,
/*suffix_for_stats=*/nullptr);
}
#if GRPC_ALLOW_EXCEPTIONS
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
ResetStub();
SendRpcsGeneric(10, true);
SendRpcsGeneric(10, true, nullptr);
}
#endif
@ -822,18 +849,22 @@ TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
}
TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
ResetStub();
const std::string kMethodName("/grpc.testing.EchoTestService/Echo");
constexpr char kSuffixForStats[] = "TestSuffixForStats";
ResetStub(
absl::make_unique<TestInterceptorFactory>(kMethodName, kSuffixForStats));
class UnaryClient : public grpc::experimental::ClientUnaryReactor {
public:
UnaryClient(grpc::GenericStub* stub, const std::string& method_name) {
UnaryClient(grpc::GenericStub* stub, const std::string& method_name,
const char* suffix_for_stats) {
cli_ctx_.AddMetadata("key1", "val1");
cli_ctx_.AddMetadata("key2", "val2");
request_.mutable_param()->set_echo_metadata_initially(true);
request_.set_message("Hello metadata");
send_buf_ = SerializeToByteBuffer(&request_);
stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name,
StubOptions options(suffix_for_stats);
stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name, options,
send_buf_.get(), &recv_buf_, this);
StartCall();
}
@ -878,7 +909,7 @@ TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
bool initial_metadata_done_{false};
};
UnaryClient test{generic_stub_.get(), kMethodName};
UnaryClient test{generic_stub_.get(), kMethodName, kSuffixForStats};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {

@ -76,6 +76,7 @@ class HijackingInterceptor : public experimental::Interceptor {
info_ = info;
// Make sure it is the right method
EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0);
EXPECT_EQ(info->suffix_for_stats(), nullptr);
EXPECT_EQ(info->type(), experimental::ClientRpcInfo::Type::UNARY);
}
@ -183,6 +184,7 @@ class HijackingInterceptorMakesAnotherCall : public experimental::Interceptor {
info_ = info;
// Make sure it is the right method
EXPECT_EQ(strcmp("/grpc.testing.EchoTestService/Echo", info->method()), 0);
EXPECT_EQ(strcmp("TestSuffixForStats", info->suffix_for_stats()), 0);
}
void Intercept(experimental::InterceptorBatchMethods* methods) override {
@ -304,6 +306,7 @@ class BidiStreamingRpcHijackingInterceptor : public experimental::Interceptor {
explicit BidiStreamingRpcHijackingInterceptor(
experimental::ClientRpcInfo* info) {
info_ = info;
EXPECT_EQ(info->suffix_for_stats(), nullptr);
}
void Intercept(experimental::InterceptorBatchMethods* methods) override {
@ -375,6 +378,10 @@ class ClientStreamingRpcHijackingInterceptor
explicit ClientStreamingRpcHijackingInterceptor(
experimental::ClientRpcInfo* info) {
info_ = info;
EXPECT_EQ(
strcmp("/grpc.testing.EchoTestService/RequestStream", info->method()),
0);
EXPECT_EQ(strcmp("TestSuffixForStats", info->suffix_for_stats()), 0);
}
void Intercept(experimental::InterceptorBatchMethods* methods) override {
bool hijack = false;
@ -431,6 +438,7 @@ class ServerStreamingRpcHijackingInterceptor
experimental::ClientRpcInfo* info) {
info_ = info;
got_failed_message_ = false;
EXPECT_EQ(info->suffix_for_stats(), nullptr);
}
void Intercept(experimental::InterceptorBatchMethods* methods) override {
@ -931,7 +939,7 @@ TEST_F(ClientInterceptorsEnd2endTest,
auto channel = server_->experimental().InProcessChannelWithInterceptors(
args, std::move(creators));
MakeCall(channel);
MakeCall(channel, StubOptions("TestSuffixForStats"));
// Make sure all interceptors were run once, since the hijacking interceptor
// makes an RPC on the intercepted channel
EXPECT_EQ(PhonyInterceptor::GetNumTimesRun(), 12);
@ -1059,7 +1067,8 @@ TEST_F(ClientInterceptorsStreamingEnd2endTest, ClientStreamingHijackingTest) {
auto channel = experimental::CreateCustomChannelWithInterceptors(
server_address_, InsecureChannelCredentials(), args, std::move(creators));
auto stub = grpc::testing::EchoTestService::NewStub(channel);
auto stub = grpc::testing::EchoTestService::NewStub(
channel, StubOptions("TestSuffixForStats"));
ClientContext ctx;
EchoRequest req;
EchoResponse resp;

@ -27,8 +27,9 @@ std::atomic<int> PhonyInterceptor::num_times_run_;
std::atomic<int> PhonyInterceptor::num_times_run_reverse_;
std::atomic<int> PhonyInterceptor::num_times_cancel_;
void MakeCall(const std::shared_ptr<Channel>& channel) {
auto stub = grpc::testing::EchoTestService::NewStub(channel);
void MakeCall(const std::shared_ptr<Channel>& channel,
const StubOptions& options) {
auto stub = grpc::testing::EchoTestService::NewStub(channel, options);
ClientContext ctx;
EchoRequest req;
req.mutable_param()->set_echo_metadata(true);

@ -82,6 +82,42 @@ class PhonyInterceptorFactory
}
};
/* This interceptor can be used to test the interception mechanism. */
class TestInterceptor : public experimental::Interceptor {
public:
TestInterceptor(const std::string& method, const char* suffix_for_stats,
experimental::ClientRpcInfo* info) {
EXPECT_EQ(info->method(), method);
if (suffix_for_stats == nullptr || info->suffix_for_stats() == nullptr) {
EXPECT_EQ(info->suffix_for_stats(), suffix_for_stats);
} else {
EXPECT_EQ(strcmp(info->suffix_for_stats(), suffix_for_stats), 0);
}
}
void Intercept(experimental::InterceptorBatchMethods* methods) override {
methods->Proceed();
}
};
class TestInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface {
public:
TestInterceptorFactory(const std::string& method,
const char* suffix_for_stats)
: method_(method), suffix_for_stats_(suffix_for_stats) {}
experimental::Interceptor* CreateClientInterceptor(
experimental::ClientRpcInfo* info) override {
return new TestInterceptor(method_, suffix_for_stats_, info);
}
private:
std::string method_;
const char* suffix_for_stats_;
};
/* This interceptor factory returns nullptr on interceptor creation */
class NullInterceptorFactory
: public experimental::ClientInterceptorFactoryInterface,
@ -164,7 +200,8 @@ class EchoTestServiceStreamingImpl : public EchoTestService::Service {
constexpr int kNumStreamingMessages = 10;
void MakeCall(const std::shared_ptr<Channel>& channel);
void MakeCall(const std::shared_ptr<Channel>& channel,
const StubOptions& options = StubOptions());
void MakeClientStreamingCall(const std::shared_ptr<Channel>& channel);

Loading…
Cancel
Save