Merge pull request #23636 from yashykt/settingsdeadline

Receive SETTINGS frame on clients before declaring subchannel READY
pull/23727/head
Yash Tibrewal 4 years ago committed by GitHub
commit d35c1c13cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 117
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  2. 19
      src/core/ext/transport/chttp2/client/chttp2_connector.h
  3. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 27
      src/ruby/spec/generic/active_call_spec.rb
  5. 146
      test/core/end2end/bad_server_response_test.cc
  6. 4
      test/core/handshake/client_ssl.cc
  7. 7
      test/cpp/end2end/filter_end2end_test.cc
  8. 11
      test/cpp/end2end/generic_end2end_test.cc
  9. 4
      test/cpp/end2end/server_interceptors_end2end_test.cc

@ -129,6 +129,15 @@ void Chttp2Connector::StartHandshakeLocked() {
endpoint_ = nullptr; // Endpoint handed off to handshake manager. endpoint_ = nullptr; // Endpoint handed off to handshake manager.
} }
namespace {
void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure,
grpc_error* error) {
grpc_closure* c = *closure;
*closure = nullptr;
ExecCtx::Run(location, c, error);
}
} // namespace
void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) { void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) {
auto* args = static_cast<HandshakerArgs*>(arg); auto* args = static_cast<HandshakerArgs*>(arg);
Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data); Chttp2Connector* self = static_cast<Chttp2Connector*>(args->user_data);
@ -154,53 +163,99 @@ void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error* error) {
error = GRPC_ERROR_REF(error); error = GRPC_ERROR_REF(error);
} }
self->result_->Reset(); self->result_->Reset();
NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
} else if (args->endpoint != nullptr) { } else if (args->endpoint != nullptr) {
grpc_endpoint_delete_from_pollset_set(args->endpoint,
self->args_.interested_parties);
self->result_->transport = self->result_->transport =
grpc_create_chttp2_transport(args->args, args->endpoint, true); grpc_create_chttp2_transport(args->args, args->endpoint, true);
self->result_->socket_node = self->result_->socket_node =
grpc_chttp2_transport_get_socket_node(self->result_->transport); grpc_chttp2_transport_get_socket_node(self->result_->transport);
self->result_->channel_args = args->args;
GPR_ASSERT(self->result_->transport != nullptr); GPR_ASSERT(self->result_->transport != nullptr);
// TODO(roth): We ideally want to wait until we receive HTTP/2 self->endpoint_ = args->endpoint;
// settings from the server before we consider the connection self->Ref().release(); // Ref held by OnReceiveSettings()
// established. If that doesn't happen before the connection GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
// timeout expires, then we should consider the connection attempt a grpc_schedule_on_exec_ctx);
// failure and feed that information back into the backoff code. self->Ref().release(); // Ref held by OnTimeout()
// We could pass a notify_on_receive_settings callback to
// grpc_chttp2_transport_start_reading() to let us know when
// settings are received, but we would need to figure out how to use
// that information here.
//
// Unfortunately, we don't currently have a way to split apart the two
// effects of scheduling c->notify: we start sending RPCs immediately
// (which we want to do) and we consider the connection attempt successful
// (which we don't want to do until we get the notify_on_receive_settings
// callback from the transport). If we could split those things
// apart, then we could start sending RPCs but then wait for our
// timeout before deciding if the connection attempt is successful.
// If the attempt is not successful, then we would tear down the
// transport and feed the failure back into the backoff code.
//
// In addition, even if we did that, we would probably not want to do
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
grpc_chttp2_transport_start_reading(self->result_->transport, grpc_chttp2_transport_start_reading(self->result_->transport,
args->read_buffer, nullptr); args->read_buffer,
self->result_->channel_args = args->args; &self->on_receive_settings_);
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->args_.deadline, &self->on_timeout_);
} else { } else {
// If the handshaking succeeded but there is no endpoint, then the // If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external // handshaker may have handed off the connection to some external
// code. Just verify that exit_early flag is set. // code. Just verify that exit_early flag is set.
GPR_DEBUG_ASSERT(args->exit_early); GPR_DEBUG_ASSERT(args->exit_early);
NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error);
} }
grpc_closure* notify = self->notify_;
self->notify_ = nullptr;
ExecCtx::Run(DEBUG_LOCATION, notify, error);
self->handshake_mgr_.reset(); self->handshake_mgr_.reset();
} }
self->Unref(); self->Unref();
} }
void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error* error) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
if (error != GRPC_ERROR_NONE) {
// Transport got an error while waiting on SETTINGS frame.
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(self->result_->transport);
grpc_channel_args_destroy(self->result_->channel_args);
self->result_->Reset();
}
self->MaybeNotify(GRPC_ERROR_REF(error));
grpc_timer_cancel(&self->timer_);
} else {
// OnTimeout() was already invoked. Call Notify() again so that notify_
// can be invoked.
self->MaybeNotify(GRPC_ERROR_NONE);
}
}
self->Unref();
}
void Chttp2Connector::OnTimeout(void* arg, grpc_error* error) {
Chttp2Connector* self = static_cast<Chttp2Connector*>(arg);
{
MutexLock lock(&self->mu_);
if (!self->notify_error_.has_value()) {
// The transport did not receive the settings frame in time. Destroy the
// transport.
// TODO(yashykt): The following two lines should be moved to
// SubchannelConnector::Result::Reset()
grpc_transport_destroy(self->result_->transport);
grpc_channel_args_destroy(self->result_->channel_args);
self->result_->Reset();
self->MaybeNotify(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"connection attempt timed out before receiving SETTINGS frame"));
} else {
// OnReceiveSettings() was already invoked. Call Notify() again so that
// notify_ can be invoked.
self->MaybeNotify(GRPC_ERROR_NONE);
}
}
self->Unref();
}
void Chttp2Connector::MaybeNotify(grpc_error* error) {
if (notify_error_.has_value()) {
GRPC_ERROR_UNREF(error);
NullThenSchedClosure(DEBUG_LOCATION, &notify_, notify_error_.value());
// Clear out the endpoint, since it is the responsibility of the transport
// to shut it down.
// Clear state for a new Connect().
grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties);
// We do not destroy the endpoint here, since it is the responsibility of
// the transport to shut it down.
endpoint_ = nullptr;
notify_error_.reset();
} else {
notify_error_ = error;
}
}
} // namespace grpc_core } // namespace grpc_core

@ -39,6 +39,19 @@ class Chttp2Connector : public SubchannelConnector {
static void Connected(void* arg, grpc_error* error); static void Connected(void* arg, grpc_error* error);
void StartHandshakeLocked(); void StartHandshakeLocked();
static void OnHandshakeDone(void* arg, grpc_error* error); static void OnHandshakeDone(void* arg, grpc_error* error);
static void OnReceiveSettings(void* arg, grpc_error* error);
static void OnTimeout(void* arg, grpc_error* error);
// We cannot invoke notify_ until both OnTimeout() and OnReceiveSettings()
// have been called since that is an indicator to the upper layer that we are
// done with the connection attempt. So, the notification process is broken
// into two steps. 1) Either OnTimeout() or OnReceiveSettings() gets invoked
// first. Whichever gets invoked, calls MaybeNotify() to set the result and
// triggers the other callback to be invoked. 2) When the other callback is
// invoked, we call MaybeNotify() again to actually invoke the notify_
// callback. Note that this only happens if the handshake is done and the
// connector is waiting on the SETTINGS frame.
void MaybeNotify(grpc_error* error);
Mutex mu_; Mutex mu_;
Args args_; Args args_;
@ -47,9 +60,13 @@ class Chttp2Connector : public SubchannelConnector {
bool shutdown_ = false; bool shutdown_ = false;
bool connecting_ = false; bool connecting_ = false;
// Holds the endpoint when first created before being handed off to // Holds the endpoint when first created before being handed off to
// the handshake manager. // the handshake manager, and then again after handshake is done.
grpc_endpoint* endpoint_ = nullptr; grpc_endpoint* endpoint_ = nullptr;
grpc_closure connected_; grpc_closure connected_;
grpc_closure on_receive_settings_;
grpc_timer timer_;
grpc_closure on_timeout_;
absl::optional<grpc_error*> notify_error_;
RefCountedPtr<HandshakeManager> handshake_mgr_; RefCountedPtr<HandshakeManager> handshake_mgr_;
}; };

@ -591,7 +591,7 @@ static void close_transport_locked(grpc_chttp2_transport* t,
} }
if (t->notify_on_receive_settings != nullptr) { if (t->notify_on_receive_settings != nullptr) {
grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings, grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
GRPC_ERROR_CANCELLED); GRPC_ERROR_REF(error));
t->notify_on_receive_settings = nullptr; t->notify_on_receive_settings = nullptr;
} }
GRPC_ERROR_UNREF(error); GRPC_ERROR_UNREF(error);

@ -43,6 +43,16 @@ describe GRPC::ActiveCall do
@server = new_core_server_for_testing(nil) @server = new_core_server_for_testing(nil)
server_port = @server.add_http2_port(host, :this_port_is_insecure) server_port = @server.add_http2_port(host, :this_port_is_insecure)
@server.start @server.start
@received_rpcs_queue = Queue.new
@server_thread = Thread.new do
begin
received_rpc = @server.request_call
rescue GRPC::Core::CallError, StandardError => e
# enqueue the exception in this case as a way to indicate the error
received_rpc = e
end
@received_rpcs_queue.push(received_rpc)
end
@ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil, @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
:this_channel_is_insecure) :this_channel_is_insecure)
end end
@ -50,6 +60,7 @@ describe GRPC::ActiveCall do
after(:each) do after(:each) do
@server.shutdown_and_notify(deadline) @server.shutdown_and_notify(deadline)
@server.close @server.close
@server_thread.join
end end
describe 'restricted view methods' do describe 'restricted view methods' do
@ -105,7 +116,7 @@ describe GRPC::ActiveCall do
client_call.remote_send(msg) client_call.remote_send(msg)
# check that server rpc new was received # check that server rpc new was received
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
expect(recvd_rpc).to_not eq nil expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
@ -130,7 +141,7 @@ describe GRPC::ActiveCall do
client_call.remote_send(msg) client_call.remote_send(msg)
# confirm that the message was marshalled # confirm that the message was marshalled
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
server_ops = { server_ops = {
CallOps::SEND_INITIAL_METADATA => nil CallOps::SEND_INITIAL_METADATA => nil
@ -160,7 +171,7 @@ describe GRPC::ActiveCall do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1
# confirm that the message was marshalled # confirm that the message was marshalled
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
server_ops = { server_ops = {
CallOps::SEND_INITIAL_METADATA => nil CallOps::SEND_INITIAL_METADATA => nil
@ -321,7 +332,7 @@ describe GRPC::ActiveCall do
call = make_test_call call = make_test_call
metadata = { k1: 'v1', k2: 'v2' } metadata = { k1: 'v1', k2: 'v2' }
ActiveCall.client_invoke(call, metadata) ActiveCall.client_invoke(call, metadata)
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil expect(recvd_call).to_not be_nil
expect(recvd_rpc.metadata).to_not be_nil expect(recvd_rpc.metadata).to_not be_nil
@ -339,7 +350,7 @@ describe GRPC::ActiveCall do
call = make_test_call call = make_test_call
ActiveCall.client_invoke(call) ActiveCall.client_invoke(call)
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
server_call = ActiveCall.new( server_call = ActiveCall.new(
recvd_rpc.call, recvd_rpc.call,
@pass_through, @pass_through,
@ -405,7 +416,7 @@ describe GRPC::ActiveCall do
client_call = make_test_call client_call = make_test_call
ActiveCall.client_invoke(client_call) ActiveCall.client_invoke(client_call)
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
server_call = ActiveCall.new( server_call = ActiveCall.new(
@ -575,7 +586,7 @@ describe GRPC::ActiveCall do
@client_call = make_test_call @client_call = make_test_call
@client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
@server_call = ActiveCall.new( @server_call = ActiveCall.new(
recvd_call, recvd_call,
@ -654,7 +665,7 @@ describe GRPC::ActiveCall do
end end
def expect_server_to_be_invoked(**kw) def expect_server_to_be_invoked(**kw)
recvd_rpc = @server.request_call recvd_rpc = @received_rpcs_queue.pop
expect(recvd_rpc).to_not eq nil expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call recvd_call = recvd_rpc.call
recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw) recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)

@ -29,6 +29,7 @@
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/log.h> #include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/host_port.h" #include "src/core/lib/gprpp/host_port.h"
#include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/memory.h"
@ -47,15 +48,16 @@
"Content-Length: 0\n" \ "Content-Length: 0\n" \
"Date: Tue, 07 Jun 2016 17:43:20 GMT\n\n" "Date: Tue, 07 Jun 2016 17:43:20 GMT\n\n"
#define HTTP2_RESP(STATUS_CODE) \ #define HTTP2_SETTINGS_FRAME "\x00\x00\x00\x04\x00\x00\x00\x00\x00"
"\x00\x00\x00\x04\x00\x00\x00\x00\x00" \
"\x00\x00>\x01\x04\x00\x00\x00\x01" \ #define HTTP2_RESP(STATUS_CODE) \
"\x10\x0e" \ "\x00\x00>\x01\x04\x00\x00\x00\x01" \
"content-length\x01" \ "\x10\x0e" \
"0" \ "content-length\x01" \
"\x10\x0c" \ "0" \
"content-type\x10" \ "\x10\x0c" \
"application/grpc" \ "content-type\x10" \
"application/grpc" \
"\x10\x07:status\x03" #STATUS_CODE "\x10\x07:status\x03" #STATUS_CODE
#define UNPARSEABLE_RESP "Bad Request\n" #define UNPARSEABLE_RESP "Bad Request\n"
@ -63,8 +65,6 @@
#define HTTP2_DETAIL_MSG(STATUS_CODE) \ #define HTTP2_DETAIL_MSG(STATUS_CODE) \
"Received http2 header with status: " #STATUS_CODE "Received http2 header with status: " #STATUS_CODE
#define HTTP1_DETAIL_MSG "Trying to connect an http1.x server"
/* TODO(zyc) Check the content of incoming data instead of using this length */ /* TODO(zyc) Check the content of incoming data instead of using this length */
/* The 'bad' server will start sending responses after reading this amount of /* The 'bad' server will start sending responses after reading this amount of
* data from the client. */ * data from the client. */
@ -80,24 +80,32 @@ struct rpc_state {
grpc_slice_buffer outgoing_buffer; grpc_slice_buffer outgoing_buffer;
grpc_endpoint* tcp; grpc_endpoint* tcp;
gpr_atm done_atm; gpr_atm done_atm;
bool write_done; bool http2_response;
bool send_settings;
const char* response_payload; const char* response_payload;
size_t response_payload_length; size_t response_payload_length;
bool connection_attempt_made;
}; };
static int server_port; static int server_port;
static struct rpc_state state; static struct rpc_state state;
static grpc_closure on_read; static grpc_closure on_read;
static grpc_closure on_writing_settings_frame;
static grpc_closure on_write; static grpc_closure on_write;
static void* tag(intptr_t t) { return (void*)t; } static void* tag(intptr_t t) { return (void*)t; }
static void done_write(void* /*arg*/, grpc_error* error) { static void done_write(void* /*arg*/, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE); GPR_ASSERT(error == GRPC_ERROR_NONE);
gpr_atm_rel_store(&state.done_atm, 1); gpr_atm_rel_store(&state.done_atm, 1);
} }
static void done_writing_settings_frame(void* /* arg */, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}
static void handle_write() { static void handle_write() {
grpc_slice slice = grpc_slice_from_copied_buffer( grpc_slice slice = grpc_slice_from_copied_buffer(
state.response_payload, state.response_payload_length); state.response_payload, state.response_payload_length);
@ -108,7 +116,10 @@ static void handle_write() {
} }
static void handle_read(void* /*arg*/, grpc_error* error) { static void handle_read(void* /*arg*/, grpc_error* error) {
GPR_ASSERT(error == GRPC_ERROR_NONE); if (error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "handle_read error: %s", grpc_error_string(error));
return;
}
state.incoming_data_length += state.temp_incoming_buffer.length; state.incoming_data_length += state.temp_incoming_buffer.length;
size_t i; size_t i;
@ -119,11 +130,14 @@ static void handle_read(void* /*arg*/, grpc_error* error) {
gpr_free(dump); gpr_free(dump);
} }
gpr_log(GPR_DEBUG, "got %" PRIuPTR " bytes, expected %" PRIuPTR " bytes", gpr_log(GPR_DEBUG,
"got %" PRIuPTR " bytes, expected %" PRIuPTR
" bytes or a non-HTTP2 response to be sent",
state.incoming_data_length, state.incoming_data_length,
SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD); SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD);
if (state.incoming_data_length >= if (state.incoming_data_length >=
SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD) { SERVER_INCOMING_DATA_LENGTH_LOWER_THRESHOLD ||
!state.http2_response) {
handle_write(); handle_write();
} else { } else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read, grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
@ -137,14 +151,26 @@ static void on_connect(void* arg, grpc_endpoint* tcp,
gpr_free(acceptor); gpr_free(acceptor);
test_tcp_server* server = static_cast<test_tcp_server*>(arg); test_tcp_server* server = static_cast<test_tcp_server*>(arg);
GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_read, handle_read, nullptr, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_writing_settings_frame, done_writing_settings_frame,
nullptr, grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_write, done_write, nullptr, grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&state.temp_incoming_buffer); grpc_slice_buffer_init(&state.temp_incoming_buffer);
grpc_slice_buffer_init(&state.outgoing_buffer); grpc_slice_buffer_init(&state.outgoing_buffer);
state.connection_attempt_made = true;
state.tcp = tcp; state.tcp = tcp;
state.incoming_data_length = 0; state.incoming_data_length = 0;
grpc_endpoint_add_to_pollset(tcp, server->pollset[0]); grpc_endpoint_add_to_pollset(tcp, server->pollset[0]);
grpc_endpoint_read(tcp, &state.temp_incoming_buffer, &on_read, if (state.send_settings) {
/*urgent=*/false); // Send settings frame from server
grpc_slice slice = grpc_slice_from_static_buffer(
HTTP2_SETTINGS_FRAME, sizeof(HTTP2_SETTINGS_FRAME) - 1);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
grpc_endpoint_write(state.tcp, &state.outgoing_buffer,
&on_writing_settings_frame, nullptr);
} else {
grpc_endpoint_read(state.tcp, &state.temp_incoming_buffer, &on_read,
/*urgent=*/false);
}
} }
static gpr_timespec n_sec_deadline(int seconds) { static gpr_timespec n_sec_deadline(int seconds) {
@ -166,13 +192,20 @@ static void start_rpc(int target_port, grpc_status_code expected_status,
state.cq = grpc_completion_queue_create_for_next(nullptr); state.cq = grpc_completion_queue_create_for_next(nullptr);
cqv = cq_verifier_create(state.cq); cqv = cq_verifier_create(state.cq);
state.target = grpc_core::JoinHostPort("127.0.0.1", target_port); state.target = grpc_core::JoinHostPort("127.0.0.1", target_port);
state.channel = state.channel =
grpc_insecure_channel_create(state.target.c_str(), nullptr, nullptr); grpc_insecure_channel_create(state.target.c_str(), nullptr, nullptr);
grpc_slice host = grpc_slice_from_static_string("localhost"); grpc_slice host = grpc_slice_from_static_string("localhost");
// The default connect deadline is 20 seconds, so reduce the RPC deadline to 1
// second. This helps us verify - a) If the server responded with a non-HTTP2
// response, the connect fails immediately resulting in
// GRPC_STATUS_UNAVAILABLE instead of GRPC_STATUS_DEADLINE_EXCEEDED. b) If the
// server does not send a HTTP2 SETTINGs frame, the RPC fails with a
// DEADLINE_EXCEEDED.
state.call = grpc_channel_create_call( state.call = grpc_channel_create_call(
state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq, state.channel, nullptr, GRPC_PROPAGATE_DEFAULTS, state.cq,
grpc_slice_from_static_string("/Service/Method"), &host, grpc_slice_from_static_string("/Service/Method"), &host,
gpr_inf_future(GPR_CLOCK_REALTIME), nullptr); n_sec_deadline(1), nullptr);
grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv);
@ -241,7 +274,7 @@ typedef struct {
static void actually_poll_server(void* arg) { static void actually_poll_server(void* arg) {
poll_args* pa = static_cast<poll_args*>(arg); poll_args* pa = static_cast<poll_args*>(arg);
gpr_timespec deadline = n_sec_deadline(10); gpr_timespec deadline = n_sec_deadline(1);
while (true) { while (true) {
bool done = gpr_atm_acq_load(&state.done_atm) != 0; bool done = gpr_atm_acq_load(&state.done_atm) != 0;
gpr_timespec time_left = gpr_timespec time_left =
@ -251,7 +284,7 @@ static void actually_poll_server(void* arg) {
if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) { if (done || gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) < 0) {
break; break;
} }
test_tcp_server_poll(pa->server, 1000); test_tcp_server_poll(pa->server, 100);
} }
gpr_event_set(pa->signal_when_done, (void*)1); gpr_event_set(pa->signal_when_done, (void*)1);
gpr_free(pa); gpr_free(pa);
@ -260,7 +293,7 @@ static void actually_poll_server(void* arg) {
static grpc_core::Thread* poll_server_until_read_done( static grpc_core::Thread* poll_server_until_read_done(
test_tcp_server* server, gpr_event* signal_when_done) { test_tcp_server* server, gpr_event* signal_when_done) {
gpr_atm_rel_store(&state.done_atm, 0); gpr_atm_rel_store(&state.done_atm, 0);
state.write_done = 0; state.connection_attempt_made = false;
poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa))); poll_args* pa = static_cast<poll_args*>(gpr_malloc(sizeof(*pa)));
pa->server = server; pa->server = server;
pa->signal_when_done = signal_when_done; pa->signal_when_done = signal_when_done;
@ -270,7 +303,8 @@ static grpc_core::Thread* poll_server_until_read_done(
return th; return th;
} }
static void run_test(const char* response_payload, static void run_test(bool http2_response, bool send_settings,
const char* response_payload,
size_t response_payload_length, size_t response_payload_length,
grpc_status_code expected_status, grpc_status_code expected_status,
const char* expected_detail) { const char* expected_detail) {
@ -283,6 +317,8 @@ static void run_test(const char* response_payload,
server_port = grpc_pick_unused_port_or_die(); server_port = grpc_pick_unused_port_or_die();
test_tcp_server_init(&test_server, on_connect, &test_server); test_tcp_server_init(&test_server, on_connect, &test_server);
test_tcp_server_start(&test_server, server_port); test_tcp_server_start(&test_server, server_port);
state.http2_response = http2_response;
state.send_settings = send_settings;
state.response_payload = response_payload; state.response_payload = response_payload;
state.response_payload_length = response_payload_length; state.response_payload_length = response_payload_length;
@ -292,7 +328,8 @@ static void run_test(const char* response_payload,
start_rpc(server_port, expected_status, expected_detail); start_rpc(server_port, expected_status, expected_detail);
gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
thdptr->Join(); thdptr->Join();
/* Proof that the server accepted the TCP connection. */
GPR_ASSERT(state.connection_attempt_made == true);
/* clean up */ /* clean up */
grpc_endpoint_shutdown(state.tcp, grpc_endpoint_shutdown(state.tcp,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
@ -309,43 +346,48 @@ int main(int argc, char** argv) {
grpc_init(); grpc_init();
/* status defined in hpack static table */ /* status defined in hpack static table */
run_test(HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1, GRPC_STATUS_UNKNOWN, run_test(true, true, HTTP2_RESP(204), sizeof(HTTP2_RESP(204)) - 1,
HTTP2_DETAIL_MSG(204)); GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(204));
run_test(HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1, GRPC_STATUS_UNKNOWN, run_test(true, true, HTTP2_RESP(206), sizeof(HTTP2_RESP(206)) - 1,
HTTP2_DETAIL_MSG(206)); GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(206));
run_test(HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1, GRPC_STATUS_UNKNOWN, run_test(true, true, HTTP2_RESP(304), sizeof(HTTP2_RESP(304)) - 1,
HTTP2_DETAIL_MSG(304)); GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(304));
run_test(HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1, GRPC_STATUS_INTERNAL, run_test(true, true, HTTP2_RESP(400), sizeof(HTTP2_RESP(400)) - 1,
HTTP2_DETAIL_MSG(400)); GRPC_STATUS_INTERNAL, HTTP2_DETAIL_MSG(400));
run_test(HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1, run_test(true, true, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1,
GRPC_STATUS_UNIMPLEMENTED, HTTP2_DETAIL_MSG(404)); GRPC_STATUS_UNIMPLEMENTED, HTTP2_DETAIL_MSG(404));
run_test(HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1, GRPC_STATUS_UNKNOWN, run_test(true, true, HTTP2_RESP(500), sizeof(HTTP2_RESP(500)) - 1,
HTTP2_DETAIL_MSG(500)); GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(500));
/* status not defined in hpack static table */ /* status not defined in hpack static table */
run_test(HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1, run_test(true, true, HTTP2_RESP(401), sizeof(HTTP2_RESP(401)) - 1,
GRPC_STATUS_UNAUTHENTICATED, HTTP2_DETAIL_MSG(401)); GRPC_STATUS_UNAUTHENTICATED, HTTP2_DETAIL_MSG(401));
run_test(HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1, run_test(true, true, HTTP2_RESP(403), sizeof(HTTP2_RESP(403)) - 1,
GRPC_STATUS_PERMISSION_DENIED, HTTP2_DETAIL_MSG(403)); GRPC_STATUS_PERMISSION_DENIED, HTTP2_DETAIL_MSG(403));
run_test(HTTP2_RESP(429), sizeof(HTTP2_RESP(429)) - 1, run_test(true, true, HTTP2_RESP(429), sizeof(HTTP2_RESP(429)) - 1,
GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(429)); GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(429));
run_test(HTTP2_RESP(499), sizeof(HTTP2_RESP(499)) - 1, GRPC_STATUS_UNKNOWN, run_test(true, true, HTTP2_RESP(499), sizeof(HTTP2_RESP(499)) - 1,
HTTP2_DETAIL_MSG(499)); GRPC_STATUS_UNKNOWN, HTTP2_DETAIL_MSG(499));
run_test(HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1, run_test(true, true, HTTP2_RESP(502), sizeof(HTTP2_RESP(502)) - 1,
GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(502)); GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(502));
run_test(HTTP2_RESP(503), sizeof(HTTP2_RESP(503)) - 1, run_test(true, true, HTTP2_RESP(503), sizeof(HTTP2_RESP(503)) - 1,
GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(503)); GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(503));
run_test(HTTP2_RESP(504), sizeof(HTTP2_RESP(504)) - 1, run_test(true, true, HTTP2_RESP(504), sizeof(HTTP2_RESP(504)) - 1,
GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(504)); GRPC_STATUS_UNAVAILABLE, HTTP2_DETAIL_MSG(504));
/* unparseable response. RPC should fail immediately due to a connect failure.
/* unparseable response */ */
run_test(UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1, GRPC_STATUS_UNKNOWN, run_test(false, false, UNPARSEABLE_RESP, sizeof(UNPARSEABLE_RESP) - 1,
nullptr); GRPC_STATUS_UNAVAILABLE, nullptr);
/* http1 response */ /* http1 response. RPC should fail immediately due to a connect failure. */
run_test(HTTP1_RESP_400, sizeof(HTTP1_RESP_400) - 1, GRPC_STATUS_INTERNAL, run_test(false, false, HTTP1_RESP_400, sizeof(HTTP1_RESP_400) - 1,
HTTP1_DETAIL_MSG); GRPC_STATUS_UNAVAILABLE, nullptr);
/* http2 response without sending a SETTINGs frame. RPC should fail with
* DEADLINE_EXCEEDED since the RPC deadline is lower than the connection
* attempt deadline. */
run_test(true, false, HTTP2_RESP(404), sizeof(HTTP2_RESP(404)) - 1,
GRPC_STATUS_DEADLINE_EXCEEDED, nullptr);
grpc_shutdown(); grpc_shutdown();
return 0; return 0;
} }

@ -198,6 +198,10 @@ static void server_thread(void* arg) {
gpr_log(GPR_INFO, "Handshake successful."); gpr_log(GPR_INFO, "Handshake successful.");
} }
// Send out the settings frame.
const char settings_frame[] = "\x00\x00\x00\x04\x00\x00\x00\x00\x00";
SSL_write(ssl, settings_frame, sizeof(settings_frame) - 1);
// Wait until the client drops its connection. // Wait until the client drops its connection.
char buf; char buf;
while (SSL_read(ssl, &buf, sizeof(buf)) > 0) while (SSL_read(ssl, &buf, sizeof(buf)) > 0)

@ -18,6 +18,7 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <thread>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/time.h> #include <grpc/support/time.h>
@ -184,6 +185,7 @@ class FilterEnd2endTest : public ::testing::Test {
// The string needs to be long enough to test heap-based slice. // The string needs to be long enough to test heap-based slice.
send_request.set_message("Hello world. Hello world. Hello world."); send_request.set_message("Hello world. Hello world. Hello world.");
std::thread request_call([this]() { server_ok(4); });
std::unique_ptr<GenericClientAsyncReaderWriter> call = std::unique_ptr<GenericClientAsyncReaderWriter> call =
generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
call->StartCall(tag(1)); call->StartCall(tag(1));
@ -200,7 +202,7 @@ class FilterEnd2endTest : public ::testing::Test {
generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
srv_cq_.get(), tag(4)); srv_cq_.get(), tag(4));
verify_ok(srv_cq_.get(), 4, true); request_call.join();
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());
ByteBuffer recv_buffer; ByteBuffer recv_buffer;
@ -278,6 +280,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
send_request.set_message("Hello"); send_request.set_message("Hello");
std::thread request_call([this]() { server_ok(2); });
std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream = std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
cli_stream->StartCall(tag(1)); cli_stream->StartCall(tag(1));
@ -286,7 +289,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
verify_ok(srv_cq_.get(), 2, true); request_call.join();
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());

@ -140,6 +140,7 @@ class GenericEnd2endTest : public ::testing::Test {
delete method_name; // Make sure that this is not needed after invocation delete method_name; // Make sure that this is not needed after invocation
std::thread request_call([this]() { server_ok(4); });
call->StartCall(tag(1)); call->StartCall(tag(1));
client_ok(1); client_ok(1);
std::unique_ptr<ByteBuffer> send_buffer = std::unique_ptr<ByteBuffer> send_buffer =
@ -154,7 +155,7 @@ class GenericEnd2endTest : public ::testing::Test {
generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
srv_cq_.get(), tag(4)); srv_cq_.get(), tag(4));
verify_ok(srv_cq_.get(), 4, true); request_call.join();
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());
@ -282,7 +283,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
std::unique_ptr<ByteBuffer> cli_send_buffer = std::unique_ptr<ByteBuffer> cli_send_buffer =
SerializeToByteBuffer(&send_request); SerializeToByteBuffer(&send_request);
// Use the same cq as server so that events can be polled in time. std::thread request_call([this]() { server_ok(4); });
std::unique_ptr<GenericClientAsyncResponseReader> call = std::unique_ptr<GenericClientAsyncResponseReader> call =
generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName, generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName,
*cli_send_buffer.get(), &cli_cq_); *cli_send_buffer.get(), &cli_cq_);
@ -293,8 +294,7 @@ TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
srv_cq_.get(), tag(4)); srv_cq_.get(), tag(4));
request_call.join();
server_ok(4);
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());
@ -337,6 +337,7 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
send_request.set_message("Hello"); send_request.set_message("Hello");
std::thread request_call([this]() { server_ok(2); });
std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream = std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_); generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
cli_stream->StartCall(tag(1)); cli_stream->StartCall(tag(1));
@ -344,8 +345,8 @@ TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
srv_cq_.get(), tag(2)); srv_cq_.get(), tag(2));
request_call.join();
verify_ok(srv_cq_.get(), 2, true);
EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());

@ -536,6 +536,8 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
send_request.set_message("Hello"); send_request.set_message("Hello");
cli_ctx.AddMetadata("testkey", "testvalue"); cli_ctx.AddMetadata("testkey", "testvalue");
CompletionQueue* cq = srv_cq.get();
std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
std::unique_ptr<GenericClientAsyncReaderWriter> call = std::unique_ptr<GenericClientAsyncReaderWriter> call =
generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq); generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
call->StartCall(tag(1)); call->StartCall(tag(1));
@ -551,7 +553,7 @@ TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4)); service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));
Verifier().Expect(4, true).Verify(srv_cq.get()); request_call.join();
EXPECT_EQ(kMethodName, srv_ctx.method()); EXPECT_EQ(kMethodName, srv_ctx.method());
EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue")); EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
srv_ctx.AddTrailingMetadata("testkey", "testvalue"); srv_ctx.AddTrailingMetadata("testkey", "testvalue");

Loading…
Cancel
Save