mirror of https://github.com/grpc/grpc.git
parent
76c9e08063
commit
eec7a917ba
18 changed files with 1143 additions and 150 deletions
@ -0,0 +1,53 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# GRPC contains the General RPC module. |
||||
module GRPC |
||||
## |
||||
# Represents a registry of added interceptors available for enumeration. |
||||
# The registry can be used for both server and client interceptors. |
||||
# This class is internal to gRPC and not meant for public usage. |
||||
# |
||||
class InterceptorRegistry |
||||
## |
||||
# An error raised when an interceptor is attempted to be added |
||||
# that does not extend GRPC::Interceptor |
||||
# |
||||
class DescendantError < StandardError; end |
||||
|
||||
## |
||||
# Initialize the registry with an empty interceptor list |
||||
# This is an EXPERIMENTAL API. |
||||
# |
||||
def initialize(interceptors = []) |
||||
@interceptors = [] |
||||
interceptors.each do |i| |
||||
base = GRPC::Interceptor |
||||
unless i.class.ancestors.include?(base) |
||||
fail DescendantError, "Interceptors must descend from #{base}" |
||||
end |
||||
@interceptors << i |
||||
end |
||||
end |
||||
|
||||
## |
||||
# Builds an interception context from this registry |
||||
# |
||||
# @return [InterceptionContext] |
||||
# |
||||
def build_context |
||||
InterceptionContext.new(@interceptors) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,186 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
require_relative 'interceptor_registry' |
||||
|
||||
# GRPC contains the General RPC module. |
||||
module GRPC |
||||
## |
||||
# Base class for interception in GRPC |
||||
# |
||||
class Interceptor |
||||
## |
||||
# @param [Hash] options A hash of options that will be used |
||||
# by the interceptor. This is an EXPERIMENTAL API. |
||||
# |
||||
def initialize(options = {}) |
||||
@options = options || {} |
||||
end |
||||
end |
||||
|
||||
## |
||||
# ClientInterceptor allows for wrapping outbound gRPC client stub requests. |
||||
# This is an EXPERIMENTAL API. |
||||
# |
||||
class ClientInterceptor < Interceptor |
||||
## |
||||
# Intercept a unary request response call |
||||
# |
||||
# @param [Object] request |
||||
# @param [GRPC::ActiveCall] call |
||||
# @param [Method] method |
||||
# @param [Hash] metadata |
||||
# |
||||
def request_response(request:, call:, method:, metadata:) |
||||
GRPC.logger.debug "Intercepting request response method #{method}" \ |
||||
" for request #{request} with call #{call} and metadata: #{metadata}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a client streaming call |
||||
# |
||||
# @param [Enumerable] requests |
||||
# @param [GRPC::ActiveCall] call |
||||
# @param [Method] method |
||||
# @param [Hash] metadata |
||||
# |
||||
def client_streamer(requests:, call:, method:, metadata:) |
||||
GRPC.logger.debug "Intercepting client streamer method #{method}" \ |
||||
" for requests #{requests} with call #{call} and metadata: #{metadata}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a server streaming call |
||||
# |
||||
# @param [Object] request |
||||
# @param [GRPC::ActiveCall] call |
||||
# @param [Method] method |
||||
# @param [Hash] metadata |
||||
# |
||||
def server_streamer(request:, call:, method:, metadata:) |
||||
GRPC.logger.debug "Intercepting server streamer method #{method}" \ |
||||
" for request #{request} with call #{call} and metadata: #{metadata}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a BiDi streaming call |
||||
# |
||||
# @param [Enumerable] requests |
||||
# @param [GRPC::ActiveCall] call |
||||
# @param [Method] method |
||||
# @param [Hash] metadata |
||||
# |
||||
def bidi_streamer(requests:, call:, method:, metadata:) |
||||
GRPC.logger.debug "Intercepting bidi streamer method #{method}" \ |
||||
" for requests #{requests} with call #{call} and metadata: #{metadata}" |
||||
yield |
||||
end |
||||
end |
||||
|
||||
## |
||||
# ServerInterceptor allows for wrapping gRPC server execution handling. |
||||
# This is an EXPERIMENTAL API. |
||||
# |
||||
class ServerInterceptor < Interceptor |
||||
## |
||||
# Intercept a unary request response call. |
||||
# |
||||
# @param [Object] request |
||||
# @param [GRPC::ActiveCall::SingleReqView] call |
||||
# @param [Method] method |
||||
# |
||||
def request_response(request:, call:, method:) |
||||
GRPC.logger.debug "Intercepting request response method #{method}" \ |
||||
" for request #{request} with call #{call}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a client streaming call |
||||
# |
||||
# @param [GRPC::ActiveCall::MultiReqView] call |
||||
# @param [Method] method |
||||
# |
||||
def client_streamer(call:, method:) |
||||
GRPC.logger.debug "Intercepting client streamer method #{method}" \ |
||||
" with call #{call}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a server streaming call |
||||
# |
||||
# @param [Object] request |
||||
# @param [GRPC::ActiveCall::SingleReqView] call |
||||
# @param [Method] method |
||||
# |
||||
def server_streamer(request:, call:, method:) |
||||
GRPC.logger.debug "Intercepting server streamer method #{method}" \ |
||||
" for request #{request} with call #{call}" |
||||
yield |
||||
end |
||||
|
||||
## |
||||
# Intercept a BiDi streaming call |
||||
# |
||||
# @param [Enumerable<Object>] requests |
||||
# @param [GRPC::ActiveCall::MultiReqView] call |
||||
# @param [Method] method |
||||
# |
||||
def bidi_streamer(requests:, call:, method:) |
||||
GRPC.logger.debug "Intercepting bidi streamer method #{method}" \ |
||||
" for requests #{requests} with call #{call}" |
||||
yield |
||||
end |
||||
end |
||||
|
||||
## |
||||
# Represents the context in which an interceptor runs. Used to provide an |
||||
# injectable mechanism for handling interception. This is an EXPERIMENTAL API. |
||||
# |
||||
class InterceptionContext |
||||
## |
||||
# @param [Array<GRPC::Interceptor>] |
||||
# |
||||
def initialize(interceptors = []) |
||||
@interceptors = interceptors.dup |
||||
end |
||||
|
||||
## |
||||
# Intercept the call and fire out to interceptors in a FIFO execution. |
||||
# This is an EXPERIMENTAL API. |
||||
# |
||||
# @param [Symbol] type The request type |
||||
# @param [Hash] args The arguments for the call |
||||
# |
||||
def intercept!(type, args = {}) |
||||
return yield if @interceptors.none? |
||||
|
||||
i = @interceptors.pop |
||||
return yield unless i |
||||
|
||||
i.send(type, args) do |
||||
if @interceptors.any? |
||||
intercept!(type, args) do |
||||
yield |
||||
end |
||||
else |
||||
yield |
||||
end |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,153 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
require 'spec_helper' |
||||
|
||||
describe 'Client Interceptors' do |
||||
let(:interceptor) { TestClientInterceptor.new } |
||||
let(:interceptors_opts) { { interceptors: [interceptor] } } |
||||
let(:request) { EchoMsg.new } |
||||
let(:service) { EchoService } |
||||
|
||||
before(:each) do |
||||
build_rpc_server |
||||
end |
||||
|
||||
context 'when a client interceptor is added' do |
||||
context 'with a request/response call' do |
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:request_response) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:request_response) |
||||
.once.and_call_original |
||||
expect(stub.an_rpc(request)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
|
||||
it 'can modify outgoing metadata', server: true do |
||||
expect(interceptor).to receive(:request_response) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:request_response) |
||||
.with(request, metadata: { 'foo' => 'bar_from_request_response' }) |
||||
.once.and_call_original |
||||
expect(stub.an_rpc(request)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a client streaming call' do |
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:client_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:client_streamer) |
||||
.once.and_call_original |
||||
requests = [EchoMsg.new, EchoMsg.new] |
||||
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
|
||||
it 'can modify outgoing metadata', server: true do |
||||
expect(interceptor).to receive(:client_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
requests = [EchoMsg.new, EchoMsg.new] |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:client_streamer) |
||||
.with(requests, metadata: { 'foo' => 'bar_from_client_streamer' }) |
||||
.once.and_call_original |
||||
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a server streaming call' do |
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:server_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
request = EchoMsg.new |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:server_streamer) |
||||
.once.and_call_original |
||||
responses = stub.a_server_streaming_rpc(request) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
it 'can modify outgoing metadata', server: true do |
||||
expect(interceptor).to receive(:server_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
request = EchoMsg.new |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:server_streamer) |
||||
.with(request, metadata: { 'foo' => 'bar_from_server_streamer' }) |
||||
.once.and_call_original |
||||
responses = stub.a_server_streaming_rpc(request) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a bidi call' do |
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:bidi_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:bidi_streamer) |
||||
.once.and_call_original |
||||
requests = [EchoMsg.new, EchoMsg.new] |
||||
responses = stub.a_bidi_rpc(requests) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
it 'can modify outgoing metadata', server: true do |
||||
expect(interceptor).to receive(:bidi_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub, opts: interceptors_opts) |
||||
requests = [EchoMsg.new, EchoMsg.new] |
||||
expect_any_instance_of(GRPC::ActiveCall).to receive(:bidi_streamer) |
||||
.with(requests, metadata: { 'foo' => 'bar_from_bidi_streamer' }) |
||||
.once.and_call_original |
||||
responses = stub.a_bidi_rpc(requests) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,65 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
require 'spec_helper' |
||||
|
||||
describe GRPC::InterceptorRegistry do |
||||
let(:server) { RpcServer.new } |
||||
let(:interceptor) { TestServerInterceptor.new } |
||||
let(:interceptors) { [interceptor] } |
||||
let(:registry) { described_class.new(interceptors) } |
||||
|
||||
describe 'initialization' do |
||||
subject { registry } |
||||
|
||||
context 'with an interceptor extending GRPC::ServerInterceptor' do |
||||
it 'should add the interceptor to the registry' do |
||||
subject |
||||
is = registry.instance_variable_get('@interceptors') |
||||
expect(is.count).to eq 1 |
||||
expect(is.first).to eq interceptor |
||||
end |
||||
end |
||||
|
||||
context 'with multiple interceptors' do |
||||
let(:interceptor2) { TestServerInterceptor.new } |
||||
let(:interceptor3) { TestServerInterceptor.new } |
||||
let(:interceptors) { [interceptor, interceptor2, interceptor3] } |
||||
|
||||
it 'should maintain order of insertion when iterated against' do |
||||
subject |
||||
is = registry.instance_variable_get('@interceptors') |
||||
expect(is.count).to eq 3 |
||||
is.each_with_index do |i, idx| |
||||
case idx |
||||
when 0 |
||||
expect(i).to eq interceptor |
||||
when 1 |
||||
expect(i).to eq interceptor2 |
||||
when 2 |
||||
expect(i).to eq interceptor3 |
||||
end |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with an interceptor not extending GRPC::ServerInterceptor' do |
||||
let(:interceptor) { Class } |
||||
let(:err) { GRPC::InterceptorRegistry::DescendantError } |
||||
|
||||
it 'should raise an InvalidArgument exception' do |
||||
expect { subject }.to raise_error(err) |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,218 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
require 'spec_helper' |
||||
|
||||
describe 'Server Interceptors' do |
||||
let(:interceptor) { TestServerInterceptor.new } |
||||
let(:request) { EchoMsg.new } |
||||
let(:trailing_metadata) { {} } |
||||
let(:service) { EchoService.new(trailing_metadata) } |
||||
let(:interceptors) { [] } |
||||
|
||||
before(:each) do |
||||
build_rpc_server(server_opts: { interceptors: interceptors }) |
||||
end |
||||
|
||||
context 'when a server interceptor is added' do |
||||
let(:interceptors) { [interceptor] } |
||||
let(:client_metadata) { { client_md: 'test' } } |
||||
let(:client_call_opts) { { metadata: client_metadata, return_op: true } } |
||||
|
||||
context 'with a request/response call' do |
||||
let(:trailing_metadata) { { server_om: 'from_request_response' } } |
||||
|
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:request_response) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect(stub.an_rpc(request)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
|
||||
it 'can modify trailing metadata', server: true do |
||||
expect(interceptor).to receive(:request_response) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect_any_instance_of(GRPC::ActiveCall).to( |
||||
receive(:request_response).with(request, metadata: client_metadata) |
||||
.once.and_call_original |
||||
) |
||||
op = stub.an_rpc(request, client_call_opts) |
||||
msg = op.execute |
||||
expect(op.trailing_metadata).to eq( |
||||
'interc' => 'from_request_response', |
||||
'server_om' => 'from_request_response' |
||||
) |
||||
expect(msg).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a client streaming call' do |
||||
let(:trailing_metadata) { { server_om: 'from_client_streamer' } } |
||||
let(:requests) { [EchoMsg.new, EchoMsg.new] } |
||||
|
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:client_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
|
||||
it 'can modify trailing metadata', server: true do |
||||
expect(interceptor).to receive(:client_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect_any_instance_of(GRPC::ActiveCall).to( |
||||
receive(:client_streamer).with(requests) |
||||
.once.and_call_original |
||||
) |
||||
op = stub.a_client_streaming_rpc(requests, client_call_opts) |
||||
msg = op.execute |
||||
expect(op.trailing_metadata).to eq( |
||||
'interc' => 'from_client_streamer', |
||||
'server_om' => 'from_client_streamer' |
||||
) |
||||
expect(msg).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a server streaming call' do |
||||
let(:trailing_metadata) { { server_om: 'from_server_streamer' } } |
||||
let(:request) { EchoMsg.new } |
||||
|
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:server_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
responses = stub.a_server_streaming_rpc(request) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
it 'can modify trailing metadata', server: true do |
||||
expect(interceptor).to receive(:server_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect_any_instance_of(GRPC::ActiveCall).to( |
||||
receive(:server_streamer).with(request) |
||||
.once.and_call_original |
||||
) |
||||
op = stub.a_server_streaming_rpc(request, client_call_opts) |
||||
responses = op.execute |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
expect(op.trailing_metadata).to eq( |
||||
'interc' => 'from_server_streamer', |
||||
'server_om' => 'from_server_streamer' |
||||
) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'with a bidi call' do |
||||
let(:trailing_metadata) { { server_om: 'from_bidi_streamer' } } |
||||
let(:requests) { [EchoMsg.new, EchoMsg.new] } |
||||
|
||||
it 'should be called', server: true do |
||||
expect(interceptor).to receive(:bidi_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
responses = stub.a_bidi_rpc(requests) |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
it 'can modify trailing metadata', server: true do |
||||
expect(interceptor).to receive(:bidi_streamer) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect_any_instance_of(GRPC::ActiveCall).to( |
||||
receive(:bidi_streamer).with(requests) |
||||
.once.and_call_original |
||||
) |
||||
op = stub.a_bidi_rpc(requests, client_call_opts) |
||||
responses = op.execute |
||||
responses.each do |r| |
||||
expect(r).to be_a(EchoMsg) |
||||
end |
||||
expect(op.trailing_metadata).to eq( |
||||
'interc' => 'from_bidi_streamer', |
||||
'server_om' => 'from_bidi_streamer' |
||||
) |
||||
end |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'when multiple interceptors are added' do |
||||
let(:interceptor2) { TestServerInterceptor.new } |
||||
let(:interceptor3) { TestServerInterceptor.new } |
||||
let(:interceptors) do |
||||
[ |
||||
interceptor, |
||||
interceptor2, |
||||
interceptor3 |
||||
] |
||||
end |
||||
|
||||
it 'each should be called', server: true do |
||||
expect(interceptor).to receive(:request_response) |
||||
.once.and_call_original |
||||
expect(interceptor2).to receive(:request_response) |
||||
.once.and_call_original |
||||
expect(interceptor3).to receive(:request_response) |
||||
.once.and_call_original |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect(stub.an_rpc(request)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
|
||||
context 'when an interceptor is not added' do |
||||
it 'should not be called', server: true do |
||||
expect(interceptor).to_not receive(:call) |
||||
|
||||
run_services_on_server(@server, services: [service]) do |
||||
stub = build_insecure_stub(EchoStub) |
||||
expect(stub.an_rpc(request)).to be_a(EchoMsg) |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,73 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# GRPC contains the General RPC module. |
||||
module GRPC |
||||
## |
||||
# GRPC RSpec base module |
||||
# |
||||
module Spec |
||||
## |
||||
# A module that is used for providing generic helpers across the |
||||
# GRPC test suite |
||||
# |
||||
module Helpers |
||||
# Shortcut syntax for a GRPC RPC Server |
||||
RpcServer = GRPC::RpcServer |
||||
|
||||
## |
||||
# Build an RPC server used for testing |
||||
# |
||||
def build_rpc_server(server_opts: {}, |
||||
client_opts: {}) |
||||
@server = RpcServer.new({ poll_period: 1 }.merge(server_opts)) |
||||
@port = @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) |
||||
@host = "0.0.0.0:#{@port}" |
||||
@client_opts = client_opts |
||||
@server |
||||
end |
||||
|
||||
## |
||||
# Run services on an RPC server, yielding to allow testing within |
||||
# |
||||
# @param [RpcServer] server |
||||
# @param [Array<Class>] services |
||||
# |
||||
def run_services_on_server(server, services: []) |
||||
services.each do |s| |
||||
server.handle(s) |
||||
end |
||||
t = Thread.new { server.run } |
||||
server.wait_till_running |
||||
|
||||
yield |
||||
|
||||
server.stop |
||||
t.join |
||||
end |
||||
|
||||
## |
||||
# Build an insecure stub from a given stub class |
||||
# |
||||
# @param [Class] klass |
||||
# @param [String] host |
||||
# |
||||
def build_insecure_stub(klass, host: nil, opts: nil) |
||||
host ||= @host |
||||
opts ||= @client_opts |
||||
klass.new(host, :this_channel_is_insecure, **opts) |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,147 @@ |
||||
# Copyright 2017 gRPC authors. |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); |
||||
# you may not use this file except in compliance with the License. |
||||
# You may obtain a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, |
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
# See the License for the specific language governing permissions and |
||||
# limitations under the License. |
||||
|
||||
# Test stubs for various scenarios |
||||
require 'grpc' |
||||
|
||||
# A test message |
||||
class EchoMsg |
||||
def self.marshal(_o) |
||||
'' |
||||
end |
||||
|
||||
def self.unmarshal(_o) |
||||
EchoMsg.new |
||||
end |
||||
end |
||||
|
||||
# A test service with an echo implementation. |
||||
class EchoService |
||||
include GRPC::GenericService |
||||
rpc :an_rpc, EchoMsg, EchoMsg |
||||
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg |
||||
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) |
||||
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) |
||||
attr_reader :received_md |
||||
|
||||
def initialize(**kw) |
||||
@trailing_metadata = kw |
||||
@received_md = [] |
||||
end |
||||
|
||||
def an_rpc(req, call) |
||||
GRPC.logger.info('echo service received a request') |
||||
call.output_metadata.update(@trailing_metadata) |
||||
@received_md << call.metadata unless call.metadata.nil? |
||||
req |
||||
end |
||||
|
||||
def a_client_streaming_rpc(call) |
||||
# iterate through requests so call can complete |
||||
call.output_metadata.update(@trailing_metadata) |
||||
call.each_remote_read.each { |r| p r } |
||||
EchoMsg.new |
||||
end |
||||
|
||||
def a_server_streaming_rpc(_req, call) |
||||
call.output_metadata.update(@trailing_metadata) |
||||
[EchoMsg.new, EchoMsg.new] |
||||
end |
||||
|
||||
def a_bidi_rpc(requests, call) |
||||
call.output_metadata.update(@trailing_metadata) |
||||
requests.each { |r| p r } |
||||
[EchoMsg.new, EchoMsg.new] |
||||
end |
||||
end |
||||
|
||||
EchoStub = EchoService.rpc_stub_class |
||||
|
||||
# For testing server interceptors |
||||
class TestServerInterceptor < GRPC::ServerInterceptor |
||||
def request_response(request:, call:, method:) |
||||
p "Received request/response call at method #{method}" \ |
||||
" with request #{request} for call #{call}" |
||||
call.output_metadata[:interc] = 'from_request_response' |
||||
p "[GRPC::Ok] (#{method.owner.name}.#{method.name})" |
||||
yield |
||||
end |
||||
|
||||
def client_streamer(call:, method:) |
||||
call.output_metadata[:interc] = 'from_client_streamer' |
||||
call.each_remote_read.each do |r| |
||||
p "In interceptor: #{r}" |
||||
end |
||||
p "Received client streamer call at method #{method} for call #{call}" |
||||
yield |
||||
end |
||||
|
||||
def server_streamer(request:, call:, method:) |
||||
p "Received server streamer call at method #{method} with request" \ |
||||
" #{request} for call #{call}" |
||||
call.output_metadata[:interc] = 'from_server_streamer' |
||||
yield |
||||
end |
||||
|
||||
def bidi_streamer(requests:, call:, method:) |
||||
requests.each do |r| |
||||
p "Bidi request: #{r}" |
||||
end |
||||
p "Received bidi streamer call at method #{method} with requests" \ |
||||
" #{requests} for call #{call}" |
||||
call.output_metadata[:interc] = 'from_bidi_streamer' |
||||
yield |
||||
end |
||||
end |
||||
|
||||
# For testing client interceptors |
||||
class TestClientInterceptor < GRPC::ClientInterceptor |
||||
def request_response(request:, call:, method:, metadata: {}) |
||||
p "Intercepted request/response call at method #{method}" \ |
||||
" with request #{request} for call #{call}" \ |
||||
" and metadata: #{metadata}" |
||||
metadata['foo'] = 'bar_from_request_response' |
||||
yield |
||||
end |
||||
|
||||
def client_streamer(requests:, call:, method:, metadata: {}) |
||||
p "Received client streamer call at method #{method}" \ |
||||
" with requests #{requests} for call #{call}" \ |
||||
" and metadata: #{metadata}" |
||||
requests.each do |r| |
||||
p "In client interceptor: #{r}" |
||||
end |
||||
metadata['foo'] = 'bar_from_client_streamer' |
||||
yield |
||||
end |
||||
|
||||
def server_streamer(request:, call:, method:, metadata: {}) |
||||
p "Received server streamer call at method #{method}" \ |
||||
" with request #{request} for call #{call}" \ |
||||
" and metadata: #{metadata}" |
||||
metadata['foo'] = 'bar_from_server_streamer' |
||||
yield |
||||
end |
||||
|
||||
def bidi_streamer(requests:, call:, method:, metadata: {}) |
||||
p "Received bidi streamer call at method #{method}" \ |
||||
"with requests #{requests} for call #{call}" \ |
||||
" and metadata: #{metadata}" |
||||
requests.each do |r| |
||||
p "In client interceptor: #{r}" |
||||
end |
||||
metadata['foo'] = 'bar_from_bidi_streamer' |
||||
yield |
||||
end |
||||
end |
Loading…
Reference in new issue