[ruby] Make `GRPC::DeadlineExceeded` report properly (#33565)

In grpc v1.46.2 and later versions, #29155 caused the Ruby client to return `GRPC::Core::CallError`, a non-standard error, with a message: `grpc_call_start_batch failed with outstanding read or write present (code=8)`. However, the actual error should have been `GRPC::DeadlineExceeded`. This occurred because when `GRPC::DeadlineExceeded` is raised, the `@metadata_received` flag doesn't get flipped. When `receive_and_check_status` runs to determine
the error, the `RECV_INITIAL_METADATA` is erroneously sent again.

To avoid this, flip the flag in an `ensure` block whenever the `RECV_INITIAL_METADATA` op is set.

Closes #33283

Closes #33565

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/33565 from stanhu:sh-ruby-fix-issue-33283 850889558c
PiperOrigin-RevId: 614088694
pull/36035/head
Stan Hu 9 months ago committed by Copybara-Service
parent b78f64569a
commit 489d24cf7e
  1. 10
      src/ruby/lib/grpc/generic/active_call.rb
  2. 25
      src/ruby/spec/generic/rpc_server_spec.rb

@ -169,10 +169,13 @@ module GRPC
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
set_input_stream_done
attach_status_results_and_complete_call(batch_result)
ensure
# Ensure we don't attempt to request the initial metadata again
# in case an exception occurs.
@metadata_received = true
end
def attach_status_results_and_complete_call(recv_status_batch_result)
@ -258,12 +261,15 @@ module GRPC
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e
GRPC.logger.info("remote_read: #{e}")
nil
ensure
# Ensure we don't attempt to request the initial metadata again
# in case an exception occurs.
@metadata_received = true
end
def get_message_from_batch_result(recv_message_batch_result)

@ -61,6 +61,7 @@ FailingStub = FailingService.rpc_stub_class
class SlowService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
attr_reader :received_md, :delay
def initialize(_default_var = 'ignored')
@ -74,6 +75,13 @@ class SlowService
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response
end
def a_server_streaming_rpc(_, call)
GRPC.logger.info("starting a slow #{@delay} server streaming rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
[EchoMsg.new, EchoMsg.new]
end
end
SlowStub = SlowService.rpc_stub_class
@ -410,6 +418,23 @@ describe GRPC::RpcServer do
t.join
end
it 'should raise DeadlineExceeded', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@srv.wait_till_running
req = EchoMsg.new
stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
timeout = service.delay - 0.1
deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
responses = stub.a_server_streaming_rpc(req,
deadline: deadline,
metadata: { k1: 'v1', k2: 'v2' })
expect { responses.to_a }.to raise_error(GRPC::DeadlineExceeded)
@srv.stop
t.join
end
it 'should handle cancellation correctly', server: true do
request_received = false
request_received_mu = Mutex.new

Loading…
Cancel
Save