From fb1e164cd81bd15b7d2aad595510e0c083fb3d5b Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 11 Jul 2017 17:49:21 -0700 Subject: [PATCH 01/16] dont wait for gc to destroy calls on ruby server --- src/ruby/lib/grpc/generic/active_call.rb | 141 +++++++++++++++++----- src/ruby/lib/grpc/generic/bidi_call.rb | 62 +++++----- src/ruby/lib/grpc/generic/rpc_desc.rb | 4 +- src/ruby/spec/generic/active_call_spec.rb | 4 +- src/ruby/spec/generic/client_stub_spec.rb | 136 +++++++++++++++------ src/ruby/spec/generic/rpc_desc_spec.rb | 10 +- 6 files changed, 249 insertions(+), 108 deletions(-) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index cb407d236d1..96c773a995d 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -40,7 +40,7 @@ end module GRPC # The ActiveCall class provides simple methods for sending marshallable # data to a call - class ActiveCall + class ActiveCall # rubocop:disable Metrics/ClassLength include Core::TimeConsts include Core::CallOps extend Forwardable @@ -100,6 +100,11 @@ module GRPC fail(ArgumentError, 'Already sent md') if started && metadata_to_send @metadata_to_send = metadata_to_send || {} unless started @send_initial_md_mutex = Mutex.new + + @output_stream_done = false + @input_stream_done = false + @call_finished = false + @call_finished_mu = Mutex.new end # Sends the initial metadata that has yet to be sent. @@ -142,11 +147,9 @@ module GRPC Operation.new(self) end - # finished waits until a client call is completed. - # - # It blocks until the remote endpoint acknowledges by sending a status. - def finished + def receive_and_check_status batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) + set_input_stream_done attach_status_results_and_complete_call(batch_result) end @@ -155,8 +158,6 @@ module GRPC @call.trailing_metadata = recv_status_batch_result.status.metadata end @call.status = recv_status_batch_result.status - @call.close - op_is_done # The RECV_STATUS in run_batch always succeeds # Check the status for a bad status or failed run batch @@ -193,9 +194,19 @@ module GRPC } ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished @call.run_batch(ops) + set_output_stream_done + nil end + # Intended for use on server-side calls when a single request from + # the client is expected (i.e., unary and server-streaming RPC types). + def read_unary_request + req = remote_read + set_input_stream_done + req + end + def server_unary_response(req, trailing_metadata: {}, code: Core::StatusCodes::OK, details: 'OK') ops = {} @@ -211,6 +222,7 @@ module GRPC ops[RECV_CLOSE_ON_SERVER] = nil @call.run_batch(ops) + set_output_stream_done end # remote_read reads a response from the remote endpoint. @@ -241,6 +253,8 @@ module GRPC # each_remote_read passes each response to the given block or returns an # enumerator the responses if no block is given. + # Used to generate the request enumerable for + # server-side client-streaming RPC's. # # == Enumerator == # @@ -258,10 +272,14 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read return enum_for(:each_remote_read) unless block_given? - loop do - resp = remote_read - break if resp.nil? # the last response was received - yield resp + begin + loop do + resp = remote_read + break if resp.nil? 
# the last response was received + yield resp + end + ensure + set_input_stream_done end end @@ -287,13 +305,17 @@ module GRPC # @return [Enumerator] if no block was given def each_remote_read_then_finish return enum_for(:each_remote_read_then_finish) unless block_given? - loop do - resp = remote_read - if resp.nil? # the last response was received, but not finished yet - finished - break + begin + loop do + resp = remote_read + if resp.nil? # the last response was received + receive_and_check_status + break + end + yield resp end - yield resp + ensure + set_input_stream_done end end @@ -319,7 +341,15 @@ module GRPC end @metadata_sent = true end - batch_result = @call.run_batch(ops) + + begin + batch_result = @call.run_batch(ops) + # no need to check for cancellation after a CallError because this + # batch contains a RECV_STATUS op + ensure + set_input_stream_done + set_output_stream_done + end @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) @@ -339,10 +369,19 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) - # Metadata might have already been sent if this is an operation view - merge_metadata_and_send_if_not_already_sent(metadata) + begin + merge_metadata_and_send_if_not_already_sent(metadata) + requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } + rescue GRPC::Core::CallError => e + receive_and_check_status # check for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end - requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } batch_result = @call.run_batch( SEND_CLOSE_FROM_CLIENT => nil, RECV_INITIAL_METADATA => nil, @@ -350,12 +389,11 @@ module GRPC RECV_STATUS_ON_CLIENT => nil ) + set_input_stream_done + @call.metadata = batch_result.metadata attach_status_results_and_complete_call(batch_result) get_message_from_batch_result(batch_result) - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # server_streamer sends one request to the GRPC server, which yields a @@ -384,13 +422,22 @@ module GRPC end @metadata_sent = true end - @call.run_batch(ops) + + begin + @call.run_batch(ops) + rescue GRPC::Core::CallError => e + receive_and_check_status # checks for Cancelled + raise e + rescue => e + set_input_stream_done + raise e + ensure + set_output_stream_done + end + replies = enum_for(:each_remote_read_then_finish) return replies unless block_given? replies.each { |r| yield r } - rescue GRPC::Core::CallError => e - finished # checks for Cancelled - raise e end # bidi_streamer sends a stream of requests to the GRPC server, and yields @@ -428,7 +475,10 @@ module GRPC @unmarshal, metadata_received: @metadata_received) - bd.run_on_client(requests, @op_notifier, &blk) + bd.run_on_client(requests, + proc { set_input_stream_done }, + proc { set_output_stream_done }, + &blk) end # run_server_bidi orchestrates a BiDi stream processing on a server. @@ -449,7 +499,7 @@ module GRPC metadata_received: @metadata_received, req_view: MultiReqView.new(self)) - bd.run_on_server(gen_each_reply) + bd.run_on_server(gen_each_reply, proc { set_input_stream_done }) end # Waits till an operation completes @@ -459,7 +509,8 @@ module GRPC @op_notifier.wait end - # Signals that an operation is done + # Signals that an operation is done. 
+ # Only relevant on the client-side (this is a no-op on the server-side)
 def op_is_done
 return if @op_notifier.nil?
 @op_notifier.notify(self)
@@ -486,6 +537,34 @@ module GRPC
 private
+ # To be called once the "input stream" has been completely
+ # read through (i.e., done reading from client or received status).
+ # Note that this is idempotent.
+ def set_input_stream_done
+ @call_finished_mu.synchronize do
+ @input_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ # To be called once the "output stream" has been completely
+ # sent through (i.e., done sending from client or sent status).
+ # Note that this is idempotent.
+ def set_output_stream_done
+ @call_finished_mu.synchronize do
+ @output_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ def maybe_finish_and_close_call_locked
+ return unless @output_stream_done && @input_stream_done
+ return if @call_finished
+ @call_finished = true
+ op_is_done
+ @call.close
+ end
+
 # Starts the call if not already started
 # @param metadata [Hash] metadata to be sent to the server. If a value is
 # a list, multiple metadata for its key are sent
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78969b..9e125cd986b 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@ module GRPC
 # block that can be invoked with each response.
 #
 # @param requests the Enumerable of requests to send
- # @param op_notifier a Notifier used to signal completion
+ # @param set_input_stream_done [Proc] called back when we're done
+ # reading the input stream
+ # @param set_output_stream_done [Proc] called back when we're done
+ # sending data on the output stream
 # @return an Enumerator of requests to yield
- def run_on_client(requests, op_notifier, &blk)
- @op_notifier = op_notifier
- @enq_th = Thread.new { write_loop(requests) }
- read_loop(&blk)
+ def run_on_client(requests,
+ set_input_stream_done,
+ set_output_stream_done,
+ &blk)
+ @enq_th = Thread.new do
+ write_loop(requests, set_output_stream_done: set_output_stream_done)
+ end
+ read_loop(set_input_stream_done, &blk)
 end

 # Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@ module GRPC
 # produced by gen_each_reply could ignore the received_msgs
 #
 # @param gen_each_reply [Proc] generates the BiDi stream replies.
+ # @param set_input_stream_done [Proc] callback to call when
+ # the reads have been completely read through.
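The completion bookkeeping above reduces to a small, self-contained pattern: each direction of the call reports "done" once, and the call is closed exactly once, only after both directions have finished. A minimal sketch of that pattern, assuming only the Ruby standard library (StreamDoneTracker and on_finish are illustrative names, not part of this patch):

  require 'thread'

  # Closes a call exactly once, only after the input and output
  # streams have both been marked done; both setters are idempotent.
  class StreamDoneTracker
    def initialize(on_finish)
      @mu = Mutex.new
      @input_done = false
      @output_done = false
      @finished = false
      @on_finish = on_finish
    end

    def set_input_stream_done
      @mu.synchronize do
        @input_done = true
        maybe_finish_locked
      end
    end

    def set_output_stream_done
      @mu.synchronize do
        @output_done = true
        maybe_finish_locked
      end
    end

    private

    # Caller must hold @mu.
    def maybe_finish_locked
      return unless @input_done && @output_done
      return if @finished
      @finished = true
      @on_finish.call
    end
  end

  tracker = StreamDoneTracker.new(proc { puts 'call closed' })
  tracker.set_output_stream_done
  tracker.set_input_stream_done # prints 'call closed' exactly once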
+ def run_on_server(gen_each_reply, set_input_stream_done) # Pass in the optional call object parameter if possible if gen_each_reply.arity == 1 - replys = gen_each_reply.call(read_loop(is_client: false)) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false)) elsif gen_each_reply.arity == 2 - replys = gen_each_reply.call(read_loop(is_client: false), @req_view) + replys = gen_each_reply.call( + read_loop(set_input_stream_done, is_client: false), + @req_view) else fail 'Illegal arity of reply generator' end @@ -99,22 +111,6 @@ module GRPC END_OF_READS = :end_of_reads END_OF_WRITES = :end_of_writes - # signals that bidi operation is complete - def notify_done - return unless @op_notifier - GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") - @op_notifier.notify(self) - end - - # signals that a bidi operation is complete (read + write) - def finished - @done_mutex.synchronize do - return unless @reads_complete && @writes_complete && !@complete - @call.close - @complete = true - end - end - # performs a read using @call.run_batch, ensures metadata is set up def read_using_run_batch ops = { RECV_MESSAGE => nil } @@ -127,7 +123,8 @@ module GRPC batch_result end - def write_loop(requests, is_client: true) + # set_output_stream_done is relevant on client-side + def write_loop(requests, is_client: true, set_output_stream_done: nil) GRPC.logger.debug('bidi-write-loop: starting') count = 0 requests.each do |req| @@ -151,23 +148,20 @@ module GRPC GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') - notify_done - @writes_complete = true - finished end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) - notify_done - @writes_complete = true - finished raise e + ensure + set_output_stream_done.call if is_client end # Provides an enumerator that yields results of remote reads - def read_loop(is_client: true) + def read_loop(set_input_stream_done, is_client: true) return enum_for(:read_loop, + set_input_stream_done, is_client: is_client) unless block_given? GRPC.logger.debug('bidi-read-loop: starting') begin @@ -201,10 +195,10 @@ module GRPC GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) raise e + ensure + set_input_stream_done.call end GRPC.logger.debug('bidi-read-loop: finished') - @reads_complete = true - finished # Make sure that the write loop is done done before finishing the call. 
# Note that blocking is ok at this point because we've already received # a status diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb index ce0097573a6..89cf8ff6a0a 100644 --- a/src/ruby/lib/grpc/generic/rpc_desc.rb +++ b/src/ruby/lib/grpc/generic/rpc_desc.rb @@ -48,7 +48,7 @@ module GRPC end def handle_request_response(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request resp = mth.call(req, active_call.single_req_view) active_call.server_unary_response( resp, trailing_metadata: active_call.output_metadata) @@ -61,7 +61,7 @@ module GRPC end def handle_server_streamer(active_call, mth) - req = active_call.remote_read + req = active_call.read_unary_request replys = mth.call(req, active_call.single_req_view) replys.each { |r| active_call.remote_send(r) } send_status(active_call, OK, 'OK', active_call.output_metadata) diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 72e55ebcce0..ec0c2941741 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -473,7 +473,7 @@ describe GRPC::ActiveCall do server_call.remote_send('server_response') expect(client_call.remote_read).to eq('server_response') server_call.send_status(OK, 'status code is OK') - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if the server sends an early status response' do @@ -490,7 +490,7 @@ describe GRPC::ActiveCall do expect do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) end.to_not raise_error - expect { client_call.finished }.to_not raise_error + expect { client_call.receive_and_check_status }.to_not raise_error end it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 09b88c7cef4..3b8f72eda11 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -45,6 +45,7 @@ describe 'ClientStub' do @method = 'an_rpc_method' @pass = OK @fail = INTERNAL + @metadata = { k1: 'v1', k2: 'v2' } end after(:each) do @@ -107,7 +108,7 @@ describe 'ClientStub' do end end - describe '#request_response' do + describe '#request_response', request_response: true do before(:each) do @sent_msg, @resp = 'a_msg', 'a_reply' end @@ -187,13 +188,24 @@ describe 'ClientStub' do # Kill the server thread so tests can complete th.kill end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_response(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end end describe 'without a call operation' do def get_response(stub, credentials: nil) puts credentials.inspect stub.request_response(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }, + metadata: @metadata, credentials: credentials) end @@ -201,16 +213,19 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false, credentials: nil) - op = stub.request_response(@method, @sent_msg, noop, noop, - return_op: true, - metadata: { k1: 'v1', k2: 'v2' }, - deadline: 
from_relative_time(2), - credentials: credentials) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.request_response(@method, @sent_msg, noop, noop, + return_op: true, + metadata: @metadata, + deadline: from_relative_time(2), + credentials: credentials) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end @@ -228,13 +243,12 @@ describe 'ClientStub' do end end - describe '#client_streamer' do + describe '#client_streamer', client_streamer: true do before(:each) do Thread.abort_on_exception = true server_port = create_test_server host = "localhost:#{server_port}" @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - @metadata = { k1: 'v1', k2: 'v2' } @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @resp = 'a_reply' end @@ -278,13 +292,16 @@ describe 'ClientStub' do end describe 'via a call operation' do + after(:each) do + # make sure op.wait doesn't hang, even if there's a bad status + @op.wait + end def get_response(stub, run_start_call_first: false) - op = stub.client_streamer(@method, @sent_msgs, noop, noop, - return_op: true, metadata: @metadata) - expect(op).to be_a(GRPC::ActiveCall::Operation) - op.start_call if run_start_call_first - result = op.execute - op.wait # make sure wait doesn't hang + @op = stub.client_streamer(@method, @sent_msgs, noop, noop, + return_op: true, metadata: @metadata) + expect(@op).to be_a(GRPC::ActiveCall::Operation) + @op.start_call if run_start_call_first + result = @op.execute result end @@ -298,7 +315,7 @@ describe 'ClientStub' do end end - describe '#server_streamer' do + describe '#server_streamer', server_streamer: true do before(:each) do @sent_msg = 'a_msg' @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -334,12 +351,36 @@ describe 'ClientStub' do expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should raise ArgumentError if metadata contains invalid values' do + @metadata.merge!(k3: 3) + server_port = create_test_server + host = "localhost:#{server_port}" + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + expect do + get_responses(stub) + end.to raise_error(ArgumentError, + /Header values must be of type string or array/) + end + + it 'the call terminates when there is an unmarshalling error' do + server_port = create_test_server + host = "localhost:#{server_port}" + th = run_server_streamer(@sent_msg, @replys, @pass) + stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) + + unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') } + expect do + get_responses(stub, unmarshal: unmarshal).collect { |r| r } + end.to raise_error(ArgumentError, 'test unmarshalling error') + th.join + end end - describe 'without a call operation' do - def get_responses(stub) - e = stub.server_streamer(@method, @sent_msg, noop, noop, - metadata: { k1: 'v1', k2: 'v2' }) + describe 'without a call operation', test2: true do + def get_responses(stub, unmarshal: noop) + e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -351,10 +392,10 @@ describe 'ClientStub' do after(:each) do @op.wait # make sure wait doesn't hang end - def get_responses(stub, run_start_call_first: false) - @op = stub.server_streamer(@method, @sent_msg, noop, noop, + def get_responses(stub, run_start_call_first: false, unmarshal: noop) + @op = 
stub.server_streamer(@method, @sent_msg, noop, unmarshal, return_op: true, - metadata: { k1: 'v1', k2: 'v2' }) + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -377,7 +418,7 @@ describe 'ClientStub' do end end - describe '#bidi_streamer' do + describe '#bidi_streamer', bidi: true do before(:each) do @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s } @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s } @@ -386,7 +427,7 @@ describe 'ClientStub' do end shared_examples 'bidi streaming' do - it 'supports sending all the requests first', bidi: true do + it 'supports sending all the requests first' do th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) @@ -395,7 +436,7 @@ describe 'ClientStub' do th.join end - it 'supports client-initiated ping pong', bidi: true do + it 'supports client-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) @@ -403,18 +444,39 @@ describe 'ClientStub' do th.join end - it 'supports a server-initiated ping pong', bidi: true do + it 'supports a server-initiated ping pong' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end + + it 'should raise an error if the status is not ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + th.join + end + + # TODO: add test for metadata-related ArgumentError in a bidi call once + # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed + + it 'should send metadata to the server ok' do + th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, + **@metadata) + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + e = get_responses(stub) + expect(e.collect { |r| r }).to eq(@sent_msgs) + th.join + end end describe 'without a call operation' do def get_responses(stub) - e = stub.bidi_streamer(@method, @sent_msgs, noop, noop) + e = stub.bidi_streamer(@method, @sent_msgs, noop, noop, + metadata: @metadata) expect(e).to be_a(Enumerator) e end @@ -428,7 +490,8 @@ describe 'ClientStub' do end def get_responses(stub, run_start_call_first: false) @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop, - return_op: true) + return_op: true, + metadata: @metadata) expect(@op).to be_a(GRPC::ActiveCall::Operation) @op.start_call if run_start_call_first e = @op.execute @@ -472,9 +535,14 @@ describe 'ClientStub' do end end - def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts) + def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, + **kw) + wanted_metadata = kw.clone wakey_thread do |notifier| c = expect_server_to_be_invoked(notifier) + wanted_metadata.each do |k, v| + expect(c.metadata[k.to_s]).to eq(v) + end expected_inputs.each do |i| if client_starts expect(c.remote_read).to eq(i) diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb index 100e9e84876..be578c40d3f 100644 --- a/src/ruby/spec/generic/rpc_desc_spec.rb +++ b/src/ruby/spec/generic/rpc_desc_spec.rb @@ -38,14 +38,14 @@ describe GRPC::RpcDesc do 
shared_examples 'it handles errors' do it 'sends the specified status if BadStatus is raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false, metadata: {}) this_desc.run_server_method(@call, method(:bad_status)) end it 'sends status UNKNOWN if other StandardErrors are raised' do - expect(@call).to receive(:remote_read).once.and_return(Object.new) + expect(@call).to receive(:read_unary_request).once.and_return(Object.new) expect(@call).to receive(:send_status).once.with(UNKNOWN, arg_error_msg, false, metadata: {}) @@ -53,7 +53,7 @@ describe GRPC::RpcDesc do end it 'absorbs CallError with no further action' do - expect(@call).to receive(:remote_read).once.and_raise(CallError) + expect(@call).to receive(:read_unary_request).once.and_raise(CallError) blk = proc do this_desc.run_server_method(@call, method(:fake_reqresp)) end @@ -75,7 +75,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:output_metadata).once.and_return(fake_md) expect(@call).to receive(:server_unary_response).once .with(@ok_response, trailing_metadata: fake_md) @@ -133,7 +133,7 @@ describe GRPC::RpcDesc do it 'sends a response and closes the stream if there no errors' do req = Object.new - expect(@call).to receive(:remote_read).once.and_return(req) + expect(@call).to receive(:read_unary_request).once.and_return(req) expect(@call).to receive(:remote_send).twice.with(@ok_response) expect(@call).to receive(:output_metadata).and_return(fake_md) expect(@call).to receive(:send_status).once.with(OK, 'OK', true, From 8a08400af22a9ebf7e699b130a65833ce272ed87 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 14 Jul 2017 14:49:09 -0700 Subject: [PATCH 02/16] Fix spam --- tools/mkowners/mkowners.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/mkowners/mkowners.py b/tools/mkowners/mkowners.py index 2ccedfcfb8c..e0ad998bdc7 100755 --- a/tools/mkowners/mkowners.py +++ b/tools/mkowners/mkowners.py @@ -164,7 +164,6 @@ def expand_directives(root, directives): if intersect: for f in sorted(files_add): # sorted to ensure merge stability if f not in intersect: - print("X", root, glob_add, glob_have) out_globs[os.path.relpath(f, start=root)] = who_add for who in who_have: if who not in out_globs[glob_add]: @@ -185,7 +184,6 @@ def add_parent_to_globs(parent, globs, globs_dir): if intersect: for f in sorted(files_child): # sorted to ensure merge stability if f not in intersect: - print("Y", full_dir(owners.dir, oglob), full_dir(globs_dir, gglob)) who = gglob_who_orig.copy() globs[os.path.relpath(f, start=globs_dir)] = who for who in oglob_who: From 87116f449985cd49728326ecbdb7293cc522b8ff Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Wed, 12 Jul 2017 14:22:46 -0700 Subject: [PATCH 03/16] Fix failures in libuv portability tests --- src/core/lib/iomgr/iomgr_uv.c | 4 + src/core/lib/iomgr/resolve_address_uv.c | 17 ++-- src/core/lib/iomgr/tcp_server_uv.c | 109 ++++++++++++++++-------- src/core/lib/iomgr/tcp_uv.c | 4 + 4 files changed, 91 insertions(+), 43 deletions(-) diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 49d1a03c322..7a99a6efdd3 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ 
b/src/core/lib/iomgr/iomgr_uv.c @@ -21,12 +21,16 @@ #ifdef GRPC_UV #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" void grpc_iomgr_platform_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); + grpc_executor_set_threading(&exec_ctx, false); + grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index a98b8e62db3..5180357cb16 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -114,11 +114,14 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; int retry_status; + char *port = r->port; gpr_free(req); retry_status = retry_named_port_failure(status, r, getaddrinfo_callback); if (retry_status == 0) { - // The request is being retried. Nothing should be done here + /* The request is being retried. It is using its own port string, so we free + * the original one */ + gpr_free(port); return; } /* Either no retry was attempted, or the retry failed. Either way, the @@ -218,16 +221,18 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { - uv_getaddrinfo_t *req; - request *r; - struct addrinfo *hints; - char *host; - char *port; + uv_getaddrinfo_t *req = NULL; + request *r = NULL; + struct addrinfo *hints = NULL; + char *host = NULL; + char *port = NULL; grpc_error *err; int s; err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); + gpr_free(host); + gpr_free(port); return; } r = gpr_malloc(sizeof(request)); diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34d..a63a36bf587 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -20,6 +20,7 @@ #ifdef GRPC_UV +#include #include #include @@ -43,6 +44,8 @@ struct grpc_tcp_listener { struct grpc_tcp_listener *next; bool closed; + + bool has_pending_connection; }; struct grpc_tcp_server { @@ -142,10 +145,12 @@ static void handle_close_callback(uv_handle_t *handle) { } static void close_listener(grpc_tcp_listener *sp) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (!sp->closed) { sp->closed = true; uv_close((uv_handle_t *)sp->handle, handle_close_callback); } + grpc_exec_ctx_finish(&exec_ctx); } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -183,18 +188,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -static void accepted_connection_close_cb(uv_handle_t *handle) { - gpr_free(handle); -} - -static void on_connect(uv_stream_t *server, int status) { - grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; +static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) { + grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); uv_tcp_t *client; grpc_endpoint *ep = NULL; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_resolved_address peer_name; char *peer_name_string; int err; + uv_tcp_t *server = sp->handle; + + client = gpr_malloc(sizeof(uv_tcp_t)); + 
uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err)); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (peer_name_string) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s", + sp->server, peer_name_string); + } else { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server); + } + } + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); + acceptor->from_server = sp->server; + acceptor->port_index = sp->port_index; + acceptor->fd_index = 0; + sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + acceptor); + gpr_free(peer_name_string); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (status < 0) { switch (status) { @@ -207,35 +243,17 @@ static void on_connect(uv_stream_t *server, int status) { } } - client = gpr_malloc(sizeof(uv_tcp_t)); - uv_tcp_init(uv_default_loop(), client); - // UV documentation says this is guaranteed to succeed - uv_accept((uv_stream_t *)server, (uv_stream_t *)client); - // If the server has not been started, we discard incoming connections - if (sp->server->on_accept_cb == NULL) { - uv_close((uv_handle_t *)client, accepted_connection_close_cb); + GPR_ASSERT(!sp->has_pending_connection); + + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + + // Create acceptor. + if (sp->server->on_accept_cb) { + finish_accept(&exec_ctx, sp); } else { - peer_name_string = NULL; - memset(&peer_name, 0, sizeof(grpc_resolved_address)); - peer_name.len = sizeof(struct sockaddr_storage); - err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, - (int *)&peer_name.len); - if (err == 0) { - peer_name_string = grpc_sockaddr_to_uri(&peer_name); - } else { - gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); - } - ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); - // Create acceptor. 
- grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); - acceptor->from_server = sp->server; - acceptor->port_index = sp->port_index; - acceptor->fd_index = 0; - sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, - acceptor); - grpc_exec_ctx_finish(&exec_ctx); - gpr_free(peer_name_string); + sp->has_pending_connection = true; } + grpc_exec_ctx_finish(&exec_ctx); } static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, @@ -282,7 +300,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, GPR_ASSERT(port >= 0); GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = gpr_zalloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -366,6 +384,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, gpr_free(allocated_addr); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + char *port_string; + grpc_sockaddr_to_string(&port_string, addr, 0); + const char *str = grpc_error_string(error); + if (port_string) { + gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str); + gpr_free(port_string); + } else { + gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str); + } + } + if (error != GRPC_ERROR_NONE) { grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to add port to server", &error, 1); @@ -385,13 +415,18 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_START %p", server); + } GPR_ASSERT(on_accept_cb); GPR_ASSERT(!server->on_accept_cb); server->on_accept_cb = on_accept_cb; server->on_accept_cb_arg = cb_arg; for (sp = server->head; sp; sp = sp->next) { - GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == - 0); + if (sp->has_pending_connection) { + finish_accept(exec_ctx, sp); + sp->has_pending_connection = false; + } } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index ff5fd3edc85..0302af2a062 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -307,6 +307,10 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(why); + gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); + } tcp->shutting_down = true; uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); From b8e07ad780ceb0bae7c431d0de1c6343bf275ceb Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 13:20:55 -0700 Subject: [PATCH 04/16] Add asserts that uv calls all run on the same thread --- BUILD | 1 + build.yaml | 1 + gRPC-Core.podspec | 2 + grpc.gemspec | 1 + package.xml | 1 + src/core/lib/iomgr/iomgr_uv.c | 4 ++ src/core/lib/iomgr/iomgr_uv.h | 37 +++++++++++++++++++ src/core/lib/iomgr/pollset_uv.c | 7 ++++ src/core/lib/iomgr/resolve_address_uv.c | 4 ++ src/core/lib/iomgr/tcp_client_uv.c | 3 ++ src/core/lib/iomgr/tcp_server_uv.c | 6 +++ src/core/lib/iomgr/tcp_uv.c | 3 ++ src/core/lib/iomgr/timer_uv.c | 4 ++ tools/doxygen/Doxyfile.core.internal | 1 + .../generated/sources_and_headers.json | 2 + tools/run_tests/run_tests.py | 2 +- vsprojects/vcxproj/grpc/grpc.vcxproj | 1 + vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 
3 ++ .../grpc_test_util/grpc_test_util.vcxproj | 1 + .../grpc_test_util.vcxproj.filters | 3 ++ .../grpc_unsecure/grpc_unsecure.vcxproj | 1 + .../grpc_unsecure.vcxproj.filters | 3 ++ 22 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/core/lib/iomgr/iomgr_uv.h diff --git a/BUILD b/BUILD index 495bb668eb1..189a8276e52 100644 --- a/BUILD +++ b/BUILD @@ -597,6 +597,7 @@ grpc_cc_library( "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/iomgr_posix.h", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", diff --git a/build.yaml b/build.yaml index e55c4ca3012..f43a13e9bcd 100644 --- a/build.yaml +++ b/build.yaml @@ -213,6 +213,7 @@ filegroups: - src/core/lib/iomgr/iomgr.h - src/core/lib/iomgr/iomgr_internal.h - src/core/lib/iomgr/iomgr_posix.h + - src/core/lib/iomgr/iomgr_uv.h - src/core/lib/iomgr/is_epollexclusive_available.h - src/core/lib/iomgr/load_file.h - src/core/lib/iomgr/lockfree_event.h diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3f841d51d8a..7a2c5a40f4f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -276,6 +276,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_internal.h', 'src/core/lib/iomgr/iomgr_posix.h', + 'src/core/lib/iomgr/iomgr_uv.h', 'src/core/lib/iomgr/is_epollexclusive_available.h', 'src/core/lib/iomgr/load_file.h', 'src/core/lib/iomgr/lockfree_event.h', @@ -753,6 +754,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_internal.h', 'src/core/lib/iomgr/iomgr_posix.h', + 'src/core/lib/iomgr/iomgr_uv.h', 'src/core/lib/iomgr/is_epollexclusive_available.h', 'src/core/lib/iomgr/load_file.h', 'src/core/lib/iomgr/lockfree_event.h', diff --git a/grpc.gemspec b/grpc.gemspec index 663915b75ea..692977c7400 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -208,6 +208,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/iomgr.h ) s.files += %w( src/core/lib/iomgr/iomgr_internal.h ) s.files += %w( src/core/lib/iomgr/iomgr_posix.h ) + s.files += %w( src/core/lib/iomgr/iomgr_uv.h ) s.files += %w( src/core/lib/iomgr/is_epollexclusive_available.h ) s.files += %w( src/core/lib/iomgr/load_file.h ) s.files += %w( src/core/lib/iomgr/lockfree_event.h ) diff --git a/package.xml b/package.xml index cfa45f06d76..0714e614151 100644 --- a/package.xml +++ b/package.xml @@ -222,6 +222,7 @@ + diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index 7a99a6efdd3..c484a39c540 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -22,14 +22,18 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/executor.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" +gpr_thd_id grpc_init_thread; + void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); grpc_executor_set_threading(&exec_ctx, false); + grpc_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h new file mode 100644 index 00000000000..35c073a0330 --- /dev/null +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2017 gRPC authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H +#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H + +#include "src/core/lib/iomgr/iomgr_internal.h" + +#include + +/* The thread ID of the thread on which grpc was initialized. Used to verify + * that all calls into libuv are made on that same thread */ +extern gpr_thd_id grpc_init_thread; + +#ifdef GRPC_UV_THREAD_CHECK +#define GRPC_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread) +#else +#define GRPC_ASSERT_SAME_THREAD() +#endif /* GRPC_UV_THREAD_CHECK */ + +#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 1a54065a914..47c8c79cc2a 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -28,6 +28,7 @@ #include #include +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_uv.h" @@ -69,6 +70,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { + GRPC_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -78,6 +80,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + GRPC_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -86,6 +89,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); + GRPC_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -98,6 +102,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { + GRPC_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -112,6 +117,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; + GRPC_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -140,6 +146,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + GRPC_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 5180357cb16..f910ec26b26 100644 --- 
a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -30,6 +30,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -174,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; + GRPC_ASSERT_SAME_THREAD(); + req.addrinfo = NULL; err = try_split_host_port(name, default_port, &host, &port); @@ -228,6 +231,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port = NULL; grpc_error *err; int s; + GRPC_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 2f1d237c07b..098ff7648de 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -26,6 +26,7 @@ #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; + GRPC_ASSERT_SAME_THREAD(); + if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index a63a36bf587..7b45e08c9b8 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -28,6 +28,7 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/tcp_server.h" @@ -107,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + GRPC_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -173,6 +175,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + GRPC_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. 
*/ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -335,6 +338,8 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_ASSERT_SAME_THREAD(); + if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -415,6 +420,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; + GRPC_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "SERVER_START %p", server); } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 0302af2a062..115297634b1 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -30,6 +30,7 @@ #include #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/resource_quota.h" #include "src/core/lib/iomgr/tcp_uv.h" @@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; + GRPC_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; + GRPC_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index 4f204cfbf8c..e004ff5a3a4 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -24,6 +24,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/iomgr_uv.h" #include "src/core/lib/iomgr/timer.h" #include @@ -42,6 +43,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -54,6 +56,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; + GRPC_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -74,6 +77,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + GRPC_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 766c20f59b7..1263bdbb743 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1118,6 +1118,7 @@ src/core/lib/iomgr/iomgr_internal.h \ src/core/lib/iomgr/iomgr_posix.c \ src/core/lib/iomgr/iomgr_posix.h \ src/core/lib/iomgr/iomgr_uv.c \ +src/core/lib/iomgr/iomgr_uv.h \ src/core/lib/iomgr/iomgr_windows.c \ src/core/lib/iomgr/is_epollexclusive_available.c \ src/core/lib/iomgr/is_epollexclusive_available.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 477a258daa6..93411a8c614 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7654,6 
+7654,7 @@ "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_internal.h", "src/core/lib/iomgr/iomgr_posix.h", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/is_epollexclusive_available.h", "src/core/lib/iomgr/load_file.h", "src/core/lib/iomgr/lockfree_event.h", @@ -7812,6 +7813,7 @@ "src/core/lib/iomgr/iomgr_posix.c", "src/core/lib/iomgr/iomgr_posix.h", "src/core/lib/iomgr/iomgr_uv.c", + "src/core/lib/iomgr/iomgr_uv.h", "src/core/lib/iomgr/iomgr_windows.c", "src/core/lib/iomgr/is_epollexclusive_available.c", "src/core/lib/iomgr/is_epollexclusive_available.h", diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 611868ce5a4..50eed6256c9 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -245,7 +245,7 @@ class CLanguage(object): self._docker_distro, self._make_options = self._compiler_options(self.args.use_docker, self.args.compiler) if args.iomgr_platform == "uv": - cflags = '-DGRPC_UV ' + cflags = '-DGRPC_UV -DGRPC_UV_THREAD_CHECK' try: cflags += subprocess.check_output(['pkg-config', '--cflags', 'libuv']).strip() + ' ' except (subprocess.CalledProcessError, OSError): diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index f9badbbd353..12cb39a3df6 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -333,6 +333,7 @@ + diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index f3e997e4e0e..c762200a008 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -944,6 +944,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj index 1899be91db3..98bf5aeb69a 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj @@ -228,6 +228,7 @@ + diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters index 9c81fc5bf51..145b51c206b 100644 --- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters @@ -677,6 +677,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 3a865f3e812..12c352a3f81 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -323,6 +323,7 @@ + diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 0310ebc012c..0b598363b8b 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -854,6 +854,9 @@ src\core\lib\iomgr + + src\core\lib\iomgr + src\core\lib\iomgr From 79e2d8e89f635e07312d07afcc1f26a49aa48f06 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 13:23:55 -0700 Subject: [PATCH 05/16] libuv tests: skip a test that spawns a thread --- build.yaml | 2 ++ tools/run_tests/generated/tests.json | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/build.yaml b/build.yaml index f43a13e9bcd..c8556c4b455 100644 --- a/build.yaml 
+++ b/build.yaml @@ -2370,6 +2370,8 @@ targets: - grpc - gpr_test_util - gpr + exclude_iomgrs: + - uv platforms: - linux secure: true diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index dd812414b9f..5be3cf796bd 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -1390,7 +1390,9 @@ ], "cpu_cost": 1.0, "exclude_configs": [], - "exclude_iomgrs": [], + "exclude_iomgrs": [ + "uv" + ], "flaky": false, "gtest": false, "language": "c", From 269d3b4faae39ac4a6be66baa5b5fc9a2efb86df Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 15:09:17 -0700 Subject: [PATCH 06/16] Clang format --- src/core/lib/iomgr/iomgr_uv.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h index 35c073a0330..bd406c34f72 100644 --- a/src/core/lib/iomgr/iomgr_uv.h +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -28,7 +28,7 @@ extern gpr_thd_id grpc_init_thread; #ifdef GRPC_UV_THREAD_CHECK -#define GRPC_ASSERT_SAME_THREAD() \ +#define GRPC_ASSERT_SAME_THREAD() \ GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread) #else #define GRPC_ASSERT_SAME_THREAD() From 30939f88548a7318cac2642d6149f651c09f931a Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 5 Jul 2017 15:43:18 -0700 Subject: [PATCH 07/16] Make CreateThreadPool settable --- src/cpp/client/secure_credentials.cc | 2 +- src/cpp/server/create_default_thread_pool.cc | 12 +++++++++++- src/cpp/server/secure_server_credentials.h | 2 +- src/cpp/server/thread_pool_interface.h | 6 +++++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 057a058a3fb..0eefd0645b4 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -207,7 +207,7 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper( std::unique_ptr plugin) - : thread_pool_(CreateDefaultThreadPool()), plugin_(std::move(plugin)) {} + : thread_pool_(CreateThreadPool()), plugin_(std::move(plugin)) {} std::shared_ptr MetadataCredentialsFromPlugin( std::unique_ptr plugin) { diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 17ad331c9c0..bf76303b454 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -24,12 +24,22 @@ namespace grpc { -ThreadPoolInterface* CreateDefaultThreadPool() { +static ThreadPoolInterface* CreateDefaultThreadPool() { int cores = gpr_cpu_num_cores(); if (!cores) cores = 4; return new DynamicThreadPool(cores); } +static CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPool; + +ThreadPoolInterface* CreateThreadPool() { + return g_ctp_impl(); +} + +void SetCreateThreadPool(CreateThreadPoolFunc func) { + g_ctp_impl = func; +} + } // namespace grpc #endif // !GRPC_CUSTOM_DEFAULT_THREAD_POOL diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index 212f0d1df3b..30ee1c7a366 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -39,7 +39,7 @@ class AuthMetadataProcessorAyncWrapper final { AuthMetadataProcessorAyncWrapper( const std::shared_ptr& processor) - : thread_pool_(CreateDefaultThreadPool()), processor_(processor) {} + : thread_pool_(CreateThreadPool()), processor_(processor) {} private: void InvokeProcessor(grpc_auth_context* 
context, const grpc_metadata* md, diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h index 4f4fc7eaaa2..5a182982968 100644 --- a/src/cpp/server/thread_pool_interface.h +++ b/src/cpp/server/thread_pool_interface.h @@ -32,7 +32,11 @@ class ThreadPoolInterface { virtual void Add(const std::function& callback) = 0; }; -ThreadPoolInterface* CreateDefaultThreadPool(); +// Allows different codebases to use their own thread pool impls +typedef ThreadPoolInterface* (*CreateThreadPoolFunc)(void); +void SetCreateThreadPool(CreateThreadPoolFunc func); + +ThreadPoolInterface* CreateThreadPool(); } // namespace grpc From a5d557b96d873873e5f17d96ffca6088e3743af0 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Wed, 5 Jul 2017 15:52:28 -0700 Subject: [PATCH 08/16] Rename function to avoud future clash --- src/cpp/client/secure_credentials.cc | 2 +- src/cpp/server/create_default_thread_pool.cc | 12 ++++-------- src/cpp/server/secure_server_credentials.h | 2 +- src/cpp/server/thread_pool_interface.h | 2 +- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc index 0eefd0645b4..057a058a3fb 100644 --- a/src/cpp/client/secure_credentials.cc +++ b/src/cpp/client/secure_credentials.cc @@ -207,7 +207,7 @@ void MetadataCredentialsPluginWrapper::InvokePlugin( MetadataCredentialsPluginWrapper::MetadataCredentialsPluginWrapper( std::unique_ptr plugin) - : thread_pool_(CreateThreadPool()), plugin_(std::move(plugin)) {} + : thread_pool_(CreateDefaultThreadPool()), plugin_(std::move(plugin)) {} std::shared_ptr MetadataCredentialsFromPlugin( std::unique_ptr plugin) { diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index bf76303b454..57faa17f6b9 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -24,21 +24,17 @@ namespace grpc { -static ThreadPoolInterface* CreateDefaultThreadPool() { +static ThreadPoolInterface* CreateDefaultThreadPoolImpl() { int cores = gpr_cpu_num_cores(); if (!cores) cores = 4; return new DynamicThreadPool(cores); } -static CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPool; +static CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; -ThreadPoolInterface* CreateThreadPool() { - return g_ctp_impl(); -} +ThreadPoolInterface* CreateDefaultThreadPool() { return g_ctp_impl(); } -void SetCreateThreadPool(CreateThreadPoolFunc func) { - g_ctp_impl = func; -} +void SetCreateThreadPool(CreateThreadPoolFunc func) { g_ctp_impl = func; } } // namespace grpc diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h index 30ee1c7a366..212f0d1df3b 100644 --- a/src/cpp/server/secure_server_credentials.h +++ b/src/cpp/server/secure_server_credentials.h @@ -39,7 +39,7 @@ class AuthMetadataProcessorAyncWrapper final { AuthMetadataProcessorAyncWrapper( const std::shared_ptr& processor) - : thread_pool_(CreateThreadPool()), processor_(processor) {} + : thread_pool_(CreateDefaultThreadPool()), processor_(processor) {} private: void InvokeProcessor(grpc_auth_context* context, const grpc_metadata* md, diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h index 5a182982968..028842a776f 100644 --- a/src/cpp/server/thread_pool_interface.h +++ b/src/cpp/server/thread_pool_interface.h @@ -36,7 +36,7 @@ class ThreadPoolInterface { typedef ThreadPoolInterface* (*CreateThreadPoolFunc)(void); void 
SetCreateThreadPool(CreateThreadPoolFunc func); -ThreadPoolInterface* CreateThreadPool(); +ThreadPoolInterface* CreateDefaultThreadPool(); } // namespace grpc From eb70b9e0df3b51db4e2d3466607e7713425dfac7 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 6 Jul 2017 09:38:26 -0700 Subject: [PATCH 09/16] Anon namespace over static --- src/cpp/server/create_default_thread_pool.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc index 57faa17f6b9..8ca3e32c2fb 100644 --- a/src/cpp/server/create_default_thread_pool.cc +++ b/src/cpp/server/create_default_thread_pool.cc @@ -23,14 +23,17 @@ #ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL namespace grpc { +namespace { -static ThreadPoolInterface* CreateDefaultThreadPoolImpl() { +ThreadPoolInterface* CreateDefaultThreadPoolImpl() { int cores = gpr_cpu_num_cores(); if (!cores) cores = 4; return new DynamicThreadPool(cores); } -static CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; +CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl; + +} // namespace ThreadPoolInterface* CreateDefaultThreadPool() { return g_ctp_impl(); } From 59a19a9d5ecc34b60fd6c035a6cb261261dd48fe Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 18 Jul 2017 17:26:08 -0700 Subject: [PATCH 10/16] make sure that client-side view of calls is robust --- src/ruby/lib/grpc/generic/active_call.rb | 17 +++- src/ruby/spec/generic/client_stub_spec.rb | 97 ++++++++++++++++++++--- 2 files changed, 104 insertions(+), 10 deletions(-) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 96c773a995d..4a748a4ac29 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -46,7 +46,7 @@ module GRPC extend Forwardable attr_reader :deadline, :metadata_sent, :metadata_to_send def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata + :peer, :peer_cert, :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -105,6 +105,8 @@ module GRPC @input_stream_done = false @call_finished = false @call_finished_mu = Mutex.new + @client_call_executed = false + @client_call_executed_mu = Mutex.new end # Sends the initial metadata that has yet to be sent.
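Each client entry point changed below (request_response, client_streamer, server_streamer, bidi_streamer) now begins with raise_error_if_already_executed, so a given call object can only be run once. Roughly, the client-visible contract becomes the following (a minimal Ruby sketch, not part of the patch; the EchoStub class, its an_rpc method, and the address are hypothetical):

    # Hypothetical stub for an echo service; only the re-run behavior matters here.
    stub = EchoStub.new('localhost:50051', :this_channel_is_insecure)
    op = stub.an_rpc(EchoMsg.new, return_op: true) # ask for an Operation view
    op.execute                                     # first execution runs the call
    begin
      op.execute                                   # re-running the finished call
    rescue GRPC::Core::CallError => e              # is now rejected up front
      puts e.message                               # 'attempting to re-run a call'
    end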
@@ -327,6 +329,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def request_response(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil, @@ -369,6 +372,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Object] the response received from the server def client_streamer(requests, metadata: {}) + raise_error_if_already_executed begin merge_metadata_and_send_if_not_already_sent(metadata) requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) } @@ -411,6 +415,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator|nil] a response Enumerator def server_streamer(req, metadata: {}) + raise_error_if_already_executed ops = { SEND_MESSAGE => @marshal.call(req), SEND_CLOSE_FROM_CLIENT => nil @@ -468,6 +473,7 @@ module GRPC # a list, multiple metadata for its key are sent # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, metadata: {}, &blk) + raise_error_if_already_executed # Metadata might have already been sent if this is an operation view merge_metadata_and_send_if_not_already_sent(metadata) bd = BidiCall.new(@call, @@ -572,6 +578,15 @@ module GRPC merge_metadata_to_send(metadata) && send_initial_metadata end + def raise_error_if_already_executed + @client_call_executed_mu.synchronize do + if @client_call_executed + fail GRPC::Core::CallError, 'attempting to re-run a call' + end + @client_call_executed = true + end + end + def self.view_class(*visible_methods) Class.new do extend ::Forwardable diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 3b8f72eda11..7b5e6a95a43 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -36,6 +36,33 @@ include GRPC::Core::StatusCodes include GRPC::Core::TimeConsts include GRPC::Core::CallOps +# check that methods on a finished/closed call don't crash +def check_op_view_of_finished_client_call_is_robust(op_view) + # use read_response_stream to try to iterate through + # possible response stream + fail('need something to attempt reads') unless block_given? + expect do + resp = op_view.execute + yield resp + end.to raise_error(GRPC::Core::CallError) + + expect { op_view.start_call }.to raise_error(RuntimeError) + + expect do + op_view.wait + op_view.cancel + + op_view.metadata + op_view.trailing_metadata + op_view.status + + op_view.cancelled?
+ op_view.deadline + op_view.write_flag + op_view.write_flag = 1 + end.to_not raise_error +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -231,15 +258,27 @@ describe 'ClientStub' do it_behaves_like 'request response' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, k1: 'v1', k2: 'v2') stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - expect(get_response(stub)).to eq(@resp) + expect( + get_response(stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end end end @@ -307,11 +346,23 @@ describe 'ClientStub' do it_behaves_like 'client streaming' - it 'sends metadata to the server ok when running start_call first' do + def run_op_view_metadata_test(run_start_call_first) th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) - expect(get_response(@stub, run_start_call_first: true)).to eq(@resp) + expect( + get_response(@stub, + run_start_call_first: run_start_call_first)).to eq(@resp) th.join end + + it 'sends metadata to the server ok when running start_call first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + end end end @@ -377,7 +428,7 @@ describe 'ClientStub' do end end - describe 'without a call operation', test2: true do + describe 'without a call operation' do def get_responses(stub, unmarshal: noop) e = stub.server_streamer(@method, @sent_msg, noop, unmarshal, metadata: @metadata) @@ -405,16 +456,30 @@ describe 'ClientStub' do it_behaves_like 'server streaming' - it 'should send metadata to the server ok when start_call is run first' do + def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, k1: 'v1', k2: 'v2') stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) - e = get_responses(stub, run_start_call_first: true) + e = get_responses(stub, run_start_call_first: run_start_call_first) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) th.join end + + it 'should send metadata to the server ok when start_call is run first' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end + + it 'does not crash when used after the call has been finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end end end @@ -501,14 +566,28 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' - it 'can run start_call before executing the call' do + def run_op_view_metadata_test(run_start_call_first) th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, @pass) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) - e =
get_responses(stub, run_start_call_first: true) + e = get_responses(stub, run_start_call_first: run_start_call_first) expect(e.collect { |r| r }).to eq(@replys) th.join end + + it 'can run start_call before executing the call' do + run_op_view_metadata_test(true) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end + + it 'doesnt crash when op_view used after call has finished' do + run_op_view_metadata_test(false) + check_op_view_of_finished_client_call_is_robust(@op) do |responses| + responses.each { |r| p r } + end + end end end From 6617790c18e4048671d64d03212cfe84b960f376 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 18 Jul 2017 18:00:38 -0700 Subject: [PATCH 11/16] Add SO_REUSEPORT support to uv iomgr code --- src/core/lib/iomgr/sockaddr_utils.c | 5 +++++ src/core/lib/iomgr/sockaddr_utils.h | 2 ++ src/core/lib/iomgr/tcp_server_uv.c | 14 +++++++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 99dc2f1c78a..3f4145d104e 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme( return NULL; } +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + return addr->sa_family; +} + int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 7692b969f2b..a589a197054 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); /* Returns the URI scheme corresponding to \a addr */ const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); +int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2ab836cc34d..079c9135796 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -316,6 +316,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, unsigned port_index = 0; int status; grpc_error *error = GRPC_ERROR_NONE; + int family; if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -353,7 +354,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, } handle = gpr_malloc(sizeof(uv_tcp_t)); - status = uv_tcp_init(uv_default_loop(), handle); + + family = grpc_sockaddr_get_family(addr); + status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family); +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (family == AF_INET || family == AF_INET6) { + int fd; + uv_fileno((uv_handle_t *)handle, &fd); + int enable = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif /* GPR_LINUX && SO_REUSEPORT */ + if (status == 0) { error = add_socket_to_server(s, handle, addr, port_index, &sp); } else { From 7cc30c1155805537e18affd13cc4b7e44c3927be Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Tue, 18 Jul 2017 23:19:24 -0700 Subject: [PATCH 12/16] add missing fields on server call context and improve robustness of finished calls ---
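With the peer identity captured when the call is created and status readable through the operation view, the read-only accessors of a finished client call keep answering instead of crashing. A rough sketch of the expected values (the stub and message type are hypothetical; the values follow the specs in this patch):

    op = stub.an_rpc(EchoMsg.new, return_op: true) # hypothetical unary RPC
    op.execute
    op.status.code       # 0, i.e. GRPC::Core::StatusCodes::OK
    op.metadata          # initial metadata sent by the server
    op.trailing_metadata # the same hash as op.status.metadata
    op.cancelled?        # false
    op.write_flag        # nil unless one was set
    op.deadline          # a Time or a GRPC::Core::TimeSpec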
src/ruby/lib/grpc/generic/active_call.rb | 14 +- src/ruby/lib/grpc/generic/rpc_server.rb | 1 + src/ruby/spec/generic/client_stub_spec.rb | 166 +++++++++++++++------- src/ruby/spec/generic/rpc_server_spec.rb | 145 +++++++++++++++++++ 4 files changed, 276 insertions(+), 50 deletions(-) diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 4a748a4ac29..84c88871680 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -44,9 +44,9 @@ module GRPC include Core::TimeConsts include Core::CallOps extend Forwardable - attr_reader :deadline, :metadata_sent, :metadata_to_send + attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=, - :peer, :peer_cert, :trailing_metadata, :status + :trailing_metadata, :status # client_invoke begins a client invocation. # @@ -105,8 +105,13 @@ module GRPC @input_stream_done = false @call_finished = false @call_finished_mu = Mutex.new + @client_call_executed = false @client_call_executed_mu = Mutex.new + + # set the peer now so that the accessor can still function + # after the server closes the call + @peer = call.peer end # Sends the initial metadata that has yet to be sent. @@ -541,6 +546,10 @@ module GRPC end end + def attach_peer_cert(peer_cert) + @peer_cert = peer_cert + end + private # To be called once the "input stream" has been completelly @@ -612,6 +621,7 @@ module GRPC # server client_streamer handlers. MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg, :each_remote_read, :metadata, :output_metadata, + :peer, :peer_cert, :send_initial_metadata, :metadata_to_send, :merge_metadata_to_send, diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index ef2cc0ce918..33b3cea1fc3 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,6 +418,7 @@ module GRPC metadata_received: true, started: false, metadata_to_send: connect_md) + c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 7b5e6a95a43..a8653e73cfd 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -37,7 +37,9 @@ include GRPC::Core::TimeConsts include GRPC::Core::CallOps # check that methods on a finished/closed call don't crash -def check_op_view_of_finished_client_call_is_robust(op_view) +def check_op_view_of_finished_client_call(op_view, + expected_metadata, + expected_trailing_metadata) # use read_response_stream to try to iterate through # possible response stream fail('need something to attempt reads') unless block_given? @@ -48,21 +50,39 @@ def check_op_view_of_finished_client_call_is_robust(op_view) expect { op_view.start_call }.to raise_error(RuntimeError) + sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expect do op_view.wait op_view.cancel - - op_view.metadata - op_view.trailing_metadata - op_view.status - - op_view.cancelled?
- op_view.deadline - op_view.write_flag op_view.write_flag = 1 end.to_not raise_error end +def sanity_check_values_of_accessors(op_view, + expected_metadata, + expected_trailing_metadata) + expected_status = Struct::Status.new + expected_status.code = 0 + expected_status.details = 'OK' + expected_status.metadata = expected_trailing_metadata + + expect(op_view.status).to eq(expected_status) + expect(op_view.metadata).to eq(expected_metadata) + expect(op_view.trailing_metadata).to eq(expected_trailing_metadata) + + expect(op_view.cancelled?).to be(false) + expect(op_view.write_flag).to be(nil) + + # The deadline attribute of a call can be either + # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive. + # TODO: fix so that the accessor always returns the same type. + expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) || + op_view.deadline.is_a?(Time)).to be(true) +end + describe 'ClientStub' do let(:noop) { proc { |x| x } } @@ -154,7 +174,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect(get_response(stub)).to eq(@resp) th.join @@ -261,8 +281,14 @@ describe 'ClientStub' do def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_request_response(@sent_msg, @resp, @pass, - k1: 'v1', k2: 'v2') + + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_request_response( + @sent_msg, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) expect( get_response(stub, @@ -272,12 +298,14 @@ describe 'ClientStub' do it 'sends metadata to the server ok when running start_call first' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end end end @@ -300,7 +328,8 @@ describe 'ClientStub' do end it 'should send metadata to the server ok' do - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + th = run_client_streamer(@sent_msgs, @resp, @pass, + expected_metadata: @metadata) expect(get_response(@stub)).to eq(@resp) th.join end @@ -347,7 +376,13 @@ describe 'ClientStub' do it_behaves_like 'client streaming' def run_op_view_metadata_test(run_start_call_first) - th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_client_streamer( + @sent_msgs, @resp, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) expect( get_response(@stub, run_start_call_first: run_start_call_first)).to eq(@resp) @@ -356,12 +391,14 @@ describe 'ClientStub' do it 'sends metadata to the server ok when running start_call first' do run_op_view_metadata_test(true) - 
check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) { |r| p r } + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) { |r| p r } end end end @@ -396,7 +433,7 @@ describe 'ClientStub' do server_port = create_test_server host = "localhost:#{server_port}" th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + expected_metadata: { k1: 'v1', k2: 'v2' }) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub) expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) @@ -459,24 +496,31 @@ describe 'ClientStub' do def run_op_view_metadata_test(run_start_call_first) server_port = create_test_server host = "localhost:#{server_port}" - th = run_server_streamer(@sent_msg, @replys, @fail, - k1: 'v1', k2: 'v2') + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_server_streamer( + @sent_msg, @replys, @pass, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(host, :this_channel_is_insecure) e = get_responses(stub, run_start_call_first: run_start_call_first) - expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus) + expect(e.collect { |r| r }).to eq(@replys) th.join end it 'should send metadata to the server ok when start_call is run first' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end it 'does not crash when used after the call has been finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end @@ -530,7 +574,7 @@ describe 'ClientStub' do it 'should send metadata to the server ok' do th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true, - **@metadata) + expected_metadata: @metadata) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub) expect(e.collect { |r| r }).to eq(@sent_msgs) @@ -567,40 +611,52 @@ describe 'ClientStub' do it_behaves_like 'bidi streaming' def run_op_view_metadata_test(run_start_call_first) - th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys, - @pass) + @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' } + @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' } + th = run_bidi_streamer_echo_ping_pong( + @sent_msgs, @pass, true, + expected_metadata: @metadata, + server_initial_md: @server_initial_md, + server_trailing_md: @server_trailing_md) stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) e = get_responses(stub, run_start_call_first: run_start_call_first) - expect(e.collect { |r| r }).to eq(@replys) + expect(e.collect { |r| r }).to eq(@sent_msgs) th.join end it 'can run start_call before executing the call' do run_op_view_metadata_test(true) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) 
do |responses| responses.each { |r| p r } end end it 'doesnt crash when op_view used after call has finished' do run_op_view_metadata_test(false) - check_op_view_of_finished_client_call_is_robust(@op) do |responses| + check_op_view_of_finished_client_call( + @op, @server_initial_md, @server_trailing_md) do |responses| responses.each { |r| p r } end end end end - def run_server_streamer(expected_input, replys, status, **kw) - wanted_metadata = kw.clone + def run_server_streamer(expected_input, replys, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end expect(c.remote_read).to eq(expected_input) replys.each { |r| c.remote_send(r) } - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -615,10 +671,13 @@ describe 'ClientStub' do end def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts, - **kw) - wanted_metadata = kw.clone + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end @@ -631,33 +690,44 @@ describe 'ClientStub' do expect(c.remote_read).to eq(i) end end - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_client_streamer(expected_inputs, resp, status, **kw) - wanted_metadata = kw.clone + def run_client_streamer(expected_inputs, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expected_inputs.each { |i| expect(c.remote_read).to eq(i) } wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 'OK' : 'NOK', true, + metadata: server_trailing_md) end end - def run_request_response(expected_input, resp, status, **kw) - wanted_metadata = kw.clone + def run_request_response(expected_input, resp, status, + expected_metadata: {}, + server_initial_md: {}, + server_trailing_md: {}) + wanted_metadata = expected_metadata.clone wakey_thread do |notifier| - c = expect_server_to_be_invoked(notifier) + c = expect_server_to_be_invoked( + notifier, metadata_to_send: server_initial_md) expect(c.remote_read).to eq(expected_input) wanted_metadata.each do |k, v| expect(c.metadata[k.to_s]).to eq(v) end c.remote_send(resp) - c.send_status(status, status == @pass ? 'OK' : 'NOK', true) + c.send_status(status, status == @pass ? 
'OK' : 'NOK', true, + metadata: server_trailing_md) end end @@ -675,13 +745,13 @@ describe 'ClientStub' do @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure) end - def expect_server_to_be_invoked(notifier) + def expect_server_to_be_invoked(notifier, metadata_to_send: nil) @server.start notifier.notify(nil) recvd_rpc = @server.request_call recvd_call = recvd_rpc.call recvd_call.metadata = recvd_rpc.metadata - recvd_call.run_batch(SEND_INITIAL_METADATA => nil) + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE, metadata_received: true) end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 9633a828a2a..4258d59851d 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -111,6 +111,47 @@ end SlowStub = SlowService.rpc_stub_class +# a test service that hangs onto call objects +# and uses them after the server-side call has been +# finished +class CheckCallAfterFinishedService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + attr_reader :server_side_call + + def an_rpc(req, call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + req + end + + def a_client_streaming_rpc(call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + # iterate through requests so call can complete + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + fail 'shouldnt reuse service' unless @call.nil? + @server_side_call = call + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + fail 'shouldnt reuse service' unless @call.nil? 
+ @server_side_call = call + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class + describe GRPC::RpcServer do RpcServer = GRPC::RpcServer StatusCodes = GRPC::Core::StatusCodes @@ -505,5 +546,109 @@ describe GRPC::RpcServer do t.join end end + + context 'when call objects are used after calls have completed' do + before(:each) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) + @alt_host = "0.0.0.0:#{alt_port}" + + @service = CheckCallAfterFinishedService.new + @srv.handle(@service) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + end + + # check that the server-side call is still in a usable state even + # after it has finished + def check_single_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect(call.peer).to be_a(String) + expect(call.peer_cert).to be(nil) + end + + def check_multi_req_view_of_finished_call(call) + common_check_of_finished_server_call(call) + + expect do + call.each_remote_read.each { |r| p r } + end.to raise_error(GRPC::Core::CallError) + end + + def common_check_of_finished_server_call(call) + expect do + call.merge_metadata_to_send({}) + end.to raise_error(RuntimeError) + + expect do + call.send_initial_metadata + end.to_not raise_error + + expect(call.cancelled?).to be(false) + expect(call.metadata).to be_a(Hash) + expect(call.metadata['user-agent']).to be_a(String) + + expect(call.metadata_sent).to be(true) + expect(call.output_metadata).to eq({}) + expect(call.metadata_to_send).to eq({}) + expect(call.deadline.is_a?(Time)).to be(true) + end + + it 'should not crash when call used after an unary call is finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.an_rpc(req) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after client streaming finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + resp = stub.a_client_streaming_rpc(requests) + expect(resp).to be_a(EchoMsg) + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after server streaming finished' do + req = EchoMsg.new + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_server_streaming_rpc(req) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_single_req_view_of_finished_call(@service.server_side_call) + end + + it 'should not crash when call used after a bidi call is finished' do + requests = [EchoMsg.new, EchoMsg.new] + stub = CheckCallAfterFinishedServiceStub.new(@alt_host, + :this_channel_is_insecure) + responses = stub.a_bidi_rpc(requests) + responses.each do |r| + expect(r).to be_a(EchoMsg) + end + @srv.stop + @srv_thd.join + + check_multi_req_view_of_finished_call(@service.server_side_call) + end + end end end From 0c22cad01fe778ca2df217c7ef2a68dfac24254d Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Jul 2017 10:33:52 -0700 Subject: [PATCH 13/16] Improve a comment in inproc transport --- src/core/ext/transport/inproc/inproc_transport.c | 7 +++++-- 1 file changed, 5 
insertions(+), 2 deletions(-) diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c index 4df64d81e2b..14498021ebd 100644 --- a/src/core/ext/transport/inproc/inproc_transport.c +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -190,8 +190,11 @@ typedef struct inproc_stream { static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, grpc_byte_stream *bs, size_t max, grpc_closure *on_complete) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - return (stream->le->sb.count != 0); + // Because inproc transport always provides the entire message atomically, + // the byte stream always has data available when this function is called. + // Thus, this function always returns true (unlike other transports) and + // there is never any need to schedule a closure + return true; } static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, From 5c92dcde1ee0c25a3c341c88af62fec600645252 Mon Sep 17 00:00:00 2001 From: ncteisen Date: Thu, 20 Jul 2017 14:55:49 -0700 Subject: [PATCH 14/16] Allow passing regex to bm_diff tools --- .../microbenchmarks/bm_diff/bm_diff.py | 5 +++-- .../microbenchmarks/bm_diff/bm_main.py | 14 ++++++------- .../microbenchmarks/bm_diff/bm_run.py | 21 +++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py index 809817a1a8c..1ac951f3d86 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py @@ -144,7 +144,7 @@ def _read_json(filename, badjson_files, nonexistant_files): def fmt_dict(d): return ''.join([" " + k + ": " + str(d[k]) + "\n" for k in d]) -def diff(bms, loops, track, old, new, counters): +def diff(bms, loops, regex, track, old, new, counters): benchmarks = collections.defaultdict(Benchmark) badjson_files = {} @@ -153,7 +153,8 @@ def diff(bms, loops, track, old, new, counters): for loop in range(0, loops): for line in subprocess.check_output( ['bm_diff_%s/opt/%s' % (old, bm), - '--benchmark_list_tests']).splitlines(): + '--benchmark_list_tests', + '--benchmark_filter=%s' % regex]).splitlines(): stripped_line = line.strip().replace("/", "_").replace( "<", "_").replace(">", "_").replace(", ", "_") js_new_opt = _read_json('%s.%s.opt.%s.%d.json' % diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_main.py b/tools/profiling/microbenchmarks/bm_diff/bm_main.py index 8b4e0cb69a4..5aa11ac391e 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_main.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_main.py @@ -63,10 +63,10 @@ def _args(): help='Name of baseline run to compare to. 
Ususally just called "old"') argp.add_argument( '-r', - '--repetitions', - type=int, - default=1, - help='Number of repetitions to pass to the benchmarks') + '--regex', + type=str, + default="", + help='Regex to filter benchmarks run') argp.add_argument( '-l', '--loops', @@ -125,10 +125,10 @@ def main(args): subprocess.check_call(['git', 'checkout', where_am_i]) subprocess.check_call(['git', 'submodule', 'update']) - bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) - bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) + bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.regex, args.counters) + bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.regex, args.counters) - diff, note = bm_diff.diff(args.benchmarks, args.loops, args.track, old, + diff, note = bm_diff.diff(args.benchmarks, args.loops, args.regex, args.track, old, 'new', args.counters) if diff: text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff) diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_run.py b/tools/profiling/microbenchmarks/bm_diff/bm_run.py index 72b3d3cf106..206f7c5845f 100755 --- a/tools/profiling/microbenchmarks/bm_diff/bm_run.py +++ b/tools/profiling/microbenchmarks/bm_diff/bm_run.py @@ -56,10 +56,10 @@ def _args(): ) argp.add_argument( '-r', - '--repetitions', - type=int, - default=1, - help='Number of repetitions to pass to the benchmarks') + '--regex', + type=str, + default="", + help='Regex to filter benchmarks run') argp.add_argument( '-l', '--loops', @@ -77,18 +77,17 @@ def _args(): return args -def _collect_bm_data(bm, cfg, name, reps, idx, loops): +def _collect_bm_data(bm, cfg, name, regex, idx, loops): jobs_list = [] for line in subprocess.check_output( ['bm_diff_%s/%s/%s' % (name, cfg, bm), - '--benchmark_list_tests']).splitlines(): + '--benchmark_list_tests', '--benchmark_filter=%s' % regex]).splitlines(): stripped_line = line.strip().replace("/", "_").replace( "<", "_").replace(">", "_").replace(", ", "_") cmd = [ 'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' % line, '--benchmark_out=%s.%s.%s.%s.%d.json' % (bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json', - '--benchmark_repetitions=%d' % (reps) ] jobs_list.append( jobset.JobSpec( @@ -100,13 +99,13 @@ def _collect_bm_data(bm, cfg, name, reps, idx, loops): return jobs_list -def run(name, benchmarks, jobs, loops, reps, counters): +def run(name, benchmarks, jobs, loops, regex, counters): jobs_list = [] for loop in range(0, loops): for bm in benchmarks: - jobs_list += _collect_bm_data(bm, 'opt', name, reps, loop, loops) + jobs_list += _collect_bm_data(bm, 'opt', name, regex, loop, loops) if counters: - jobs_list += _collect_bm_data(bm, 'counters', name, reps, loop, + jobs_list += _collect_bm_data(bm, 'counters', name, regex, loop, loops) random.shuffle(jobs_list, random.SystemRandom().random) jobset.run(jobs_list, maxjobs=jobs) @@ -114,4 +113,4 @@ def run(name, benchmarks, jobs, loops, reps, counters): if __name__ == '__main__': args = _args() - run(args.name, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters) + run(args.name, args.benchmarks, args.jobs, args.loops, args.regex, args.counters) From 7cc83e0701aafb2cc2412796b72bddbfbda4eac4 Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Wed, 19 Jul 2017 17:39:04 -0700 Subject: [PATCH 15/16] add a standalone client auth test --- src/ruby/spec/client_auth_spec.rb | 137 +++++++++++++++++++++++ 
src/ruby/spec/generic/rpc_server_spec.rb | 8 +- src/ruby/spec/testdata/client.key | 16 +++ src/ruby/spec/testdata/client.pem | 14 +++ 4 files changed, 171 insertions(+), 4 deletions(-) create mode 100644 src/ruby/spec/client_auth_spec.rb create mode 100644 src/ruby/spec/testdata/client.key create mode 100644 src/ruby/spec/testdata/client.pem diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb new file mode 100644 index 00000000000..79c9192aa5f --- /dev/null +++ b/src/ruby/spec/client_auth_spec.rb @@ -0,0 +1,137 @@ +# Copyright 2015 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'grpc' + +def create_channel_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + files = ['ca.pem', 'client.key', 'client.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2]) +end + +def client_cert + test_root = File.join(File.dirname(__FILE__), 'testdata') + cert = File.open(File.join(test_root, 'client.pem')).read + fail unless cert.is_a?(String) + cert +end + +def create_server_creds + test_root = File.join(File.dirname(__FILE__), 'testdata') + p "test root: #{test_root}" + files = ['ca.pem', 'server1.key', 'server1.pem'] + creds = files.map { |f| File.open(File.join(test_root, f)).read } + GRPC::Core::ServerCredentials.new( + creds[0], + [{ private_key: creds[1], cert_chain: creds[2] }], + true) # force client auth +end + +# A test message +class EchoMsg + def self.marshal(_o) + '' + end + + def self.unmarshal(_o) + EchoMsg.new + end +end + +# a test service that checks the cert of its peer +class SslTestService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + + def check_peer_cert(call) + error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}" + fail(error_msg) unless call.peer_cert == client_cert + end + + def an_rpc(req, call) + check_peer_cert(call) + req + end + + def a_client_streaming_rpc(call) + check_peer_cert(call) + call.each_remote_read.each { |r| p r } + EchoMsg.new + end + + def a_server_streaming_rpc(_, call) + check_peer_cert(call) + [EchoMsg.new, EchoMsg.new] + end + + def a_bidi_rpc(requests, call) + check_peer_cert(call) + requests.each { |r| p r } + [EchoMsg.new, EchoMsg.new] + end +end + +SslTestServiceStub = SslTestService.rpc_stub_class + +describe 'client-server auth' do + RpcServer = GRPC::RpcServer + + before(:all) do + server_opts = { + poll_period: 1 + } + @srv = RpcServer.new(**server_opts) + port = @srv.add_http2_port('0.0.0.0:0', create_server_creds) + @srv.handle(SslTestService) + @srv_thd = Thread.new { @srv.run } + @srv.wait_till_running + + client_opts = { + channel_args: { + GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr' + } + } + @stub = SslTestServiceStub.new("localhost:#{port}", + create_channel_creds, + **client_opts) + end + 
+ after(:all) do + expect(@srv.stopped?).to be(false) + @srv.stop + @srv_thd.join + end + + it 'client-server auth with unary RPCs' do + @stub.an_rpc(EchoMsg.new) + end + + it 'client-server auth with client streaming RPCs' do + @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new]) + end + + it 'client-server auth with server streaming RPCs' do + responses = @stub.a_server_streaming_rpc(EchoMsg.new) + responses.each { |r| p r } + end + + it 'client-server auth with bidi RPCs' do + responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new]) + responses.each { |r| p r } + end +end diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 4258d59851d..e0646f45997 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -123,13 +123,13 @@ class CheckCallAfterFinishedService attr_reader :server_side_call def an_rpc(req, call) - fail 'shouldnt reuse service' unless @call.nil? + fail 'shouldnt reuse service' unless @server_side_call.nil? @server_side_call = call req end def a_client_streaming_rpc(call) - fail 'shouldnt reuse service' unless @call.nil? + fail 'shouldnt reuse service' unless @server_side_call.nil? @server_side_call = call # iterate through requests so call can complete call.each_remote_read.each { |r| p r } @@ -137,13 +137,13 @@ class CheckCallAfterFinishedService end def a_server_streaming_rpc(_, call) - fail 'shouldnt reuse service' unless @call.nil? + fail 'shouldnt reuse service' unless @server_side_call.nil? @server_side_call = call [EchoMsg.new, EchoMsg.new] end def a_bidi_rpc(requests, call) - fail 'shouldnt reuse service' unless @call.nil? + fail 'shouldnt reuse service' unless @server_side_call.nil? @server_side_call = call requests.each { |r| p r } [EchoMsg.new, EchoMsg.new] diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key new file mode 100644 index 00000000000..f48d0735d99 --- /dev/null +++ b/src/ruby/spec/testdata/client.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM +s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM +JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT +NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS +k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH +0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS +W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI +w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5 +0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5 +/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/ +U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP +1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd +9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI +JiqOszq9GWESErAatg== +-----END PRIVATE KEY----- diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem new file mode 100644 index 00000000000..e332091019b --- /dev/null +++ b/src/ruby/spec/testdata/client.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV +BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 +ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw +MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB 
+nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX +b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz +W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw +R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/ +T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk +tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C +OO+svdkmqH0KZo320ZUqdl2ooQ== +-----END CERTIFICATE----- From 9d7def71fe04ec25b7cace53adb5775dbcccc353 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 20 Jul 2017 16:41:27 -0700 Subject: [PATCH 16/16] Address comments: change names and remove unnecessary lines --- src/core/lib/iomgr/iomgr_uv.c | 4 ++-- src/core/lib/iomgr/iomgr_uv.h | 8 ++++---- src/core/lib/iomgr/pollset_uv.c | 12 ++++++------ src/core/lib/iomgr/resolve_address_uv.c | 4 ++-- src/core/lib/iomgr/tcp_client_uv.c | 2 +- src/core/lib/iomgr/tcp_server_uv.c | 14 +++++++------- src/core/lib/iomgr/tcp_uv.c | 4 ++-- src/core/lib/iomgr/timer_uv.c | 6 +++--- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c index ffec6bcf766..df5d23af3bb 100644 --- a/src/core/lib/iomgr/iomgr_uv.c +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -26,14 +26,14 @@ #include "src/core/lib/iomgr/pollset_uv.h" #include "src/core/lib/iomgr/tcp_uv.h" -gpr_thd_id grpc_init_thread; +gpr_thd_id g_init_thread; void grpc_iomgr_platform_init(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_global_init(); grpc_register_tracer(&grpc_tcp_trace); grpc_executor_set_threading(&exec_ctx, false); - grpc_init_thread = gpr_thd_currentid(); + g_init_thread = gpr_thd_currentid(); grpc_exec_ctx_finish(&exec_ctx); } void grpc_iomgr_platform_flush(void) {} diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h index bd406c34f72..3b4daaa73ba 100644 --- a/src/core/lib/iomgr/iomgr_uv.h +++ b/src/core/lib/iomgr/iomgr_uv.h @@ -25,13 +25,13 @@ /* The thread ID of the thread on which grpc was initialized. 
Used to verify * that all calls into libuv are made on that same thread */ -extern gpr_thd_id grpc_init_thread; +extern gpr_thd_id g_init_thread; #ifdef GRPC_UV_THREAD_CHECK -#define GRPC_ASSERT_SAME_THREAD() \ - GPR_ASSERT(gpr_thd_currentid() == grpc_init_thread) +#define GRPC_UV_ASSERT_SAME_THREAD() \ + GPR_ASSERT(gpr_thd_currentid() == g_init_thread) #else -#define GRPC_ASSERT_SAME_THREAD() +#define GRPC_UV_ASSERT_SAME_THREAD() #endif /* GRPC_UV_THREAD_CHECK */ #endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index e59801834dd..a79fe89d3ef 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -71,7 +71,7 @@ void grpc_pollset_global_init(void) { } void grpc_pollset_global_shutdown(void) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_destroy(&grpc_polling_mu); uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); } @@ -81,7 +81,7 @@ static void timer_run_cb(uv_timer_t *timer) {} static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); *mu = &grpc_polling_mu; uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; @@ -90,7 +90,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); pollset->shutting_down = 1; if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking @@ -103,7 +103,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); // timer.data is a boolean indicating that the timer has finished closing pollset->timer.data = (void *)0; @@ -118,7 +118,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { uint64_t timeout; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_mu_unlock(&grpc_polling_mu); if (grpc_pollset_work_run_loop) { if (gpr_time_cmp(deadline, now) >= 0) { @@ -147,7 +147,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index f910ec26b26..2d438e8b486 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -175,7 +175,7 @@ static grpc_error *blocking_resolve_address_impl( grpc_error *err; int retry_status; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); req.addrinfo = NULL; @@ -231,7 +231,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, char *port = NULL; grpc_error *err; int s; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { GRPC_CLOSURE_SCHED(exec_ctx, 
on_done, err); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 098ff7648de..786c456b735 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -125,7 +125,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, (void)channel_args; (void)interested_parties; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (channel_args != NULL) { for (size_t i = 0; i < channel_args->num_args; i++) { diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 7b45e08c9b8..7edf22a0326 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -108,7 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, } grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); gpr_ref(&s->refs); return s; } @@ -147,12 +147,10 @@ static void handle_close_callback(uv_handle_t *handle) { } static void close_listener(grpc_tcp_listener *sp) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (!sp->closed) { sp->closed = true; uv_close((uv_handle_t *)sp->handle, handle_close_callback); } - grpc_exec_ctx_finish(&exec_ctx); } static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { @@ -175,7 +173,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; @@ -248,7 +246,9 @@ static void on_connect(uv_stream_t *server, int status) { GPR_ASSERT(!sp->has_pending_connection); - gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server); + } // Create acceptor. 
if (sp->server->on_accept_cb) { @@ -338,7 +338,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, int status; grpc_error *error = GRPC_ERROR_NONE; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (s->tail != NULL) { port_index = s->tail->port_index + 1; @@ -420,7 +420,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, grpc_tcp_listener *sp; (void)pollsets; (void)pollset_count; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "SERVER_START %p", server); } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 9b1a8db7235..a05c19b4ac9 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -184,7 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_tcp *tcp = (grpc_tcp *)ep; int status; grpc_error *error = GRPC_ERROR_NONE; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_slices = read_slices; @@ -238,7 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, unsigned int i; grpc_slice *slice; uv_write_t *write_req; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t j; diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index ff2570c60d1..70f49bcbe87 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -44,7 +44,7 @@ static void stop_uv_timer(uv_timer_t *handle) { void run_expired_timer(uv_timer_t *handle) { grpc_timer *timer = (grpc_timer *)handle->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); GPR_ASSERT(timer->pending); timer->pending = 0; GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); @@ -57,7 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec now) { uint64_t timeout; uv_timer_t *uv_timer; - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; @@ -78,7 +78,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, } void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { - GRPC_ASSERT_SAME_THREAD(); + GRPC_UV_ASSERT_SAME_THREAD(); if (timer->pending) { timer->pending = 0; GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);