Adding generic rpc and unimplemented rpc test for server interceptors

pull/16842/head
Yash Tibrewal 6 years ago
parent 3896dabb85
commit 312feb4202
  1. 6
      include/grpcpp/impl/codegen/server_interface.h
  2. 17
      src/cpp/server/server_cc.cc
  3. 145
      test/cpp/end2end/server_interceptors_end2end_test.cc

@ -184,7 +184,6 @@ class ServerInterface : public internal::CallHook {
const char* name); const char* name);
virtual bool FinalizeResult(void** tag, bool* status) override { virtual bool FinalizeResult(void** tag, bool* status) override {
gpr_log(GPR_ERROR, "finalize registeredasyncrequest");
/* If we are done intercepting, then there is nothing more for us to do */ /* If we are done intercepting, then there is nothing more for us to do */
if (done_intercepting_) { if (done_intercepting_) {
return BaseAsyncRequest::FinalizeResult(tag, status); return BaseAsyncRequest::FinalizeResult(tag, status);
@ -238,7 +237,6 @@ class ServerInterface : public internal::CallHook {
notification_cq_(notification_cq), notification_cq_(notification_cq),
tag_(tag), tag_(tag),
request_(request) { request_(request) {
gpr_log(GPR_ERROR, "new payload request");
IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
notification_cq); notification_cq);
} }
@ -248,7 +246,6 @@ class ServerInterface : public internal::CallHook {
} }
bool FinalizeResult(void** tag, bool* status) override { bool FinalizeResult(void** tag, bool* status) override {
gpr_log(GPR_ERROR, "finalize PayloadAsyncRequest");
/* If we are done intercepting, then there is nothing more for us to do */ /* If we are done intercepting, then there is nothing more for us to do */
if (done_intercepting_) { if (done_intercepting_) {
return RegisteredAsyncRequest::FinalizeResult(tag, status); return RegisteredAsyncRequest::FinalizeResult(tag, status);
@ -313,7 +310,6 @@ class ServerInterface : public internal::CallHook {
ServerCompletionQueue* notification_cq, void* tag, ServerCompletionQueue* notification_cq, void* tag,
Message* message) { Message* message) {
GPR_CODEGEN_ASSERT(method); GPR_CODEGEN_ASSERT(method);
gpr_log(GPR_ERROR, "request async method with payload");
new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
notification_cq, tag, message); notification_cq, tag, message);
} }
@ -324,7 +320,6 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) { ServerCompletionQueue* notification_cq, void* tag) {
GPR_CODEGEN_ASSERT(method); GPR_CODEGEN_ASSERT(method);
gpr_log(GPR_ERROR, "request async method with no payload");
new NoPayloadAsyncRequest(method, this, context, stream, call_cq, new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
notification_cq, tag); notification_cq, tag);
} }
@ -334,7 +329,6 @@ class ServerInterface : public internal::CallHook {
CompletionQueue* call_cq, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, ServerCompletionQueue* notification_cq,
void* tag) { void* tag) {
gpr_log(GPR_ERROR, "request async generic call");
new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
tag, true); tag, true);
} }

@ -132,10 +132,13 @@ class Server::UnimplementedAsyncResponse final
~UnimplementedAsyncResponse() { delete request_; } ~UnimplementedAsyncResponse() { delete request_; }
bool FinalizeResult(void** tag, bool* status) override { bool FinalizeResult(void** tag, bool* status) override {
internal::CallOpSet< if (internal::CallOpSet<
internal::CallOpSendInitialMetadata, internal::CallOpSendInitialMetadata,
internal::CallOpServerSendStatus>::FinalizeResult(tag, status); internal::CallOpServerSendStatus>::FinalizeResult(tag, status)) {
delete this; delete this;
} else {
// The tag was swallowed due to interception. We will see it again.
}
return false; return false;
} }
@ -755,7 +758,6 @@ ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
/* Set up interception state partially for the receive ops. call_wrapper_ is /* Set up interception state partially for the receive ops. call_wrapper_ is
* not filled at this point, but it will be filled before the interceptors are * not filled at this point, but it will be filled before the interceptors are
* run. */ * run. */
gpr_log(GPR_ERROR, "Created base async request");
interceptor_methods_.SetCall(&call_wrapper_); interceptor_methods_.SetCall(&call_wrapper_);
interceptor_methods_.SetReverse(); interceptor_methods_.SetReverse();
call_cq_->RegisterAvalanching(); // This op will trigger more ops call_cq_->RegisterAvalanching(); // This op will trigger more ops
@ -767,9 +769,7 @@ ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag, bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
bool* status) { bool* status) {
gpr_log(GPR_ERROR, "in finalize result");
if (done_intercepting_) { if (done_intercepting_) {
gpr_log(GPR_ERROR, "done running interceptors");
*tag = tag_; *tag = tag_;
if (delete_on_finalize_) { if (delete_on_finalize_) {
delete this; delete this;
@ -788,7 +788,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
stream_->BindCall(&call_wrapper_); stream_->BindCall(&call_wrapper_);
if (*status && call_ && call_wrapper_.server_rpc_info()) { if (*status && call_ && call_wrapper_.server_rpc_info()) {
gpr_log(GPR_ERROR, "here");
done_intercepting_ = true; done_intercepting_ = true;
// Set interception point for RECV INITIAL METADATA // Set interception point for RECV INITIAL METADATA
interceptor_methods_.AddInterceptionHookPoint( interceptor_methods_.AddInterceptionHookPoint(
@ -803,7 +802,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
// There were interceptors to be run, so // There were interceptors to be run, so
// ContinueFinalizeResultAfterInterception will be run when interceptors // ContinueFinalizeResultAfterInterception will be run when interceptors
// are done. // are done.
gpr_log(GPR_ERROR, "don't return this tag");
return false; return false;
} }
} }
@ -819,7 +817,6 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest:: void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() { ContinueFinalizeResultAfterInterception() {
gpr_log(GPR_ERROR, "continue finalize result");
context_->BeginCompletionOp(&call_wrapper_); context_->BeginCompletionOp(&call_wrapper_);
// Queue a tag which will be returned immediately // Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx; grpc_core::ExecCtx exec_ctx;

@ -425,6 +425,151 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
grpc_recycle_unused_port(port); grpc_recycle_unused_port(port);
} }
TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
DummyInterceptor::Reset();
int port = grpc_pick_unused_port_or_die();
string server_address = "localhost:" + std::to_string(port);
ServerBuilder builder;
AsyncGenericService service;
builder.AddListeningPort(server_address, InsecureServerCredentials());
builder.RegisterAsyncGenericService(&service);
std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
creators;
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
}
builder.experimental().SetInterceptorCreators(std::move(creators));
auto cq = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
ChannelArguments args;
auto channel = CreateChannel(server_address, InsecureChannelCredentials());
GenericStub generic_stub(channel);
const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
GenericServerContext srv_ctx;
GenericServerAsyncReaderWriter stream(&srv_ctx);
// The string needs to be long enough to test heap-based slice.
send_request.set_message("Hello");
cli_ctx.AddMetadata("testkey", "testvalue");
std::unique_ptr<GenericClientAsyncReaderWriter> call =
generic_stub.PrepareCall(&cli_ctx, kMethodName, cq.get());
call->StartCall(tag(1));
Verifier().Expect(1, true).Verify(cq.get());
std::unique_ptr<ByteBuffer> send_buffer =
SerializeToByteBuffer(&send_request);
call->Write(*send_buffer, tag(2));
// Send ByteBuffer can be destroyed after calling Write.
send_buffer.reset();
Verifier().Expect(2, true).Verify(cq.get());
call->WritesDone(tag(3));
Verifier().Expect(3, true).Verify(cq.get());
service.RequestCall(&srv_ctx, &stream, cq.get(), cq.get(), tag(4));
Verifier().Expect(4, true).Verify(cq.get());
EXPECT_EQ(kMethodName, srv_ctx.method());
EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
srv_ctx.AddTrailingMetadata("testkey", "testvalue");
ByteBuffer recv_buffer;
stream.Read(&recv_buffer, tag(5));
Verifier().Expect(5, true).Verify(cq.get());
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
send_buffer = SerializeToByteBuffer(&send_response);
stream.Write(*send_buffer, tag(6));
send_buffer.reset();
Verifier().Expect(6, true).Verify(cq.get());
stream.Finish(Status::OK, tag(7));
Verifier().Expect(7, true).Verify(cq.get());
recv_buffer.Clear();
call->Read(&recv_buffer, tag(8));
Verifier().Expect(8, true).Verify(cq.get());
EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
call->Finish(&recv_status, tag(9));
Verifier().Expect(9, true).Verify(cq.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
"testvalue"));
// Make sure all 20 dummy interceptors were run
EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
server->Shutdown();
cq->Shutdown();
void* ignored_tag;
bool ignored_ok;
while (cq->Next(&ignored_tag, &ignored_ok))
;
grpc_recycle_unused_port(port);
}
TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
DummyInterceptor::Reset();
int port = grpc_pick_unused_port_or_die();
string server_address = "localhost:" + std::to_string(port);
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
creators;
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
}
builder.experimental().SetInterceptorCreators(std::move(creators));
auto cq = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();
ChannelArguments args;
std::shared_ptr<Channel> channel =
CreateChannel(server_address, InsecureChannelCredentials());
std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
EchoRequest send_request;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier().Expect(4, true).Verify(cq.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
// Make sure all 20 dummy interceptors were run
// EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
server->Shutdown();
cq->Shutdown();
void* ignored_tag;
bool ignored_ok;
while (cq->Next(&ignored_tag, &ignored_ok))
;
grpc_recycle_unused_port(port);
}
} // namespace } // namespace
} // namespace testing } // namespace testing
} // namespace grpc } // namespace grpc

Loading…
Cancel
Save