Merge pull request #12100 from bigcommerce/ruby-interceptors-2

Add Ruby interceptors
pull/12559/head
Mehrdad Afshari 7 years ago committed by GitHub
commit f3f420d91f
  1. 1
      src/ruby/lib/grpc.rb
  2. 43
      src/ruby/lib/grpc/generic/active_call.rb
  3. 29
      src/ruby/lib/grpc/generic/bidi_call.rb
  4. 133
      src/ruby/lib/grpc/generic/client_stub.rb
  5. 53
      src/ruby/lib/grpc/generic/interceptor_registry.rb
  6. 186
      src/ruby/lib/grpc/generic/interceptors.rb
  7. 80
      src/ruby/lib/grpc/generic/rpc_desc.rb
  8. 18
      src/ruby/lib/grpc/generic/rpc_server.rb
  9. 1
      src/ruby/pb/grpc/testing/duplicate/echo_duplicate_services_pb.rb
  10. 35
      src/ruby/spec/channel_connection_spec.rb
  11. 19
      src/ruby/spec/generic/active_call_spec.rb
  12. 153
      src/ruby/spec/generic/client_interceptors_spec.rb
  13. 65
      src/ruby/spec/generic/interceptor_registry_spec.rb
  14. 35
      src/ruby/spec/generic/rpc_server_spec.rb
  15. 218
      src/ruby/spec/generic/server_interceptors_spec.rb
  16. 4
      src/ruby/spec/spec_helper.rb
  17. 73
      src/ruby/spec/support/helpers.rb
  18. 147
      src/ruby/spec/support/services.rb

@ -24,6 +24,7 @@ require_relative 'grpc/generic/active_call'
require_relative 'grpc/generic/client_stub'
require_relative 'grpc/generic/service'
require_relative 'grpc/generic/rpc_server'
require_relative 'grpc/generic/interceptors'
begin
file = File.open(ssl_roots_path)

@ -154,6 +154,15 @@ module GRPC
Operation.new(self)
end
##
# Returns a restricted view of this ActiveCall for use in interceptors
#
# @return [InterceptableView]
#
def interceptable
InterceptableView.new(self)
end
def receive_and_check_status
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
@ -515,15 +524,27 @@ module GRPC
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received,
req_view: MultiReqView.new(self))
bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
# @param mth [Proc] generates the BiDi stream replies
# @param interception_ctx [InterceptionContext]
#
def run_server_bidi(mth, interception_ctx)
view = multi_req_view
bidi_call = BidiCall.new(
@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received,
req_view: view
)
requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
interception_ctx.intercept!(
:bidi_streamer,
call: view,
method: mth,
requests: requests
) do
bidi_call.run_on_server(mth, requests)
end
end
# Waits till an operation completes
@ -645,5 +666,9 @@ module GRPC
Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=, :trailing_metadata)
# InterceptableView further limits access to an ActiveCall's methods
# for use in interceptors on the client, exposing only the deadline
InterceptableView = view_class(:deadline)
end
end

@ -87,23 +87,32 @@ module GRPC
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
# @param set_input_steam_done [Proc] call back to call when
# the reads have been completely read through.
def run_on_server(gen_each_reply, set_input_stream_done)
# @param [Proc] gen_each_reply generates the BiDi stream replies.
# @param [Enumerable] requests The enumerable of requests to run
def run_on_server(gen_each_reply, requests)
replies = nil
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
replys = gen_each_reply.call(
read_loop(set_input_stream_done, is_client: false))
replies = gen_each_reply.call(requests)
elsif gen_each_reply.arity == 2
replys = gen_each_reply.call(
read_loop(set_input_stream_done, is_client: false),
@req_view)
replies = gen_each_reply.call(requests, @req_view)
else
fail 'Illegal arity of reply generator'
end
write_loop(replys, is_client: false)
write_loop(replies, is_client: false)
end
##
# Read the next stream iteration
#
# @param [Proc] finalize_stream callback to call when the reads have been
# completely read through.
# @param [Boolean] is_client If this is a client or server request
#
def read_next_loop(finalize_stream, is_client = false)
read_loop(finalize_stream, is_client: is_client)
end
private

@ -89,17 +89,23 @@ module GRPC
# used within a gRPC server.
# @param channel_args [Hash] the channel arguments. Note: this argument is
# ignored if the channel_override argument is provided.
# @param interceptors [Array<GRPC::ClientInterceptor>] An array of
# GRPC::ClientInterceptor objects that will be used for
# intercepting calls before they are executed
# Interceptors are an EXPERIMENTAL API.
def initialize(host, creds,
channel_override: nil,
timeout: nil,
propagate_mask: nil,
channel_args: {})
channel_args: {},
interceptors: [])
@ch = ClientStub.setup_channel(channel_override, host, creds,
channel_args)
alt_host = channel_args[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
@propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
@interceptors = InterceptorRegistry.new(interceptors)
end
# request_response sends a request to a GRPC server, and returns the
@ -149,16 +155,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
return c.request_response(req, metadata: metadata) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, metadata: metadata)
interception_context = @interceptors.build_context
intercept_args = {
method: method,
request: req,
call: c.interceptable,
metadata: metadata
}
if return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
interception_context.intercept!(:request_response, intercept_args) do
c.request_response(req, metadata: metadata)
end
end
op
else
interception_context.intercept!(:request_response, intercept_args) do
c.request_response(req, metadata: metadata)
end
end
op
end
# client_streamer sends a stream of requests to a GRPC server, and
@ -213,16 +232,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
return c.client_streamer(requests, metadata: metadata) unless return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.client_streamer(requests)
interception_context = @interceptors.build_context
intercept_args = {
method: method,
requests: requests,
call: c.interceptable,
metadata: metadata
}
if return_op
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
interception_context.intercept!(:client_streamer, intercept_args) do
c.client_streamer(requests)
end
end
op
else
interception_context.intercept!(:client_streamer, intercept_args) do
c.client_streamer(requests, metadata: metadata)
end
end
op
end
# server_streamer sends one request to the GRPC server, which yields a
@ -292,16 +324,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
return c.server_streamer(req, metadata: metadata, &blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.server_streamer(req, &blk)
interception_context = @interceptors.build_context
intercept_args = {
method: method,
request: req,
call: c.interceptable,
metadata: metadata
}
if return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
interception_context.intercept!(:server_streamer, intercept_args) do
c.server_streamer(req, &blk)
end
end
op
else
interception_context.intercept!(:server_streamer, intercept_args) do
c.server_streamer(req, metadata: metadata, &blk)
end
end
op
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@ -405,17 +450,29 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
return c.bidi_streamer(requests, metadata: metadata,
&blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.bidi_streamer(requests, &blk)
interception_context = @interceptors.build_context
intercept_args = {
method: method,
requests: requests,
call: c.interceptable,
metadata: metadata
}
if return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
interception_context.intercept!(:bidi_streamer, intercept_args) do
c.bidi_streamer(requests, &blk)
end
end
op
else
interception_context.intercept!(:bidi_streamer, intercept_args) do
c.bidi_streamer(requests, metadata: metadata, &blk)
end
end
op
end
private

@ -0,0 +1,53 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# GRPC contains the General RPC module.
module GRPC
##
# Represents a registry of added interceptors available for enumeration.
# The registry can be used for both server and client interceptors.
# This class is internal to gRPC and not meant for public usage.
#
class InterceptorRegistry
##
# An error raised when an interceptor is attempted to be added
# that does not extend GRPC::Interceptor
#
class DescendantError < StandardError; end
##
# Initialize the registry with an empty interceptor list
# This is an EXPERIMENTAL API.
#
def initialize(interceptors = [])
@interceptors = []
interceptors.each do |i|
base = GRPC::Interceptor
unless i.class.ancestors.include?(base)
fail DescendantError, "Interceptors must descend from #{base}"
end
@interceptors << i
end
end
##
# Builds an interception context from this registry
#
# @return [InterceptionContext]
#
def build_context
InterceptionContext.new(@interceptors)
end
end
end

@ -0,0 +1,186 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require_relative 'interceptor_registry'
# GRPC contains the General RPC module.
module GRPC
##
# Base class for interception in GRPC
#
class Interceptor
##
# @param [Hash] options A hash of options that will be used
# by the interceptor. This is an EXPERIMENTAL API.
#
def initialize(options = {})
@options = options || {}
end
end
##
# ClientInterceptor allows for wrapping outbound gRPC client stub requests.
# This is an EXPERIMENTAL API.
#
class ClientInterceptor < Interceptor
##
# Intercept a unary request response call
#
# @param [Object] request
# @param [GRPC::ActiveCall] call
# @param [Method] method
# @param [Hash] metadata
#
def request_response(request:, call:, method:, metadata:)
GRPC.logger.debug "Intercepting request response method #{method}" \
" for request #{request} with call #{call} and metadata: #{metadata}"
yield
end
##
# Intercept a client streaming call
#
# @param [Enumerable] requests
# @param [GRPC::ActiveCall] call
# @param [Method] method
# @param [Hash] metadata
#
def client_streamer(requests:, call:, method:, metadata:)
GRPC.logger.debug "Intercepting client streamer method #{method}" \
" for requests #{requests} with call #{call} and metadata: #{metadata}"
yield
end
##
# Intercept a server streaming call
#
# @param [Object] request
# @param [GRPC::ActiveCall] call
# @param [Method] method
# @param [Hash] metadata
#
def server_streamer(request:, call:, method:, metadata:)
GRPC.logger.debug "Intercepting server streamer method #{method}" \
" for request #{request} with call #{call} and metadata: #{metadata}"
yield
end
##
# Intercept a BiDi streaming call
#
# @param [Enumerable] requests
# @param [GRPC::ActiveCall] call
# @param [Method] method
# @param [Hash] metadata
#
def bidi_streamer(requests:, call:, method:, metadata:)
GRPC.logger.debug "Intercepting bidi streamer method #{method}" \
" for requests #{requests} with call #{call} and metadata: #{metadata}"
yield
end
end
##
# ServerInterceptor allows for wrapping gRPC server execution handling.
# This is an EXPERIMENTAL API.
#
class ServerInterceptor < Interceptor
##
# Intercept a unary request response call.
#
# @param [Object] request
# @param [GRPC::ActiveCall::SingleReqView] call
# @param [Method] method
#
def request_response(request:, call:, method:)
GRPC.logger.debug "Intercepting request response method #{method}" \
" for request #{request} with call #{call}"
yield
end
##
# Intercept a client streaming call
#
# @param [GRPC::ActiveCall::MultiReqView] call
# @param [Method] method
#
def client_streamer(call:, method:)
GRPC.logger.debug "Intercepting client streamer method #{method}" \
" with call #{call}"
yield
end
##
# Intercept a server streaming call
#
# @param [Object] request
# @param [GRPC::ActiveCall::SingleReqView] call
# @param [Method] method
#
def server_streamer(request:, call:, method:)
GRPC.logger.debug "Intercepting server streamer method #{method}" \
" for request #{request} with call #{call}"
yield
end
##
# Intercept a BiDi streaming call
#
# @param [Enumerable<Object>] requests
# @param [GRPC::ActiveCall::MultiReqView] call
# @param [Method] method
#
def bidi_streamer(requests:, call:, method:)
GRPC.logger.debug "Intercepting bidi streamer method #{method}" \
" for requests #{requests} with call #{call}"
yield
end
end
##
# Represents the context in which an interceptor runs. Used to provide an
# injectable mechanism for handling interception. This is an EXPERIMENTAL API.
#
class InterceptionContext
##
# @param [Array<GRPC::Interceptor>]
#
def initialize(interceptors = [])
@interceptors = interceptors.dup
end
##
# Intercept the call and fire out to interceptors in a FIFO execution.
# This is an EXPERIMENTAL API.
#
# @param [Symbol] type The request type
# @param [Hash] args The arguments for the call
#
def intercept!(type, args = {})
return yield if @interceptors.none?
i = @interceptors.pop
return yield unless i
i.send(type, args) do
if @interceptors.any?
intercept!(type, args) do
yield
end
else
yield
end
end
end
end
end

@ -47,43 +47,85 @@ module GRPC
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
def handle_request_response(active_call, mth)
def handle_request_response(active_call, mth, inter_ctx)
req = active_call.read_unary_request
resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
call = active_call.single_req_view
inter_ctx.intercept!(
:request_response,
method: mth,
call: call,
request: req
) do
resp = mth.call(req, call)
active_call.server_unary_response(
resp,
trailing_metadata: active_call.output_metadata
)
end
end
def handle_client_streamer(active_call, mth)
resp = mth.call(active_call.multi_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
def handle_client_streamer(active_call, mth, inter_ctx)
call = active_call.multi_req_view
inter_ctx.intercept!(
:client_streamer,
method: mth,
call: call
) do
resp = mth.call(call)
active_call.server_unary_response(
resp,
trailing_metadata: active_call.output_metadata
)
end
end
def handle_server_streamer(active_call, mth)
def handle_server_streamer(active_call, mth, inter_ctx)
req = active_call.read_unary_request
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
call = active_call.single_req_view
inter_ctx.intercept!(
:server_streamer,
method: mth,
call: call,
request: req
) do
replies = mth.call(req, call)
replies.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
end
end
def handle_bidi_streamer(active_call, mth)
active_call.run_server_bidi(mth)
##
# @param [GRPC::ActiveCall] active_call
# @param [Method] mth
# @param [Array<GRPC::InterceptionContext>] inter_ctx
#
def handle_bidi_streamer(active_call, mth, inter_ctx)
active_call.run_server_bidi(mth, inter_ctx)
send_status(active_call, OK, 'OK', active_call.output_metadata)
end
def run_server_method(active_call, mth)
##
# @param [GRPC::ActiveCall] active_call The current active call object
# for the request
# @param [Method] mth The current RPC method being called
# @param [GRPC::InterceptionContext] inter_ctx The interception context
# being executed
#
def run_server_method(active_call, mth, inter_ctx = InterceptionContext.new)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
handle_request_response(active_call, mth)
handle_request_response(active_call, mth, inter_ctx)
elsif client_streamer?
handle_client_streamer(active_call, mth)
handle_client_streamer(active_call, mth, inter_ctx)
elsif server_streamer?
handle_server_streamer(active_call, mth)
handle_server_streamer(active_call, mth, inter_ctx)
else # is a bidi_stream
handle_bidi_streamer(active_call, mth)
handle_bidi_streamer(active_call, mth, inter_ctx)
end
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error

@ -196,11 +196,18 @@ module GRPC
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
#
# * interceptors:
# Am array of GRPC::ServerInterceptor objects that will be used for
# intercepting server handlers to provide extra functionality.
# Interceptors are an EXPERIMENTAL API.
#
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
connect_md_proc:nil,
server_args:{})
server_args:{},
interceptors:[])
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@ -212,6 +219,7 @@ module GRPC
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
@server = Core::Server.new(server_args)
@interceptors = InterceptorRegistry.new(interceptors)
end
# stops a running server
@ -374,7 +382,11 @@ module GRPC
@pool.schedule(active_call) do |ac|
c, mth = ac
begin
rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
rpc_descs[mth].run_server_method(
c,
rpc_handlers[mth],
@interceptors.build_context
)
rescue StandardError
c.send_status(GRPC::Core::StatusCodes::INTERNAL,
'Server handler failed')
@ -382,7 +394,7 @@ module GRPC
end
end
rescue Core::CallError, RuntimeError => e
# these might happen for various reasonse. The correct behaviour of
# these might happen for various reasons. The correct behavior of
# the server is to log them and continue, if it's not shutting down.
if running_state == :running
GRPC.logger.warn("server call failed: #{e}")

@ -34,6 +34,7 @@ module Grpc
self.service_name = 'grpc.testing.duplicate.EchoTestService'
rpc :Echo, Grpc::Testing::EchoRequest, Grpc::Testing::EchoResponse
rpc :ResponseStream, Grpc::Testing::EchoRequest, stream(Grpc::Testing::EchoResponse)
end
Stub = Service.rpc_stub_class

@ -11,45 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'grpc'
require 'spec_helper'
require 'timeout'
include Timeout
include GRPC::Core
# A test message
class EchoMsg
def self.marshal(_o)
''
end
def self.unmarshal(_o)
EchoMsg.new
end
end
# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md
def initialize(**kw)
@trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
end
EchoStub = EchoService.rpc_stub_class
def start_server(port = 0)
@srv = GRPC::RpcServer.new(pool_size: 1)
server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
require 'grpc'
require 'spec_helper'
include GRPC::Core::StatusCodes
@ -82,6 +82,16 @@ describe GRPC::ActiveCall do
end
end
end
describe '#interceptable' do
it 'exposes a fixed subset of the ActiveCall.methods' do
want = %w(deadline)
v = @client_call.interceptable
want.each do |w|
expect(v.methods.include?(w))
end
end
end
end
describe '#remote_send' do
@ -609,9 +619,11 @@ describe GRPC::ActiveCall do
msgs
end
int_ctx = GRPC::InterceptionContext.new
@server_thread = Thread.new do
@server_call.run_server_bidi(
fake_gen_each_reply_with_no_call_param)
fake_gen_each_reply_with_no_call_param, int_ctx)
@server_call.send_status(@server_status)
end
end
@ -624,10 +636,11 @@ describe GRPC::ActiveCall do
call_param.send_initial_metadata
msgs
end
int_ctx = GRPC::InterceptionContext.new
@server_thread = Thread.new do
@server_call.run_server_bidi(
fake_gen_each_reply_with_call_param)
fake_gen_each_reply_with_call_param, int_ctx)
@server_call.send_status(@server_status)
end
end

@ -0,0 +1,153 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'spec_helper'
describe 'Client Interceptors' do
let(:interceptor) { TestClientInterceptor.new }
let(:interceptors_opts) { { interceptors: [interceptor] } }
let(:request) { EchoMsg.new }
let(:service) { EchoService }
before(:each) do
build_rpc_server
end
context 'when a client interceptor is added' do
context 'with a request/response call' do
it 'should be called', server: true do
expect(interceptor).to receive(:request_response)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
expect_any_instance_of(GRPC::ActiveCall).to receive(:request_response)
.once.and_call_original
expect(stub.an_rpc(request)).to be_a(EchoMsg)
end
end
it 'can modify outgoing metadata', server: true do
expect(interceptor).to receive(:request_response)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
expect_any_instance_of(GRPC::ActiveCall).to receive(:request_response)
.with(request, metadata: { 'foo' => 'bar_from_request_response' })
.once.and_call_original
expect(stub.an_rpc(request)).to be_a(EchoMsg)
end
end
end
context 'with a client streaming call' do
it 'should be called', server: true do
expect(interceptor).to receive(:client_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
expect_any_instance_of(GRPC::ActiveCall).to receive(:client_streamer)
.once.and_call_original
requests = [EchoMsg.new, EchoMsg.new]
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg)
end
end
it 'can modify outgoing metadata', server: true do
expect(interceptor).to receive(:client_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
requests = [EchoMsg.new, EchoMsg.new]
expect_any_instance_of(GRPC::ActiveCall).to receive(:client_streamer)
.with(requests, metadata: { 'foo' => 'bar_from_client_streamer' })
.once.and_call_original
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg)
end
end
end
context 'with a server streaming call' do
it 'should be called', server: true do
expect(interceptor).to receive(:server_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
request = EchoMsg.new
expect_any_instance_of(GRPC::ActiveCall).to receive(:server_streamer)
.once.and_call_original
responses = stub.a_server_streaming_rpc(request)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
it 'can modify outgoing metadata', server: true do
expect(interceptor).to receive(:server_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
request = EchoMsg.new
expect_any_instance_of(GRPC::ActiveCall).to receive(:server_streamer)
.with(request, metadata: { 'foo' => 'bar_from_server_streamer' })
.once.and_call_original
responses = stub.a_server_streaming_rpc(request)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
end
context 'with a bidi call' do
it 'should be called', server: true do
expect(interceptor).to receive(:bidi_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
expect_any_instance_of(GRPC::ActiveCall).to receive(:bidi_streamer)
.once.and_call_original
requests = [EchoMsg.new, EchoMsg.new]
responses = stub.a_bidi_rpc(requests)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
it 'can modify outgoing metadata', server: true do
expect(interceptor).to receive(:bidi_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub, opts: interceptors_opts)
requests = [EchoMsg.new, EchoMsg.new]
expect_any_instance_of(GRPC::ActiveCall).to receive(:bidi_streamer)
.with(requests, metadata: { 'foo' => 'bar_from_bidi_streamer' })
.once.and_call_original
responses = stub.a_bidi_rpc(requests)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
end
end
end

@ -0,0 +1,65 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'spec_helper'
describe GRPC::InterceptorRegistry do
let(:server) { RpcServer.new }
let(:interceptor) { TestServerInterceptor.new }
let(:interceptors) { [interceptor] }
let(:registry) { described_class.new(interceptors) }
describe 'initialization' do
subject { registry }
context 'with an interceptor extending GRPC::ServerInterceptor' do
it 'should add the interceptor to the registry' do
subject
is = registry.instance_variable_get('@interceptors')
expect(is.count).to eq 1
expect(is.first).to eq interceptor
end
end
context 'with multiple interceptors' do
let(:interceptor2) { TestServerInterceptor.new }
let(:interceptor3) { TestServerInterceptor.new }
let(:interceptors) { [interceptor, interceptor2, interceptor3] }
it 'should maintain order of insertion when iterated against' do
subject
is = registry.instance_variable_get('@interceptors')
expect(is.count).to eq 3
is.each_with_index do |i, idx|
case idx
when 0
expect(i).to eq interceptor
when 1
expect(i).to eq interceptor2
when 2
expect(i).to eq interceptor3
end
end
end
end
context 'with an interceptor not extending GRPC::ServerInterceptor' do
let(:interceptor) { Class }
let(:err) { GRPC::InterceptorRegistry::DescendantError }
it 'should raise an InvalidArgument exception' do
expect { subject }.to raise_error(err)
end
end
end
end

@ -11,8 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'grpc'
require 'spec_helper'
def load_test_certs
test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
@ -28,17 +27,6 @@ def check_md(wanted_md, received_md)
end
end
# A test message
class EchoMsg
def self.marshal(_o)
''
end
def self.unmarshal(_o)
EchoMsg.new
end
end
# A test service with no methods.
class EmptyService
include GRPC::GenericService
@ -50,27 +38,6 @@ class NoRpcImplementation
rpc :an_rpc, EchoMsg, EchoMsg
end
# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
attr_reader :received_md
def initialize(**kw)
@trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
end
EchoStub = EchoService.rpc_stub_class
# A test service with an implementation that fails with BadStatus
class FailingService
include GRPC::GenericService

@ -0,0 +1,218 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'spec_helper'
describe 'Server Interceptors' do
let(:interceptor) { TestServerInterceptor.new }
let(:request) { EchoMsg.new }
let(:trailing_metadata) { {} }
let(:service) { EchoService.new(trailing_metadata) }
let(:interceptors) { [] }
before(:each) do
build_rpc_server(server_opts: { interceptors: interceptors })
end
context 'when a server interceptor is added' do
let(:interceptors) { [interceptor] }
let(:client_metadata) { { client_md: 'test' } }
let(:client_call_opts) { { metadata: client_metadata, return_op: true } }
context 'with a request/response call' do
let(:trailing_metadata) { { server_om: 'from_request_response' } }
it 'should be called', server: true do
expect(interceptor).to receive(:request_response)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect(stub.an_rpc(request)).to be_a(EchoMsg)
end
end
it 'can modify trailing metadata', server: true do
expect(interceptor).to receive(:request_response)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect_any_instance_of(GRPC::ActiveCall).to(
receive(:request_response).with(request, metadata: client_metadata)
.once.and_call_original
)
op = stub.an_rpc(request, client_call_opts)
msg = op.execute
expect(op.trailing_metadata).to eq(
'interc' => 'from_request_response',
'server_om' => 'from_request_response'
)
expect(msg).to be_a(EchoMsg)
end
end
end
context 'with a client streaming call' do
let(:trailing_metadata) { { server_om: 'from_client_streamer' } }
let(:requests) { [EchoMsg.new, EchoMsg.new] }
it 'should be called', server: true do
expect(interceptor).to receive(:client_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect(stub.a_client_streaming_rpc(requests)).to be_a(EchoMsg)
end
end
it 'can modify trailing metadata', server: true do
expect(interceptor).to receive(:client_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect_any_instance_of(GRPC::ActiveCall).to(
receive(:client_streamer).with(requests)
.once.and_call_original
)
op = stub.a_client_streaming_rpc(requests, client_call_opts)
msg = op.execute
expect(op.trailing_metadata).to eq(
'interc' => 'from_client_streamer',
'server_om' => 'from_client_streamer'
)
expect(msg).to be_a(EchoMsg)
end
end
end
context 'with a server streaming call' do
let(:trailing_metadata) { { server_om: 'from_server_streamer' } }
let(:request) { EchoMsg.new }
it 'should be called', server: true do
expect(interceptor).to receive(:server_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
responses = stub.a_server_streaming_rpc(request)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
it 'can modify trailing metadata', server: true do
expect(interceptor).to receive(:server_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect_any_instance_of(GRPC::ActiveCall).to(
receive(:server_streamer).with(request)
.once.and_call_original
)
op = stub.a_server_streaming_rpc(request, client_call_opts)
responses = op.execute
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
expect(op.trailing_metadata).to eq(
'interc' => 'from_server_streamer',
'server_om' => 'from_server_streamer'
)
end
end
end
context 'with a bidi call' do
let(:trailing_metadata) { { server_om: 'from_bidi_streamer' } }
let(:requests) { [EchoMsg.new, EchoMsg.new] }
it 'should be called', server: true do
expect(interceptor).to receive(:bidi_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
responses = stub.a_bidi_rpc(requests)
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
end
end
it 'can modify trailing metadata', server: true do
expect(interceptor).to receive(:bidi_streamer)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect_any_instance_of(GRPC::ActiveCall).to(
receive(:bidi_streamer).with(requests)
.once.and_call_original
)
op = stub.a_bidi_rpc(requests, client_call_opts)
responses = op.execute
responses.each do |r|
expect(r).to be_a(EchoMsg)
end
expect(op.trailing_metadata).to eq(
'interc' => 'from_bidi_streamer',
'server_om' => 'from_bidi_streamer'
)
end
end
end
end
context 'when multiple interceptors are added' do
let(:interceptor2) { TestServerInterceptor.new }
let(:interceptor3) { TestServerInterceptor.new }
let(:interceptors) do
[
interceptor,
interceptor2,
interceptor3
]
end
it 'each should be called', server: true do
expect(interceptor).to receive(:request_response)
.once.and_call_original
expect(interceptor2).to receive(:request_response)
.once.and_call_original
expect(interceptor3).to receive(:request_response)
.once.and_call_original
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect(stub.an_rpc(request)).to be_a(EchoMsg)
end
end
end
context 'when an interceptor is not added' do
it 'should not be called', server: true do
expect(interceptor).to_not receive(:call)
run_services_on_server(@server, services: [service]) do
stub = build_insecure_stub(EchoStub)
expect(stub.an_rpc(request)).to be_a(EchoMsg)
end
end
end
end

@ -32,6 +32,9 @@ require 'rspec'
require 'logging'
require 'rspec/logging_helper'
require_relative 'support/services'
require_relative 'support/helpers'
# GRPC is the general RPC module
#
# Configure its logging for fine-grained log control during test runs
@ -49,6 +52,7 @@ Logging.logger['GRPC::BidiCall'].level = :info
RSpec.configure do |config|
include RSpec::LoggingHelper
config.capture_log_messages # comment this out to see logs during test runs
include GRPC::Spec::Helpers
end
RSpec::Expectations.configuration.warn_about_potential_false_positives = false

@ -0,0 +1,73 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# GRPC contains the General RPC module.
module GRPC
##
# GRPC RSpec base module
#
module Spec
##
# A module that is used for providing generic helpers across the
# GRPC test suite
#
module Helpers
# Shortcut syntax for a GRPC RPC Server
RpcServer = GRPC::RpcServer
##
# Build an RPC server used for testing
#
def build_rpc_server(server_opts: {},
client_opts: {})
@server = RpcServer.new({ poll_period: 1 }.merge(server_opts))
@port = @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
@host = "0.0.0.0:#{@port}"
@client_opts = client_opts
@server
end
##
# Run services on an RPC server, yielding to allow testing within
#
# @param [RpcServer] server
# @param [Array<Class>] services
#
def run_services_on_server(server, services: [])
services.each do |s|
server.handle(s)
end
t = Thread.new { server.run }
server.wait_till_running
yield
server.stop
t.join
end
##
# Build an insecure stub from a given stub class
#
# @param [Class] klass
# @param [String] host
#
def build_insecure_stub(klass, host: nil, opts: nil)
host ||= @host
opts ||= @client_opts
klass.new(host, :this_channel_is_insecure, **opts)
end
end
end
end

@ -0,0 +1,147 @@
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test stubs for various scenarios
require 'grpc'
# A test message
class EchoMsg
def self.marshal(_o)
''
end
def self.unmarshal(_o)
EchoMsg.new
end
end
# A test service with an echo implementation.
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
attr_reader :received_md
def initialize(**kw)
@trailing_metadata = kw
@received_md = []
end
def an_rpc(req, call)
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
end
def a_client_streaming_rpc(call)
# iterate through requests so call can complete
call.output_metadata.update(@trailing_metadata)
call.each_remote_read.each { |r| p r }
EchoMsg.new
end
def a_server_streaming_rpc(_req, call)
call.output_metadata.update(@trailing_metadata)
[EchoMsg.new, EchoMsg.new]
end
def a_bidi_rpc(requests, call)
call.output_metadata.update(@trailing_metadata)
requests.each { |r| p r }
[EchoMsg.new, EchoMsg.new]
end
end
EchoStub = EchoService.rpc_stub_class
# For testing server interceptors
class TestServerInterceptor < GRPC::ServerInterceptor
def request_response(request:, call:, method:)
p "Received request/response call at method #{method}" \
" with request #{request} for call #{call}"
call.output_metadata[:interc] = 'from_request_response'
p "[GRPC::Ok] (#{method.owner.name}.#{method.name})"
yield
end
def client_streamer(call:, method:)
call.output_metadata[:interc] = 'from_client_streamer'
call.each_remote_read.each do |r|
p "In interceptor: #{r}"
end
p "Received client streamer call at method #{method} for call #{call}"
yield
end
def server_streamer(request:, call:, method:)
p "Received server streamer call at method #{method} with request" \
" #{request} for call #{call}"
call.output_metadata[:interc] = 'from_server_streamer'
yield
end
def bidi_streamer(requests:, call:, method:)
requests.each do |r|
p "Bidi request: #{r}"
end
p "Received bidi streamer call at method #{method} with requests" \
" #{requests} for call #{call}"
call.output_metadata[:interc] = 'from_bidi_streamer'
yield
end
end
# For testing client interceptors
class TestClientInterceptor < GRPC::ClientInterceptor
def request_response(request:, call:, method:, metadata: {})
p "Intercepted request/response call at method #{method}" \
" with request #{request} for call #{call}" \
" and metadata: #{metadata}"
metadata['foo'] = 'bar_from_request_response'
yield
end
def client_streamer(requests:, call:, method:, metadata: {})
p "Received client streamer call at method #{method}" \
" with requests #{requests} for call #{call}" \
" and metadata: #{metadata}"
requests.each do |r|
p "In client interceptor: #{r}"
end
metadata['foo'] = 'bar_from_client_streamer'
yield
end
def server_streamer(request:, call:, method:, metadata: {})
p "Received server streamer call at method #{method}" \
" with request #{request} for call #{call}" \
" and metadata: #{metadata}"
metadata['foo'] = 'bar_from_server_streamer'
yield
end
def bidi_streamer(requests:, call:, method:, metadata: {})
p "Received bidi streamer call at method #{method}" \
"with requests #{requests} for call #{call}" \
" and metadata: #{metadata}"
requests.each do |r|
p "In client interceptor: #{r}"
end
metadata['foo'] = 'bar_from_bidi_streamer'
yield
end
end
Loading…
Cancel
Save