diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.cc b/src/core/ext/transport/chttp2/client/chttp2_connector.cc index b5593cb17db..1e930ca5fb5 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.cc +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.cc @@ -129,6 +129,15 @@ void Chttp2Connector::StartHandshakeLocked() { endpoint_ = nullptr; // Endpoint handed off to handshake manager. } +namespace { +void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure, + grpc_error* error) { + grpc_closure* c = *closure; + *closure = nullptr; + ExecCtx::Run(location, c, error); +} +} // namespace + void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) { auto* args = static_cast(arg); Chttp2Connector* self = static_cast(args->user_data); @@ -154,53 +163,99 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) { error = GRPC_ERROR_REF(error); } self->result_->Reset(); + NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error); } else if (args->endpoint != nullptr) { - grpc_endpoint_delete_from_pollset_set(args->endpoint, - self->args_.interested_parties); self->result_->transport = grpc_create_chttp2_transport(args->args, args->endpoint, true); self->result_->socket_node = grpc_chttp2_transport_get_socket_node(self->result_->transport); + self->result_->channel_args = args->args; GPR_ASSERT(self->result_->transport != nullptr); - // TODO(roth): We ideally want to wait until we receive HTTP/2 - // settings from the server before we consider the connection - // established. If that doesn't happen before the connection - // timeout expires, then we should consider the connection attempt a - // failure and feed that information back into the backoff code. - // We could pass a notify_on_receive_settings callback to - // grpc_chttp2_transport_start_reading() to let us know when - // settings are received, but we would need to figure out how to use - // that information here. - // - // Unfortunately, we don't currently have a way to split apart the two - // effects of scheduling c->notify: we start sending RPCs immediately - // (which we want to do) and we consider the connection attempt successful - // (which we don't want to do until we get the notify_on_receive_settings - // callback from the transport). If we could split those things - // apart, then we could start sending RPCs but then wait for our - // timeout before deciding if the connection attempt is successful. - // If the attempt is not successful, then we would tear down the - // transport and feed the failure back into the backoff code. - // - // In addition, even if we did that, we would probably not want to do - // so until after transparent retries is implemented. Otherwise, any - // RPC that we attempt to send on the connection before the timeout - // would fail instead of being retried on a subsequent attempt. + self->endpoint_ = args->endpoint; + self->Ref().release(); // Ref held by OnReceiveSettings() + GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, + grpc_schedule_on_exec_ctx); + self->Ref().release(); // Ref held by OnTimeout() grpc_chttp2_transport_start_reading(self->result_->transport, - args->read_buffer, nullptr); - self->result_->channel_args = args->args; + args->read_buffer, + &self->on_receive_settings_); + GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self, + grpc_schedule_on_exec_ctx); + grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_); } else { // If the handshaking succeeded but there is no endpoint, then the // handshaker may have handed off the connection to some external // code. Just verify that exit_early flag is set. GPR_DEBUG_ASSERT(args->exit_early); + NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error); } - grpc_closure* notify = self->notify_; - self->notify_ = nullptr; - ExecCtx::Run(DEBUG_LOCATION, notify, error); self->handshake_mgr_.reset(); } self->Unref(); } +void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error* error) { + Chttp2Connector* self = static_cast(arg); + { + MutexLock lock(&self->mu_); + if (!self->notify_error_.has_value()) { + if (error != GRPC_ERROR_NONE) { + // Transport got an error while waiting on SETTINGS frame. + // TODO(yashykt): The following two lines should be moved to + // SubchannelConnector::Result::Reset() + grpc_transport_destroy(self->result_->transport); + grpc_channel_args_destroy(self->result_->channel_args); + self->result_->Reset(); + } + self->MaybeNotify(GRPC_ERROR_REF(error)); + grpc_timer_cancel(&self->timer_); + } else { + // OnTimeout() was already invoked. Call Notify() again so that notify_ + // can be invoked. + self->MaybeNotify(GRPC_ERROR_NONE); + } + } + self->Unref(); +} + +void Chttp2Connector::OnTimeout(void* arg, grpc_error* error) { + Chttp2Connector* self = static_cast(arg); + { + MutexLock lock(&self->mu_); + if (!self->notify_error_.has_value()) { + // The transport did not receive the settings frame in time. Destroy the + // transport. + // TODO(yashykt): The following two lines should be moved to + // SubchannelConnector::Result::Reset() + grpc_transport_destroy(self->result_->transport); + grpc_channel_args_destroy(self->result_->channel_args); + self->result_->Reset(); + self->MaybeNotify(GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "connection attempt timed out before receiving SETTINGS frame")); + } else { + // OnReceiveSettings() was already invoked. Call Notify() again so that + // notify_ can be invoked. + self->MaybeNotify(GRPC_ERROR_NONE); + } + } + self->Unref(); +} + +void Chttp2Connector::MaybeNotify(grpc_error* error) { + if (notify_error_.has_value()) { + GRPC_ERROR_UNREF(error); + NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, notify_error_.value()); + // Clear out the endpoint, since it is the responsibility of the transport + // to shut it down. + // Clear state for a new Connect(). + grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties); + // We do not destroy the endpoint here, since it is the responsibility of + // the transport to shut it down. + endpoint_ = nullptr; + notify_error_.reset(); + } else { + notify_error_ = error; + } +} + } // namespace grpc_core diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.h b/src/core/ext/transport/chttp2/client/chttp2_connector.h index 1ecd172bba1..37143c8f187 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.h +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.h @@ -39,6 +39,19 @@ class Chttp2Connector : public SubchannelConnector { static void Connected(void* arg, grpc_error* error); void StartHandshakeLocked(); static void OnHandshakeDone(void* arg, grpc_error* error); + static void OnReceiveSettings(void* arg, grpc_error* error); + static void OnTimeout(void* arg, grpc_error* error); + + // We cannot invoke notify_ until both OnTimeout() and OnReceiveSettings() + // have been called since that is an indicator to the upper layer that we are + // done with the connection attempt. So, the notification process is broken + // into two steps. 1) Either OnTimeout() or OnReceiveSettings() gets invoked + // first. Whichever gets invoked, calls MaybeNotify() to set the result and + // triggers the other callback to be invoked. 2) When the other callback is + // invoked, we call MaybeNotify() again to actually invoke the notify_ + // callback. Note that this only happens if the handshake is done and the + // connector is waiting on the SETTINGS frame. + void MaybeNotify(grpc_error* error); Mutex mu_; Args args_; @@ -47,9 +60,13 @@ class Chttp2Connector : public SubchannelConnector { bool shutdown_ = false; bool connecting_ = false; // Holds the endpoint when first created before being handed off to - // the handshake manager. + // the handshake manager, and then again after handshake is done. grpc_endpoint* endpoint_ = nullptr; grpc_closure connected_; + grpc_closure on_receive_settings_; + grpc_timer timer_; + grpc_closure on_timeout_; + absl::optional notify_error_; RefCountedPtr handshake_mgr_; }; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index adab27a2c2a..3ea224e0f9b 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -591,7 +591,7 @@ static void close_transport_locked(grpc_chttp2_transport* t, } if (t->notify_on_receive_settings != nullptr) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings, - GRPC_ERROR_CANCELLED); + GRPC_ERROR_REF(error)); t->notify_on_receive_settings = nullptr; } GRPC_ERROR_UNREF(error); diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 6b44b22acf7..34b58593d37 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -43,6 +43,16 @@ describe GRPC::ActiveCall do @server = new_core_server_for_testing(nil) server_port = @server.add_http2_port(host, :this_port_is_insecure) @server.start + @received_rpcs_queue = Queue.new + @server_thread = Thread.new do + begin + received_rpc = @server.request_call + rescue GRPC::Core::CallError, StandardError => e + # enqueue the exception in this case as a way to indicate the error + received_rpc = e + end + @received_rpcs_queue.push(received_rpc) + end @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure) end @@ -50,6 +60,7 @@ describe GRPC::ActiveCall do after(:each) do @server.shutdown_and_notify(deadline) @server.close + @server_thread.join end describe 'restricted view methods' do @@ -105,7 +116,7 @@ describe GRPC::ActiveCall do client_call.remote_send(msg) # check that server rpc new was received - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop expect(recvd_rpc).to_not eq nil recvd_call = recvd_rpc.call @@ -130,7 +141,7 @@ describe GRPC::ActiveCall do client_call.remote_send(msg) # confirm that the message was marshalled - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call server_ops = { CallOps::SEND_INITIAL_METADATA => nil @@ -160,7 +171,7 @@ describe GRPC::ActiveCall do call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1 # confirm that the message was marshalled - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call server_ops = { CallOps::SEND_INITIAL_METADATA => nil @@ -321,7 +332,7 @@ describe GRPC::ActiveCall do call = make_test_call metadata = { k1: 'v1', k2: 'v2' } ActiveCall.client_invoke(call, metadata) - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call expect(recvd_call).to_not be_nil expect(recvd_rpc.metadata).to_not be_nil @@ -339,7 +350,7 @@ describe GRPC::ActiveCall do call = make_test_call ActiveCall.client_invoke(call) - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop server_call = ActiveCall.new( recvd_rpc.call, @pass_through, @@ -405,7 +416,7 @@ describe GRPC::ActiveCall do client_call = make_test_call ActiveCall.client_invoke(client_call) - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call server_call = ActiveCall.new( @@ -575,7 +586,7 @@ describe GRPC::ActiveCall do @client_call = make_test_call @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop recvd_call = recvd_rpc.call @server_call = ActiveCall.new( recvd_call, @@ -654,7 +665,7 @@ describe GRPC::ActiveCall do end def expect_server_to_be_invoked(**kw) - recvd_rpc = @server.request_call + recvd_rpc = @received_rpcs_queue.pop expect(recvd_rpc).to_not eq nil recvd_call = recvd_rpc.call recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw) diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index dd927a155d3..56419a24647 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -29,6 +29,7 @@ #include #include +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/memory.h" @@ -47,15 +48,16 @@ "Content-Length: 0\n" \ "Date: Tue, 07 Jun 2016 17:43:20 GMT\n\n" -#define HTTP2_RESP(STATUS_CODE) \ - "\x00\x00\x00\x04\x00\x00\x00\x00\x00" \ - "\x00\x00>\x01\x04\x00\x00\x00\x01" \ - "\x10\x0e" \ - "content-length\x01" \ - "0" \ - "\x10\x0c" \ - "content-type\x10" \ - "application/grpc" \ +#define HTTP2_SETTINGS_FRAME "\x00\x00\x00\x04\x00\x00\x00\x00\x00" + +#define HTTP2_RESP(STATUS_CODE) \ + "\x00\x00>\x01\x04\x00\x00\x00\x01" \ + "\x10\x0e" \ + "content-length\x01" \ + "0" \ + "\x10\x0c" \ + "content-type\x10" \ + "application/grpc" \ "\x10\x07:status\x03" #STATUS_CODE #define UNPARSEABLE_RESP "Bad Request\n" @@ -63,8 +65,6 @@ #define HTTP2_DETAIL_MSG(STATUS_CODE) \ "Received http2 header with status: " #STATUS_CODE -#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server" - /* TODO(zyc) Check the content of incoming data instead of using this length */ /* The 'bad' server will start sending responses after reading this amount of * data from the client. */ @@ -80,24 +80,32 @@ struct rpc_state { grpc_slice_buffer outgoing_buffer; grpc_endpoint* tcp; gpr_atm done_atm; - bool write_done; + bool http2_response; + bool send_settings; const char* response_payload; size_t response_payload_length; + bool connection_attempt_made; }; static int server_port; static struct rpc_state state; static grpc_closure on_read; +static grpc_closure on_writing_settings_frame; static grpc_closure on_write; static void* tag(intptr_t t) { return (void*)t; } static void done_write(void* /*arg*/, grpc_error* error) { GPR_ASSERT(error == GRPC_ERROR_NONE); - gpr_atm_rel_store(&state.done_atm, 1); } +static void done_writing_settings_frame(void* /* arg */, grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); +} + static void handle_write() { grpc_slice slice = grpc_slice_from_copied_buffer( state.response_payload, state.response_payload_length); @@ -108,7 +116,10 @@ static void handle_write() { } static void handle_read(void* /*arg*/, grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); + if (error != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "handle_read error: %s", grpc_error_string(error)); + return; + } state.incoming_data_length += state.temp_incoming_buffer.length; size_t i; @@ -119,11 +130,14 @@ static void handle_read(void* /*arg*/, grpc_error* error) { gpr_free(dump); } - gpr_log(GPR_DEBUG, "got %" PRIuPTR " bytes, expected %" PRIuPTR " bytes", + gpr_log(GPR_DEBUG, + "got %" PRIuPTR " bytes, expected %" PRIuPTR + " bytes or a non-HTTP2 response to be sent", state.incoming_data_length, SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD); if (state.incoming_data_length >= - SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { + SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD || + !state.http2_response) { handle_write(); } else { grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, @@ -137,14 +151,26 @@ static void on_connect(void* arg, grpc_endpoint* tcp, gpr_free(acceptor); test_tcp_server* server = static_cast(arg); GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&on_writing_settings_frame, done_writing_settings_frame, + nullptr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&state.temp_incoming_buffer); grpc_slice_buffer_init(&state.outgoing_buffer); + state.connection_attempt_made = true; state.tcp = tcp; state.incoming_data_length = 0; grpc_endpoint_add_to_pollset(tcp, server->pollset[0]); - grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read, - /*urgent=*/false); + if (state.send_settings) { + // Send settings frame from server + grpc_slice slice = grpc_slice_from_static_buffer( + HTTP2_SETTINGS_FRAME, sizeof(HTTP2_SETTINGS_FRAME) - 1); + grpc_slice_buffer_add(&state.outgoing_buffer, slice); + grpc_endpoint_write(state.tcp, &state.outgoing_buffer, + &on_writing_settings_frame, nullptr); + } else { + grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, + /*urgent=*/false); + } } static gpr_timespec n_sec_deadline(int seconds) { @@ -166,13 +192,20 @@ static void start_rpc(int target_port, grpc_status_code expected_status, state.cq = grpc_completion_queue_create_for_next(nullptr); cqv = cq_verifier_create(state.cq); state.target = grpc_core::JoinHostPort("127.0.0.1", target_port); + state.channel = grpc_insecure_channel_create(state.target.c_str(), nullptr, nullptr); grpc_slice host = grpc_slice_from_static_string("localhost"); + // The default connect deadline is 20 seconds, so reduce the RPC deadline to 1 + // second. This helps us verify - a) If the server responded with a non-HTTP2 + // response, the connect fails immediately resulting in + // GRPC_STATUS_UNAVAILABLE instead of GRPC_STATUS_DEADLINE_EXCEEDED. b) If the + // server does not send a HTTP2 SETTINGs frame, the RPC fails with a + // DEADLINE_EXCEEDED. state.call = grpc_channel_create_call( state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq, grpc_slice_from_static_string("/Service/Method"), &host, - gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); + n_sec_deadline(1), nullptr); grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); @@ -241,7 +274,7 @@ typedef struct { static void actually_poll_server(void* arg) { poll_args* pa = static_cast(arg); - gpr_timespec deadline = n_sec_deadline(10); + gpr_timespec deadline = n_sec_deadline(1); while (true) { bool done = gpr_atm_acq_load(&state.done_atm) != 0; gpr_timespec time_left = @@ -251,7 +284,7 @@ static void actually_poll_server(void* arg) { if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) { break; } - test_tcp_server_poll(pa->server, 1000); + test_tcp_server_poll(pa->server, 100); } gpr_event_set(pa->signal_when_done, (void*)1); gpr_free(pa); @@ -260,7 +293,7 @@ static void actually_poll_server(void* arg) { static grpc_core::Thread* poll_server_until_read_done( test_tcp_server* server, gpr_event* signal_when_done) { gpr_atm_rel_store(&state.done_atm, 0); - state.write_done = 0; + state.connection_attempt_made = false; poll_args* pa = static_cast(gpr_malloc(sizeof(*pa))); pa->server = server; pa->signal_when_done = signal_when_done; @@ -270,7 +303,8 @@ static grpc_core::Thread* poll_server_until_read_done( return th; } -static void run_test(const char* response_payload, +static void run_test(bool http2_response, bool send_settings, + const char* response_payload, size_t response_payload_length, grpc_status_code expected_status, const char* expected_detail) { @@ -283,6 +317,8 @@ static void run_test(const char* response_payload, server_port = grpc_pick_unused_port_or_die(); test_tcp_server_init(&test_server, on_connect, &test_server); test_tcp_server_start(&test_server, server_port); + state.http2_response = http2_response; + state.send_settings = send_settings; state.response_payload = response_payload; state.response_payload_length = response_payload_length; @@ -292,7 +328,8 @@ static void run_test(const char* response_payload, start_rpc(server_port, expected_status, expected_detail); gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); thdptr->Join(); - + /* Proof that the server accepted the TCP connection. */ + GPR_ASSERT(state.connection_attempt_made == true); /* clean up */ grpc_endpoint_shutdown(state.tcp, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); @@ -309,43 +346,48 @@ int main(int argc, char** argv) { grpc_init(); /* status defined in hpack static table */ - run_test(HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, GRPC_STATUS_UNKNOWN, - HTTP2_DETAIL_MSG(204)); - run_test(HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1, GRPC_STATUS_UNKNOWN, - HTTP2_DETAIL_MSG(206)); - run_test(HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1, GRPC_STATUS_UNKNOWN, - HTTP2_DETAIL_MSG(304)); - run_test(HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1, GRPC_STATUS_INTERNAL, - HTTP2_DETAIL_MSG(400)); - run_test(HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, + run_test(true, true, HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, + GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(204)); + run_test(true, true, HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1, + GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(206)); + run_test(true, true, HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1, + GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(304)); + run_test(true, true, HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1, + GRPC_STATUS_INTERNAL, HTTP2_DETAIL_MSG(400)); + run_test(true, true, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, GRPC_STATUS_UNIMPLEMENTED, HTTP2_DETAIL_MSG(404)); - run_test(HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1, GRPC_STATUS_UNKNOWN, - HTTP2_DETAIL_MSG(500)); + run_test(true, true, HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1, + GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(500)); /* status not defined in hpack static table */ - run_test(HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1, + run_test(true, true, HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1, GRPC_STATUS_UNAUTHENTICATED, HTTP2_DETAIL_MSG(401)); - run_test(HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1, + run_test(true, true, HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1, GRPC_STATUS_PERMISSION_DENIED, HTTP2_DETAIL_MSG(403)); - run_test(HTTP2_RESP(429), sizeof(HTTP2_RESP(429)) - 1, + run_test(true, true, HTTP2_RESP(429), sizeof(HTTP2_RESP(429)) - 1, GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(429)); - run_test(HTTP2_RESP(499), sizeof(HTTP2_RESP(499)) - 1, GRPC_STATUS_UNKNOWN, - HTTP2_DETAIL_MSG(499)); - run_test(HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1, + run_test(true, true, HTTP2_RESP(499), sizeof(HTTP2_RESP(499)) - 1, + GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(499)); + run_test(true, true, HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1, GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(502)); - run_test(HTTP2_RESP(503), sizeof(HTTP2_RESP(503)) - 1, + run_test(true, true, HTTP2_RESP(503), sizeof(HTTP2_RESP(503)) - 1, GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(503)); - run_test(HTTP2_RESP(504), sizeof(HTTP2_RESP(504)) - 1, + run_test(true, true, HTTP2_RESP(504), sizeof(HTTP2_RESP(504)) - 1, GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(504)); - - /* unparseable response */ - run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, GRPC_STATUS_UNKNOWN, - nullptr); - - /* http1 response */ - run_test(HTTP1_RESP_400, sizeof(HTTP1_RESP_400) - 1, GRPC_STATUS_INTERNAL, - HTTP1_DETAIL_MSG); - + /* unparseable response. RPC should fail immediately due to a connect failure. + */ + run_test(false, false, UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, + GRPC_STATUS_UNAVAILABLE, nullptr); + + /* http1 response. RPC should fail immediately due to a connect failure. */ + run_test(false, false, HTTP1_RESP_400, sizeof(HTTP1_RESP_400) - 1, + GRPC_STATUS_UNAVAILABLE, nullptr); + + /* http2 response without sending a SETTINGs frame. RPC should fail with + * DEADLINE_EXCEEDED since the RPC deadline is lower than the connection + * attempt deadline. */ + run_test(true, false, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, + GRPC_STATUS_DEADLINE_EXCEEDED, nullptr); grpc_shutdown(); return 0; } diff --git a/test/core/handshake/client_ssl.cc b/test/core/handshake/client_ssl.cc index e7faa69f964..8a72620bb42 100644 --- a/test/core/handshake/client_ssl.cc +++ b/test/core/handshake/client_ssl.cc @@ -198,6 +198,10 @@ static void server_thread(void* arg) { gpr_log(GPR_INFO, "Handshake successful."); } + // Send out the settings frame. + const char settings_frame[] = "\x00\x00\x00\x04\x00\x00\x00\x00\x00"; + SSL_write(ssl, settings_frame, sizeof(settings_frame) - 1); + // Wait until the client drops its connection. char buf; while (SSL_read(ssl, &buf, sizeof(buf)) > 0) diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index 04421d2765e..062a47a8bef 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -184,6 +185,7 @@ class FilterEnd2endTest : public ::testing::Test { // The string needs to be long enough to test heap-based slice. send_request.set_message("Hello world. Hello world. Hello world."); + std::thread request_call([this]() { server_ok(4); }); std::unique_ptr call = generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); call->StartCall(tag(1)); @@ -200,7 +202,7 @@ class FilterEnd2endTest : public ::testing::Test { generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4)); - verify_ok(srv_cq_.get(), 4, true); + request_call.join(); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); ByteBuffer recv_buffer; @@ -278,6 +280,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); send_request.set_message("Hello"); + std::thread request_call([this]() { server_ok(2); }); std::unique_ptr cli_stream = generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); cli_stream->StartCall(tag(1)); @@ -286,7 +289,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), srv_cq_.get(), tag(2)); - verify_ok(srv_cq_.get(), 2, true); + request_call.join(); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); diff --git a/test/cpp/end2end/generic_end2end_test.cc b/test/cpp/end2end/generic_end2end_test.cc index 60ea51875b1..0d39691bb60 100644 --- a/test/cpp/end2end/generic_end2end_test.cc +++ b/test/cpp/end2end/generic_end2end_test.cc @@ -140,6 +140,7 @@ class GenericEnd2endTest : public ::testing::Test { delete method_name; // Make sure that this is not needed after invocation + std::thread request_call([this]() { server_ok(4); }); call->StartCall(tag(1)); client_ok(1); std::unique_ptr send_buffer = @@ -154,7 +155,7 @@ class GenericEnd2endTest : public ::testing::Test { generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4)); - verify_ok(srv_cq_.get(), 4, true); + request_call.join(); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); @@ -282,7 +283,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { std::unique_ptr cli_send_buffer = SerializeToByteBuffer(&send_request); - // Use the same cq as server so that events can be polled in time. + std::thread request_call([this]() { server_ok(4); }); std::unique_ptr call = generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, *cli_send_buffer.get(), &cli_cq_); @@ -293,8 +294,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) { generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), srv_cq_.get(), tag(4)); - - server_ok(4); + request_call.join(); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); @@ -337,6 +337,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); send_request.set_message("Hello"); + std::thread request_call([this]() { server_ok(2); }); std::unique_ptr cli_stream = generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); cli_stream->StartCall(tag(1)); @@ -344,8 +345,8 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) { generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), srv_cq_.get(), tag(2)); + request_call.join(); - verify_ok(srv_cq_.get(), 2, true); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(kMethodName, srv_ctx.method()); diff --git a/test/cpp/end2end/server_interceptors_end2end_test.cc b/test/cpp/end2end/server_interceptors_end2end_test.cc index e8e0435a960..25e64f7580d 100644 --- a/test/cpp/end2end/server_interceptors_end2end_test.cc +++ b/test/cpp/end2end/server_interceptors_end2end_test.cc @@ -536,6 +536,8 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { send_request.set_message("Hello"); cli_ctx.AddMetadata("testkey", "testvalue"); + CompletionQueue* cq = srv_cq.get(); + std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); }); std::unique_ptr call = generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq); call->StartCall(tag(1)); @@ -551,7 +553,7 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) { service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4)); - Verifier().Expect(4, true).Verify(srv_cq.get()); + request_call.join(); EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); srv_ctx.AddTrailingMetadata("testkey", "testvalue");