Merge pull request #1676 from codeship/ngauthier-encapsulate-logging

Encapsulate logger within GRPC module
Tim Emiola 10 years ago
commit 3a303361e6
  1. src/ruby/bin/apis/pubsub_demo.rb (4 changed lines)
  2. src/ruby/bin/interop/interop_client.rb (6 changed lines)
  3. src/ruby/bin/interop/interop_server.rb (8 changed lines)
  4. src/ruby/bin/math_client.rb (44 changed lines)
  5. src/ruby/bin/math_server.rb (8 changed lines)
  6. src/ruby/bin/noproto_client.rb (8 changed lines)
  7. src/ruby/bin/noproto_server.rb (6 changed lines)
  8. src/ruby/lib/grpc/generic/active_call.rb (10 changed lines)
  9. src/ruby/lib/grpc/generic/bidi_call.rb (22 changed lines)
  10. src/ruby/lib/grpc/generic/rpc_desc.rb (14 changed lines)
  11. src/ruby/lib/grpc/generic/rpc_server.rb (28 changed lines)
  12. src/ruby/lib/grpc/generic/service.rb (8 changed lines)
  13. src/ruby/lib/grpc/logconfig.rb (5 changed lines)
  14. src/ruby/spec/generic/rpc_server_spec.rb (4 changed lines)
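
Nearly every hunk below applies the same one-line transformation: the bare logger call that include Logging.globally used to mix into every object becomes an explicit call through the module, GRPC.logger. The logconfig.rb hunk near the end provides the module-level logger that all of them now reach. A minimal before/after sketch of the pattern (illustrative only, not copied from any single file in this diff):

# before: Logging.globally mixed a `logger` method into every object
logger.info('... connecting')

# after: the logger is reached explicitly through the GRPC module
GRPC.logger.info('... connecting')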

src/ruby/bin/apis/pubsub_demo.rb
@@ -79,7 +79,7 @@ end
def publisher_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
logger.info("... access PublisherService at #{address}")
GRPC.logger.info("... access PublisherService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
@@ -89,7 +89,7 @@ end
def subscriber_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
logger.info("... access SubscriberService at #{address}")
GRPC.logger.info("... access SubscriberService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)

src/ruby/bin/interop/interop_client.rb
@@ -70,7 +70,7 @@ end
# loads the certificates used to access the test server securely.
def load_prod_cert
fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil?
logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
File.open(ENV['SSL_CERT_FILE']).read
end
@@ -115,10 +115,10 @@ def create_stub(opts)
stub_opts[:update_metadata] = auth_creds.updater_proc
end
logger.info("... connecting securely to #{address}")
GRPC.logger.info("... connecting securely to #{address}")
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
else
logger.info("... connecting insecurely to #{address}")
GRPC.logger.info("... connecting insecurely to #{address}")
Grpc::Testing::TestService::Stub.new(address)
end
end

src/ruby/bin/interop/interop_server.rb
@@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service
Thread.new do
begin
reqs.each do |req|
logger.info("read #{req.inspect}")
GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
logger.info('finished reads')
GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -179,10 +179,10 @@ def main
s = GRPC::RpcServer.new
if opts['secure']
s.add_http2_port(host, test_server_creds)
logger.info("... running securely on #{host}")
GRPC.logger.info("... running securely on #{host}")
else
s.add_http2_port(host)
logger.info("... running insecurely on #{host}")
GRPC.logger.info("... running insecurely on #{host}")
end
s.handle(TestTarget)
s.run_till_terminated

src/ruby/bin/math_client.rb
@@ -46,51 +46,51 @@ require 'optparse'
include GRPC::Core::TimeConsts
def do_div(stub)
logger.info('request_response')
logger.info('----------------')
GRPC.logger.info('request_response')
GRPC.logger.info('----------------')
req = Math::DivArgs.new(dividend: 7, divisor: 3)
logger.info("div(7/3): req=#{req.inspect}")
GRPC.logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, INFINITE_FUTURE)
logger.info("Answer: #{resp.inspect}")
logger.info('----------------')
GRPC.logger.info("Answer: #{resp.inspect}")
GRPC.logger.info('----------------')
end
def do_sum(stub)
# to make client streaming requests, pass an enumerable of the inputs
logger.info('client_streamer')
logger.info('---------------')
GRPC.logger.info('client_streamer')
GRPC.logger.info('---------------')
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) }
logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
resp = stub.sum(reqs) # reqs.is_a?(Enumerable)
logger.info("Answer: #{resp.inspect}")
logger.info('---------------')
GRPC.logger.info("Answer: #{resp.inspect}")
GRPC.logger.info('---------------')
end
def do_fib(stub)
logger.info('server_streamer')
logger.info('----------------')
GRPC.logger.info('server_streamer')
GRPC.logger.info('----------------')
req = Math::FibArgs.new(limit: 11)
logger.info("fib(11): req=#{req.inspect}")
GRPC.logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, INFINITE_FUTURE)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
GRPC.logger.info("Answer: #{r.inspect}")
end
logger.info('----------------')
GRPC.logger.info('----------------')
end
def do_div_many(stub)
logger.info('bidi_streamer')
logger.info('-------------')
GRPC.logger.info('bidi_streamer')
GRPC.logger.info('-------------')
reqs = []
reqs << Math::DivArgs.new(dividend: 7, divisor: 3)
reqs << Math::DivArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, 10)
resp.each do |r|
logger.info("Answer: #{r.inspect}")
GRPC.logger.info("Answer: #{r.inspect}")
end
logger.info('----------------')
GRPC.logger.info('----------------')
end
def load_test_certs
@@ -132,10 +132,10 @@ def main
p stub_opts
p options['host']
stub = Math::Math::Stub.new(options['host'], **stub_opts)
logger.info("... connecting securely on #{options['host']}")
GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = Math::Math::Stub.new(options['host'])
logger.info("... connecting insecurely on #{options['host']}")
GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
do_div(stub)

src/ruby/bin/math_server.rb
@@ -128,13 +128,13 @@ class Calculator < Math::Math::Service
t = Thread.new do
begin
requests.each do |req|
logger.info("read #{req.inspect}")
GRPC.logger.info("read #{req.inspect}")
resp = Math::DivReply.new(quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor)
q.push(resp)
Thread.pass # let the internal Bidi threads run
end
logger.info('finished reads')
GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -176,10 +176,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
logger.info("... running securely on #{options['host']}")
GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
logger.info("... running insecurely on #{options['host']}")
GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(Calculator)

src/ruby/bin/noproto_client.rb
@@ -94,15 +94,15 @@ def main
p stub_opts
p options['host']
stub = NoProtoStub.new(options['host'], **stub_opts)
logger.info("... connecting securely on #{options['host']}")
GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = NoProtoStub.new(options['host'])
logger.info("... connecting insecurely on #{options['host']}")
GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
logger.info('sending a NoProto rpc')
GRPC.logger.info('sending a NoProto rpc')
resp = stub.an_rpc(NoProtoMsg.new)
logger.info("got a response: #{resp}")
GRPC.logger.info("got a response: #{resp}")
end
main

src/ruby/bin/noproto_server.rb
@@ -63,7 +63,7 @@ class NoProto < NoProtoService
end
def an_rpc(req, _call)
logger.info('echo service received a request')
GRPC.logger.info('echo service received a request')
req
end
end
@@ -98,10 +98,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
logger.info("... running securely on #{options['host']}")
GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
logger.info("... running insecurely on #{options['host']}")
GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(NoProto)
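
For orientation, a hedged sketch of a minimal service written against the updated API. The service name, handler, parent class, and port are hypothetical placeholders; only the GRPC.logger, RpcServer, add_http2_port, handle, and run_till_terminated calls mirror the hunks above:

# Hypothetical echo service; EchoGenericService stands in for a generated *::Service class.
class EchoService < EchoGenericService
  def an_rpc(req, _call)
    GRPC.logger.info("echoing #{req.inspect}")  # module-scoped logger, no bare `logger`
    req
  end
end

s = GRPC::RpcServer.new
s.add_http2_port('0.0.0.0:9090')  # insecure port, as in the samples above
s.handle(EchoService)
s.run_till_terminated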

src/ruby/lib/grpc/generic/active_call.rb
@@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
logger.debug("sending #{req}, marshalled? #{marshalled}")
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
@@ -230,14 +230,14 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
logger.debug("received req: #{batch_result}")
GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
logger.debug("received req.to_s: #{batch_result.message}")
GRPC.logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
logger.debug("received_req (unmarshalled): #{res.inspect}")
GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
logger.debug('found nil; the final response has been sent')
GRPC.logger.debug('found nil; the final response has been sent')
nil
end

src/ruby/lib/grpc/generic/bidi_call.rb
@@ -115,10 +115,10 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
logger.debug("each_queued_msg: msg##{count}")
GRPC.logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
logger.debug("each_queued_msg: req = #{req}")
GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
@@ -134,22 +134,22 @@ module GRPC
begin
count = 0
requests.each do |req|
logger.debug("bidi-write_loop: #{count}")
GRPC.logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
logger.warn('bidi-write_loop: failed')
logger.warn(e)
GRPC.logger.warn('bidi-write_loop: failed')
GRPC.logger.warn(e)
raise e
end
end
@@ -164,7 +164,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
logger.debug("bidi-read_loop: #{count}")
GRPC.logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -172,19 +172,19 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
logger.debug('bidi-read-loop: done reading!')
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
logger.debug("received req: #{batch_result.message}")
GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
logger.warn('bidi: read_loop failed')
logger.warn(e)
GRPC.logger.warn('bidi: read_loop failed')
GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
end

src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -84,22 +84,22 @@ module GRPC
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever, happen.
# Log it, but don't notify the other endpoint.
logger.warn("failed call: #{active_call}\n#{e}")
GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
# This is raised when active_call#method.call exceeds the deadline
# event. Send a status of deadline exceeded
logger.warn("late call: #{active_call}")
GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError => e
# This will usually be an unhandled error in the handling code.
# Send back an UNKNOWN status to the client
logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
logger.warn(e)
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
@@ -139,8 +139,8 @@ module GRPC
details = 'Not sure why' if details.nil?
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
logger.warn("Could not send status #{code}:#{details}")
logger.warn(e)
GRPC.logger.warn("Could not send status #{code}:#{details}")
GRPC.logger.warn(e)
end
end
end
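
The rescue chain above is where handler failures reach the log: BadStatus is reported at debug level and translated into a status for the client, CallError and deadline expiry are only warned about locally, and anything else becomes UNKNOWN. As a sketch of the first path (not part of this diff; the service, handler, and WIDGETS table are hypothetical, while GRPC::BadStatus and the status code mirror names used elsewhere in the diff):

# Hypothetical service; WidgetGenericService stands in for a generated *::Service class.
class WidgetService < WidgetGenericService
  WIDGETS = {}  # hypothetical lookup table

  def get_widget(req, _call)
    widget = WIDGETS[req.name]
    if widget.nil?
      # Raising BadStatus hits the first rescue above, which logs e.code/e.details
      # at debug level and sends them back as the RPC status.
      fail GRPC::BadStatus.new(GRPC::Core::StatusCodes::NOT_FOUND,
                               "no widget named #{req.name}")
    end
    widget
  end
end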

src/ruby/lib/grpc/generic/rpc_server.rb
@@ -94,7 +94,7 @@ module GRPC
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
logger.info('schedule another job')
GRPC.logger.info('schedule another job')
@jobs << [blk, args]
end
@@ -114,14 +114,14 @@ module GRPC
# Stops the jobs in the pool
def stop
logger.info('stopping, will wait for all the workers to exit')
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for workers to stop
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
logger.info('stopped, all workers are shutdown')
GRPC.logger.info('stopped, all workers are shutdown')
end
protected
@@ -129,14 +129,14 @@ module GRPC
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
logger.info("forcibly terminating #{@workers.size} worker(s)")
GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
logger.warn('error while terminating a worker')
logger.warn(e)
GRPC.logger.warn('error while terminating a worker')
GRPC.logger.warn(e)
end
end
end
@@ -156,8 +156,8 @@ module GRPC
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
logger.warn('Error in worker thread')
logger.warn(e)
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
end
end
@@ -365,7 +365,7 @@ module GRPC
# the server to stop.
def run
if rpc_descs.size.zero?
logger.warn('did not run as no services were present')
GRPC.logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@@ -381,9 +381,9 @@ module GRPC
# Sends UNAVAILABLE if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
logger.info("waiting: #{jobs_count}, max: #{max}")
GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNAVAILABLE, '')
@@ -394,7 +394,7 @@ module GRPC
def found?(an_rpc)
mth = an_rpc.method.to_sym
return an_rpc if rpc_descs.key?(mth)
logger.warn("NOT_FOUND: #{an_rpc}")
GRPC.logger.warn("NOT_FOUND: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::NOT_FOUND, '')
@@ -434,7 +434,7 @@ module GRPC
return nil unless found?(an_rpc)
# Create the ActiveCall
logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
@@ -474,7 +474,7 @@ module GRPC
else
handlers[route] = service.method(rpc_name)
end
logger.info("handling #{route} with #{handlers[route]}")
GRPC.logger.info("handling #{route} with #{handlers[route]}")
end
end
end

src/ruby/lib/grpc/generic/service.rb
@@ -175,23 +175,23 @@ module GRPC
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil, **kw|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
logger.debug("calling #{@host}:#{route}")
GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
&blk)
end

src/ruby/lib/grpc/logconfig.rb
@@ -29,7 +29,10 @@
require 'logging'
include Logging.globally # logger is accessible everywhere
# GRPC contains the General RPC module.
module GRPC
extend Logging.globally
end
Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
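
This logconfig.rb hunk is the heart of the change: instead of include Logging.globally putting a bare logger into every object, only the GRPC module is extended with Logging.globally, so both library and application code reach the logger as GRPC.logger. The last two lines keep the previous defaults (stdout appender, :info level) on the logging gem's root logger, and callers can still tune them the same way. A small hedged example using only the calls already shown in this hunk:

require 'grpc'

# Quiet gRPC's output: raise the root level so debug/info messages from
# GRPC.logger are dropped while warnings still reach stdout.
# (Same Logging.logger.root API configured in logconfig.rb above.)
Logging.logger.root.level = :warn

GRPC.logger.info('suppressed at :warn')
GRPC.logger.warn('still printed to stdout')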

src/ruby/spec/generic/rpc_server_spec.rb
@@ -69,7 +69,7 @@ class EchoService
end
def an_rpc(req, call)
logger.info('echo service received a request')
GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
@@ -109,7 +109,7 @@ class SlowService
end
def an_rpc(req, call)
logger.info("starting a slow #{@delay} rpc")
GRPC.logger.info("starting a slow #{@delay} rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response
