Merge pull request #13336 from markdroth/server_connection_timeout

On server, include receiving HTTP/2 settings in handshake timeout
reviewable/pr13494/r6^2
Mark D. Roth 7 years ago committed by GitHub
commit 6fa206de8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      CMakeLists.txt
  2. 48
      Makefile
  3. 12
      build.yaml
  4. 3
      include/grpc/impl/codegen/grpc_types.h
  5. 30
      src/core/ext/transport/chttp2/client/chttp2_connector.cc
  6. 4
      src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
  7. 87
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  8. 4
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  9. 21
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  10. 10
      src/core/ext/transport/chttp2/transport/chttp2_transport.h
  11. 5
      src/core/ext/transport/chttp2/transport/frame_settings.cc
  12. 2
      src/core/ext/transport/chttp2/transport/internal.h
  13. 3
      src/core/lib/transport/transport.h
  14. 5
      test/core/bad_client/bad_client.cc
  15. 8
      test/core/end2end/fixtures/h2_sockpair+trace.cc
  16. 8
      test/core/end2end/fixtures/h2_sockpair.cc
  17. 8
      test/core/end2end/fixtures/h2_sockpair_1byte.cc
  18. 4
      test/core/end2end/fuzzers/api_fuzzer.cc
  19. 4
      test/core/end2end/fuzzers/client_fuzzer.cc
  20. 4
      test/core/end2end/fuzzers/server_fuzzer.cc
  21. 15
      test/core/transport/chttp2/BUILD
  22. 258
      test/core/transport/chttp2/settings_timeout_test.cc
  23. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  24. 10
      test/cpp/microbenchmarks/fullstack_fixtures.h
  25. 12
      test/cpp/performance/writes_per_rpc_test.cc
  26. 17
      tools/run_tests/generated/sources_and_headers.json
  27. 24
      tools/run_tests/generated/tests.json

@ -678,6 +678,7 @@ add_dependencies(buildtests_cxx bm_pollset)
endif()
add_dependencies(buildtests_cxx channel_arguments_test)
add_dependencies(buildtests_cxx channel_filter_test)
add_dependencies(buildtests_cxx chttp2_settings_timeout_test)
add_dependencies(buildtests_cxx cli_call_test)
add_dependencies(buildtests_cxx client_channel_stress_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
@ -9863,6 +9864,44 @@ target_link_libraries(channel_filter_test
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(chttp2_settings_timeout_test
test/core/transport/chttp2/settings_timeout_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(chttp2_settings_timeout_test
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include
PRIVATE ${BORINGSSL_ROOT_DIR}/include
PRIVATE ${PROTOBUF_ROOT_DIR}/src
PRIVATE ${BENCHMARK_ROOT_DIR}/include
PRIVATE ${ZLIB_ROOT_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/zlib
PRIVATE ${CARES_INCLUDE_DIR}
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/cares/cares
PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/third_party/gflags/include
PRIVATE third_party/googletest/googletest/include
PRIVATE third_party/googletest/googletest
PRIVATE third_party/googletest/googlemock/include
PRIVATE third_party/googletest/googlemock
PRIVATE ${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(chttp2_settings_timeout_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_test_util
grpc
gpr_test_util
gpr
${_gRPC_GFLAGS_LIBRARIES}
)
endif (gRPC_BUILD_TESTS)
if (gRPC_BUILD_TESTS)
add_executable(cli_call_test
test/cpp/util/cli_call_test.cc
third_party/googletest/googletest/src/gtest-all.cc

@ -1114,6 +1114,7 @@ bm_metadata: $(BINDIR)/$(CONFIG)/bm_metadata
bm_pollset: $(BINDIR)/$(CONFIG)/bm_pollset
channel_arguments_test: $(BINDIR)/$(CONFIG)/channel_arguments_test
channel_filter_test: $(BINDIR)/$(CONFIG)/channel_filter_test
chttp2_settings_timeout_test: $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
cli_call_test: $(BINDIR)/$(CONFIG)/cli_call_test
client_channel_stress_test: $(BINDIR)/$(CONFIG)/client_channel_stress_test
client_crash_test: $(BINDIR)/$(CONFIG)/client_crash_test
@ -1557,6 +1558,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
@ -1684,6 +1686,7 @@ buildtests_cxx: privatelibs_cxx \
$(BINDIR)/$(CONFIG)/bm_pollset \
$(BINDIR)/$(CONFIG)/channel_arguments_test \
$(BINDIR)/$(CONFIG)/channel_filter_test \
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test \
$(BINDIR)/$(CONFIG)/cli_call_test \
$(BINDIR)/$(CONFIG)/client_channel_stress_test \
$(BINDIR)/$(CONFIG)/client_crash_test \
@ -2069,6 +2072,8 @@ test_cxx: buildtests_cxx
$(Q) $(BINDIR)/$(CONFIG)/channel_arguments_test || ( echo test channel_arguments_test failed ; exit 1 )
$(E) "[RUN] Testing channel_filter_test"
$(Q) $(BINDIR)/$(CONFIG)/channel_filter_test || ( echo test channel_filter_test failed ; exit 1 )
$(E) "[RUN] Testing chttp2_settings_timeout_test"
$(Q) $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test || ( echo test chttp2_settings_timeout_test failed ; exit 1 )
$(E) "[RUN] Testing cli_call_test"
$(Q) $(BINDIR)/$(CONFIG)/cli_call_test || ( echo test cli_call_test failed ; exit 1 )
$(E) "[RUN] Testing client_channel_stress_test"
@ -14369,6 +14374,49 @@ endif
endif
CHTTP2_SETTINGS_TIMEOUT_TEST_SRC = \
test/core/transport/chttp2/settings_timeout_test.cc \
CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CHTTP2_SETTINGS_TIMEOUT_TEST_SRC))))
ifeq ($(NO_SECURE),true)
# You can't build secure targets if you don't have OpenSSL.
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: openssl_dep_error
else
ifeq ($(NO_PROTOBUF),true)
# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+.
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: protobuf_dep_error
else
$(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test: $(PROTOBUF_DEP) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(E) "[LD] Linking $@"
$(Q) mkdir -p `dirname $@`
$(Q) $(LDXX) $(LDFLAGS) $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/chttp2_settings_timeout_test
endif
endif
$(OBJDIR)/$(CONFIG)/test/core/transport/chttp2/settings_timeout_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_chttp2_settings_timeout_test: $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)
ifneq ($(NO_DEPS),true)
-include $(CHTTP2_SETTINGS_TIMEOUT_TEST_OBJS:.o=.dep)
endif
endif
CLI_CALL_TEST_SRC = \
test/cpp/util/cli_call_test.cc \

@ -3824,6 +3824,18 @@ targets:
- grpc
- gpr
uses_polling: false
- name: chttp2_settings_timeout_test
gtest: true
build: test
language: c++
src:
- test/core/transport/chttp2/settings_timeout_test.cc
deps:
- grpc_test_util
- grpc
- gpr_test_util
- gpr
uses_polling: true
- name: cli_call_test
gtest: true
build: test

@ -240,6 +240,9 @@ typedef struct {
/** The time between the first and second connection attempts, in ms */
#define GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS \
"grpc.initial_reconnect_backoff_ms"
/** The timeout used on servers for finishing handshaking on an incoming
connection. Defaults to 120 seconds. */
#define GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS "grpc.server_handshake_timeout_ms"
/** This *should* be used for testing only.
The caller of the secure_channel_create functions may override the target
name used for SSL host name checking using this channel argument which is of

@ -117,11 +117,35 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
} else {
grpc_endpoint_delete_from_pollset_set(exec_ctx, args->endpoint,
c->args.interested_parties);
c->result->transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 1);
c->result->transport = grpc_create_chttp2_transport(exec_ctx, args->args,
args->endpoint, true);
GPR_ASSERT(c->result->transport);
// TODO(roth): We ideally want to wait until we receive HTTP/2
// settings from the server before we consider the connection
// established. If that doesn't happen before the connection
// timeout expires, then we should consider the connection attempt a
// failure and feed that information back into the backoff code.
// We could pass a notify_on_receive_settings callback to
// grpc_chttp2_transport_start_reading() to let us know when
// settings are received, but we would need to figure out how to use
// that information here.
//
// Unfortunately, we don't currently have a way to split apart the two
// effects of scheduling c->notify: we start sending RPCs immediately
// (which we want to do) and we consider the connection attempt successful
// (which we don't want to do until we get the notify_on_receive_settings
// callback from the transport). If we could split those things
// apart, then we could start sending RPCs but then wait for our
// timeout before deciding if the connection attempt is successful.
// If the attempt is not successful, then we would tear down the
// transport and feed the failure back into the backoff code.
//
// In addition, even if we did that, we would probably not want to do
// so until after transparent retries is implemented. Otherwise, any
// RPC that we attempt to send on the connection before the timeout
// would fail instead of being retried on a subsequent attempt.
grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport,
args->read_buffer);
args->read_buffer, nullptr);
c->result->channel_args = args->args;
}
grpc_closure* notify = c->notify;

@ -53,12 +53,12 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
&exec_ctx, grpc_fd_create(fd, "client"), args, "fd-client");
grpc_transport* transport =
grpc_create_chttp2_transport(&exec_ctx, final_args, client, 1);
grpc_create_chttp2_transport(&exec_ctx, final_args, client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_channel_args_destroy(&exec_ctx, final_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);

@ -21,6 +21,7 @@
#include <grpc/grpc.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <grpc/support/alloc.h>
@ -31,6 +32,7 @@
#include "src/core/ext/filters/http/server/http_server_filter.h"
#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/channel/handshaker_registry.h"
@ -53,12 +55,52 @@ typedef struct {
} server_state;
typedef struct {
gpr_refcount refs;
server_state* svr_state;
grpc_pollset* accepting_pollset;
grpc_tcp_server_acceptor* acceptor;
grpc_handshake_manager* handshake_mgr;
// State for enforcing handshake timeout on receiving HTTP/2 settings.
grpc_chttp2_transport* transport;
grpc_millis deadline;
grpc_timer timer;
grpc_closure on_timeout;
grpc_closure on_receive_settings;
} server_connection_state;
static void server_connection_state_unref(
grpc_exec_ctx* exec_ctx, server_connection_state* connection_state) {
if (gpr_unref(&connection_state->refs)) {
if (connection_state->transport != nullptr) {
GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, connection_state->transport,
"receive settings timeout");
}
gpr_free(connection_state);
}
}
static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
server_connection_state* connection_state = (server_connection_state*)arg;
// Note that we may be called with GRPC_ERROR_NONE when the timer fires
// or with an error indicating that the timer system is being shut down.
if (error != GRPC_ERROR_CANCELLED) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Did not receive HTTP/2 settings before handshake timeout");
grpc_transport_perform_op(exec_ctx, &connection_state->transport->base, op);
}
server_connection_state_unref(exec_ctx, connection_state);
}
static void on_receive_settings(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
server_connection_state* connection_state = (server_connection_state*)arg;
if (error == GRPC_ERROR_NONE) {
grpc_timer_cancel(exec_ctx, &connection_state->timer);
}
server_connection_state_unref(exec_ctx, connection_state);
}
static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_handshaker_args* args = (grpc_handshaker_args*)arg;
@ -68,7 +110,6 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) {
const char* error_str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str);
if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) {
// We were shut down after handshaking completed successfully, so
// destroy the endpoint here.
@ -87,14 +128,30 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
if (args->endpoint != nullptr) {
grpc_transport* transport =
grpc_create_chttp2_transport(exec_ctx, args->args, args->endpoint, 0);
grpc_transport* transport = grpc_create_chttp2_transport(
exec_ctx, args->args, args->endpoint, false);
grpc_server_setup_transport(
exec_ctx, connection_state->svr_state->server, transport,
connection_state->accepting_pollset, args->args);
grpc_chttp2_transport_start_reading(exec_ctx, transport,
args->read_buffer);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_state->transport = (grpc_chttp2_transport*)transport;
gpr_ref(&connection_state->refs);
GRPC_CLOSURE_INIT(&connection_state->on_receive_settings,
on_receive_settings, connection_state,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(
exec_ctx, transport, args->read_buffer,
&connection_state->on_receive_settings);
grpc_channel_args_destroy(exec_ctx, args->args);
gpr_ref(&connection_state->refs);
GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport,
"receive settings timeout");
GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout,
connection_state, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &connection_state->timer,
connection_state->deadline,
&connection_state->on_timeout);
}
}
grpc_handshake_manager_pending_list_remove(
@ -102,9 +159,9 @@ static void on_handshake_done(grpc_exec_ctx* exec_ctx, void* arg,
connection_state->handshake_mgr);
gpr_mu_unlock(&connection_state->svr_state->mu);
grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr);
grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
gpr_free(connection_state->acceptor);
gpr_free(connection_state);
grpc_tcp_server_unref(exec_ctx, connection_state->svr_state->tcp_server);
server_connection_state_unref(exec_ctx, connection_state);
}
static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
@ -125,19 +182,23 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, grpc_endpoint* tcp,
gpr_mu_unlock(&state->mu);
grpc_tcp_server_ref(state->tcp_server);
server_connection_state* connection_state =
(server_connection_state*)gpr_malloc(sizeof(*connection_state));
(server_connection_state*)gpr_zalloc(sizeof(*connection_state));
gpr_ref_init(&connection_state->refs, 1);
connection_state->svr_state = state;
connection_state->accepting_pollset = accepting_pollset;
connection_state->acceptor = acceptor;
connection_state->handshake_mgr = handshake_mgr;
grpc_handshakers_add(exec_ctx, HANDSHAKER_SERVER, state->args,
connection_state->handshake_mgr);
// TODO(roth): We should really get this timeout value from channel
// args instead of hard-coding it.
const grpc_millis deadline =
grpc_exec_ctx_now(exec_ctx) + 120 * GPR_MS_PER_SEC;
const grpc_arg* timeout_arg =
grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
connection_state->deadline =
grpc_exec_ctx_now(exec_ctx) +
grpc_channel_arg_get_integer(timeout_arg,
{120 * GPR_MS_PER_SEC, 1, INT_MAX});
grpc_handshake_manager_do_handshake(exec_ctx, connection_state->handshake_mgr,
tcp, state->args, deadline, acceptor,
tcp, state->args,
connection_state->deadline, acceptor,
on_handshake_done, connection_state);
}

@ -50,7 +50,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
const grpc_channel_args* server_args = grpc_server_get_channel_args(server);
grpc_transport* transport = grpc_create_chttp2_transport(
&exec_ctx, server_args, server_endpoint, 0 /* is_client */);
&exec_ctx, server_args, server_endpoint, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@ -62,7 +62,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
grpc_server_setup_transport(&exec_ctx, server, transport, nullptr,
server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -652,6 +652,11 @@ static void close_transport_locked(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error));
}
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
GRPC_ERROR_CANCELLED);
t->notify_on_receive_settings = nullptr;
}
GRPC_ERROR_UNREF(error);
}
@ -1791,7 +1796,6 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
grpc_transport_op* op = (grpc_transport_op*)stream_op;
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)op->handler_private.extra_arg;
grpc_error* close_transport = op->disconnect_with_error;
if (op->goaway_error) {
send_goaway(exec_ctx, t, op->goaway_error);
@ -1823,8 +1827,8 @@ static void perform_transport_op_locked(grpc_exec_ctx* exec_ctx,
op->on_connectivity_state_change);
}
if (close_transport != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, close_transport);
if (op->disconnect_with_error != GRPC_ERROR_NONE) {
close_transport_locked(exec_ctx, t, op->disconnect_with_error);
}
GRPC_CLOSURE_RUN(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
@ -3232,16 +3236,16 @@ static const grpc_transport_vtable* get_vtable(void) { return &vtable; }
grpc_transport* grpc_create_chttp2_transport(
grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
grpc_endpoint* ep, int is_client) {
grpc_endpoint* ep, bool is_client) {
grpc_chttp2_transport* t =
(grpc_chttp2_transport*)gpr_zalloc(sizeof(grpc_chttp2_transport));
init_transport(exec_ctx, t, channel_args, ep, is_client != 0);
init_transport(exec_ctx, t, channel_args, ep, is_client);
return &t->base;
}
void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
grpc_transport* transport,
grpc_slice_buffer* read_buffer) {
void grpc_chttp2_transport_start_reading(
grpc_exec_ctx* exec_ctx, grpc_transport* transport,
grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings) {
grpc_chttp2_transport* t = (grpc_chttp2_transport*)transport;
GRPC_CHTTP2_REF_TRANSPORT(
t, "reading_action"); /* matches unref inside reading_action */
@ -3249,5 +3253,6 @@ void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
gpr_free(read_buffer);
}
t->notify_on_receive_settings = notify_on_receive_settings;
GRPC_CLOSURE_SCHED(exec_ctx, &t->read_action_locked, GRPC_ERROR_NONE);
}

@ -29,12 +29,14 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount;
grpc_transport* grpc_create_chttp2_transport(
grpc_exec_ctx* exec_ctx, const grpc_channel_args* channel_args,
grpc_endpoint* ep, int is_client);
grpc_endpoint* ep, bool is_client);
/// Takes ownership of \a read_buffer, which (if non-NULL) contains
/// leftover bytes previously read from the endpoint (e.g., by handshakers).
void grpc_chttp2_transport_start_reading(grpc_exec_ctx* exec_ctx,
grpc_transport* transport,
grpc_slice_buffer* read_buffer);
/// If non-null, \a notify_on_receive_settings will be scheduled when
/// HTTP/2 settings are received from the peer.
void grpc_chttp2_transport_start_reading(
grpc_exec_ctx* exec_ctx, grpc_transport* transport,
grpc_slice_buffer* read_buffer, grpc_closure* notify_on_receive_settings);
#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_CHTTP2_TRANSPORT_H */

@ -131,6 +131,11 @@ grpc_error* grpc_chttp2_settings_parser_parse(grpc_exec_ctx* exec_ctx, void* p,
memcpy(parser->target_settings, parser->incoming_settings,
GRPC_CHTTP2_NUM_SETTINGS * sizeof(uint32_t));
grpc_slice_buffer_add(&t->qbuf, grpc_chttp2_settings_ack_create());
if (t->notify_on_receive_settings != nullptr) {
GRPC_CLOSURE_SCHED(exec_ctx, t->notify_on_receive_settings,
GRPC_ERROR_NONE);
t->notify_on_receive_settings = nullptr;
}
}
return GRPC_ERROR_NONE;
}

@ -241,6 +241,8 @@ struct grpc_chttp2_transport {
grpc_combiner* combiner;
grpc_closure* notify_on_receive_settings;
/** write execution state of the transport */
grpc_chttp2_write_state write_state;
/** is this the first write in a series of writes?

@ -327,9 +327,6 @@ void grpc_transport_ping(grpc_transport* transport, grpc_closure* cb);
void grpc_transport_goaway(grpc_transport* transport, grpc_status_code status,
grpc_slice debug_data);
/* Close a transport. Aborts all open streams. */
void grpc_transport_close(grpc_transport* transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_exec_ctx* exec_ctx, grpc_transport* transport);

@ -115,9 +115,10 @@ void grpc_run_bad_client_test(
GRPC_BAD_CLIENT_REGISTERED_HOST,
GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
grpc_server_start(a.server);
transport = grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, 0);
transport =
grpc_create_chttp2_transport(&exec_ctx, nullptr, sfd.server, false);
server_setup_transport(&a, transport);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
/* Bind everything into the same pollset */

@ -97,10 +97,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -114,9 +114,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -91,10 +91,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -108,9 +108,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -102,10 +102,10 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
cs.client_args = client_args;
cs.f = f;
transport =
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, 1);
grpc_create_chttp2_transport(&exec_ctx, client_args, sfd->client, true);
client_setup_transport(&exec_ctx, &cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}
@ -119,9 +119,9 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_register_completion_queue(f->server, f->cq, nullptr);
grpc_server_start(f->server);
transport =
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, 0);
grpc_create_chttp2_transport(&exec_ctx, server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_exec_ctx_finish(&exec_ctx);
}

@ -468,10 +468,10 @@ static void do_connect(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
*fc->ep = client;
grpc_transport* transport =
grpc_create_chttp2_transport(exec_ctx, nullptr, server, 0);
grpc_create_chttp2_transport(exec_ctx, nullptr, server, false);
grpc_server_setup_transport(exec_ctx, g_server, transport, nullptr,
nullptr);
grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(exec_ctx, transport, nullptr, nullptr);
GRPC_CLOSURE_SCHED(exec_ctx, fc->closure, GRPC_ERROR_NONE);
} else {

@ -54,8 +54,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
grpc_transport* transport =
grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 1);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, true);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, "test-target", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, transport);

@ -61,9 +61,9 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
// grpc_server_register_method(server, "/reg", NULL, 0);
grpc_server_start(server);
grpc_transport* transport =
grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, 0);
grpc_create_chttp2_transport(&exec_ctx, nullptr, mock_endpoint, false);
grpc_server_setup_transport(&exec_ctx, server, transport, nullptr, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr, nullptr);
grpc_call* call1 = nullptr;
grpc_call_details call_details1;

@ -114,6 +114,21 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "settings_timeout_test",
srcs = ["settings_timeout_test.cc"],
language = "C++",
deps = [
"//:gpr",
"//:grpc",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
],
external_deps = [
"gtest",
],
)
grpc_cc_test(
name = "varint_test",
srcs = ["varint_test.cc"],

@ -0,0 +1,258 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
namespace grpc_core {
namespace test {
namespace {
// A gRPC server, running in its own thread.
class ServerThread {
public:
explicit ServerThread(const char* address) : address_(address) {}
void Start() {
// Start server with 1-second handshake timeout.
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS);
arg.value.integer = 1000;
grpc_channel_args args = {1, &arg};
server_ = grpc_server_create(&args, nullptr);
ASSERT_TRUE(grpc_server_add_insecure_http2_port(server_, address_));
cq_ = grpc_completion_queue_create_for_next(nullptr);
grpc_server_register_completion_queue(server_, cq_, nullptr);
grpc_server_start(server_);
thread_.reset(new std::thread(std::bind(&ServerThread::Serve, this)));
}
void Shutdown() {
grpc_completion_queue* shutdown_cq =
grpc_completion_queue_create_for_pluck(nullptr);
grpc_server_shutdown_and_notify(server_, shutdown_cq, nullptr);
GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, nullptr,
grpc_timeout_seconds_to_deadline(1),
nullptr)
.type == GRPC_OP_COMPLETE);
grpc_completion_queue_destroy(shutdown_cq);
grpc_server_destroy(server_);
grpc_completion_queue_destroy(cq_);
thread_->join();
}
private:
void Serve() {
// The completion queue should not return anything other than shutdown.
grpc_event ev = grpc_completion_queue_next(
cq_, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
ASSERT_EQ(GRPC_QUEUE_SHUTDOWN, ev.type);
}
const char* address_; // Do not own.
grpc_server* server_ = nullptr;
grpc_completion_queue* cq_ = nullptr;
std::unique_ptr<std::thread> thread_;
};
// A TCP client that connects to the server, reads data until the server
// closes, and then terminates.
class Client {
public:
explicit Client(const char* server_address)
: server_address_(server_address) {}
void Connect() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_addresses* server_addresses = nullptr;
grpc_error* error =
grpc_blocking_resolve_address(server_address_, "80", &server_addresses);
ASSERT_EQ(GRPC_ERROR_NONE, error) << grpc_error_string(error);
ASSERT_GE(server_addresses->naddrs, 1UL);
pollset_ = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
grpc_pollset_init(pollset_, &mu_);
grpc_pollset_set* pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(&exec_ctx, pollset_set, pollset_);
EventState state;
grpc_tcp_client_connect(&exec_ctx, state.closure(), &endpoint_, pollset_set,
nullptr /* channel_args */, server_addresses->addrs,
1000);
ASSERT_TRUE(PollUntilDone(
&exec_ctx, &state,
grpc_timespec_to_millis_round_up(gpr_inf_future(GPR_CLOCK_MONOTONIC))));
ASSERT_EQ(GRPC_ERROR_NONE, state.error());
grpc_pollset_set_destroy(&exec_ctx, pollset_set);
grpc_endpoint_add_to_pollset(&exec_ctx, endpoint_, pollset_);
grpc_resolved_addresses_destroy(server_addresses);
grpc_exec_ctx_finish(&exec_ctx);
}
// Reads until an error is returned.
// Returns true if an error was encountered before the deadline.
bool ReadUntilError() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_slice_buffer read_buffer;
grpc_slice_buffer_init(&read_buffer);
bool retval = true;
// Use a deadline of 3 seconds, which is a lot more than we should
// need for a 1-second timeout, but this helps avoid flakes.
grpc_millis deadline = grpc_exec_ctx_now(&exec_ctx) + 3000;
while (true) {
EventState state;
grpc_endpoint_read(&exec_ctx, endpoint_, &read_buffer, state.closure());
if (!PollUntilDone(&exec_ctx, &state, deadline)) {
retval = false;
break;
}
if (state.error() != GRPC_ERROR_NONE) break;
gpr_log(GPR_INFO, "client read %" PRIuPTR " bytes", read_buffer.length);
grpc_slice_buffer_reset_and_unref_internal(&exec_ctx, &read_buffer);
}
grpc_endpoint_shutdown(&exec_ctx, endpoint_,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown"));
grpc_slice_buffer_destroy_internal(&exec_ctx, &read_buffer);
grpc_exec_ctx_finish(&exec_ctx);
return retval;
}
void Shutdown() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_destroy(&exec_ctx, endpoint_);
grpc_pollset_shutdown(&exec_ctx, pollset_,
GRPC_CLOSURE_CREATE(&Client::PollsetDestroy, pollset_,
grpc_schedule_on_exec_ctx));
grpc_exec_ctx_finish(&exec_ctx);
}
private:
// State used to wait for an I/O event.
class EventState {
public:
EventState() {
GRPC_CLOSURE_INIT(&closure_, &EventState::OnEventDone, this,
grpc_schedule_on_exec_ctx);
}
~EventState() { GRPC_ERROR_UNREF(error_); }
grpc_closure* closure() { return &closure_; }
bool done() const { return done_; }
// Caller does NOT take ownership of the error.
grpc_error* error() const { return error_; }
private:
static void OnEventDone(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
gpr_log(GPR_INFO, "OnEventDone(): %s", grpc_error_string(error));
EventState* state = (EventState*)arg;
state->error_ = GRPC_ERROR_REF(error);
state->done_ = true;
}
grpc_closure closure_;
bool done_ = false;
grpc_error* error_ = GRPC_ERROR_NONE;
};
// Returns true if done, or false if deadline exceeded.
bool PollUntilDone(grpc_exec_ctx* exec_ctx, EventState* state,
grpc_millis deadline) {
while (true) {
grpc_pollset_worker* worker = nullptr;
gpr_mu_lock(mu_);
GRPC_LOG_IF_ERROR("grpc_pollset_work",
grpc_pollset_work(exec_ctx, pollset_, &worker,
grpc_exec_ctx_now(exec_ctx) + 1000));
gpr_mu_unlock(mu_);
if (state != nullptr && state->done()) return true;
if (grpc_exec_ctx_now(exec_ctx) >= deadline) return false;
}
}
static void PollsetDestroy(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_pollset* pollset = (grpc_pollset*)arg;
grpc_pollset_destroy(exec_ctx, pollset);
gpr_free(pollset);
}
const char* server_address_; // Do not own.
grpc_endpoint* endpoint_;
gpr_mu* mu_;
grpc_pollset* pollset_;
};
TEST(SettingsTimeout, Basic) {
// Construct server address string.
const int server_port = grpc_pick_unused_port_or_die();
char* server_address_string;
gpr_asprintf(&server_address_string, "localhost:%d", server_port);
// Start server.
gpr_log(GPR_INFO, "starting server on %s", server_address_string);
ServerThread server_thread(server_address_string);
server_thread.Start();
// Create client and connect to server.
gpr_log(GPR_INFO, "starting client connect");
Client client(server_address_string);
client.Connect();
// Client read. Should fail due to server dropping connection.
gpr_log(GPR_INFO, "starting client read");
EXPECT_TRUE(client.ReadUntilError());
// Shut down client.
gpr_log(GPR_INFO, "shutting down client");
client.Shutdown();
// Shut down server.
gpr_log(GPR_INFO, "shutting down server");
server_thread.Shutdown();
// Clean up.
gpr_free(server_address_string);
}
} // namespace
} // namespace test
} // namespace grpc_core
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
grpc_test_init(argc, argv);
grpc_init();
int result = RUN_ALL_TESTS();
grpc_shutdown();
return result;
}

@ -137,7 +137,7 @@ class Fixture {
grpc_channel_args c_args = args.c_channel_args();
ep_ = new DummyEndpoint;
t_ = grpc_create_chttp2_transport(exec_ctx(), &c_args, ep_, client);
grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr);
grpc_chttp2_transport_start_reading(exec_ctx(), t_, nullptr, nullptr);
FlushExecCtx();
}

@ -174,7 +174,7 @@ class EndpointPairFixture : public BaseFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
server_transport_ = grpc_create_chttp2_transport(
&exec_ctx, server_args, endpoints.server, 0 /* is_client */);
&exec_ctx, server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@ -186,7 +186,7 @@ class EndpointPairFixture : public BaseFixture {
grpc_server_setup_transport(&exec_ctx, server_->c_server(),
server_transport_, nullptr, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_,
grpc_chttp2_transport_start_reading(&exec_ctx, server_transport_, nullptr,
nullptr);
}
@ -197,13 +197,13 @@ class EndpointPairFixture : public BaseFixture {
fixture_configuration.ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
client_transport_ =
grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
client_transport_ = grpc_create_chttp2_transport(&exec_ctx, &c_args,
endpoints.client, true);
GPR_ASSERT(client_transport_);
grpc_channel* channel =
grpc_channel_create(&exec_ctx, "target", &c_args,
GRPC_CLIENT_DIRECT_CHANNEL, client_transport_);
grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_,
grpc_chttp2_transport_start_reading(&exec_ctx, client_transport_, nullptr,
nullptr);
channel_ = CreateChannelInternal("", channel);

@ -89,7 +89,7 @@ class EndpointPairFixture {
const grpc_channel_args* server_args =
grpc_server_get_channel_args(server_->c_server());
grpc_transport* transport = grpc_create_chttp2_transport(
&exec_ctx, server_args, endpoints.server, 0 /* is_client */);
&exec_ctx, server_args, endpoints.server, false /* is_client */);
grpc_pollset** pollsets;
size_t num_pollsets = 0;
@ -101,7 +101,8 @@ class EndpointPairFixture {
grpc_server_setup_transport(&exec_ctx, server_->c_server(), transport,
nullptr, server_args);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
nullptr);
}
/* create channel */
@ -111,12 +112,13 @@ class EndpointPairFixture {
ApplyCommonChannelArguments(&args);
grpc_channel_args c_args = args.c_channel_args();
grpc_transport* transport =
grpc_create_chttp2_transport(&exec_ctx, &c_args, endpoints.client, 1);
grpc_transport* transport = grpc_create_chttp2_transport(
&exec_ctx, &c_args, endpoints.client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
&exec_ctx, "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr);
grpc_chttp2_transport_start_reading(&exec_ctx, transport, nullptr,
nullptr);
channel_ = CreateChannelInternal("", channel);
}

@ -2891,6 +2891,23 @@
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",
"gpr_test_util",
"grpc",
"grpc_test_util"
],
"headers": [],
"is_filegroup": false,
"language": "c++",
"name": "chttp2_settings_timeout_test",
"src": [
"test/core/transport/chttp2/settings_timeout_test.cc"
],
"third_party": false,
"type": "target"
},
{
"deps": [
"gpr",

@ -3361,6 +3361,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "chttp2_settings_timeout_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save