Merge pull request #24577 from yang-g/init_failure

Do not crash if server filter fails at ChannelData::Init
pull/24600/head
Yang Gao 4 years ago committed by GitHub
commit e6e6be4b0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 4
      build_autogenerated.yaml
  3. 2
      gRPC-Core.podspec
  4. 4
      grpc.gyp
  5. 28
      src/core/ext/transport/chttp2/client/insecure/channel_create.cc
  6. 28
      src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc
  7. 28
      src/core/ext/transport/chttp2/client/secure/secure_channel_create.cc
  8. 62
      src/core/ext/transport/chttp2/server/chttp2_server.cc
  9. 17
      src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc
  10. 41
      src/core/ext/transport/inproc/inproc_transport.cc
  11. 22
      src/core/lib/surface/channel.cc
  12. 5
      src/core/lib/surface/channel.h
  13. 9
      src/core/lib/surface/server.cc
  14. 10
      src/core/lib/surface/server.h
  15. 16
      test/core/end2end/end2end_nosec_tests.cc
  16. 16
      test/core/end2end/end2end_tests.cc
  17. 29
      test/core/end2end/fixtures/h2_sockpair+trace.cc
  18. 29
      test/core/end2end/fixtures/h2_sockpair.cc
  19. 29
      test/core/end2end/fixtures/h2_sockpair_1byte.cc
  20. 2
      test/core/end2end/generate_tests.bzl
  21. 63
      test/core/end2end/tests/filter_init_fails.cc

@ -1042,9 +1042,9 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/default_host.cc
test/core/end2end/tests/disappearing_server.cc
test/core/end2end/tests/empty_batch.cc
test/core/end2end/tests/filter_call_init_fails.cc
test/core/end2end/tests/filter_causes_close.cc
test/core/end2end/tests/filter_context.cc
test/core/end2end/tests/filter_init_fails.cc
test/core/end2end/tests/filter_latency.cc
test/core/end2end/tests/filter_status_code.cc
test/core/end2end/tests/graceful_server_shutdown.cc
@ -1176,9 +1176,9 @@ add_library(end2end_tests
test/core/end2end/tests/default_host.cc
test/core/end2end/tests/disappearing_server.cc
test/core/end2end/tests/empty_batch.cc
test/core/end2end/tests/filter_call_init_fails.cc
test/core/end2end/tests/filter_causes_close.cc
test/core/end2end/tests/filter_context.cc
test/core/end2end/tests/filter_init_fails.cc
test/core/end2end/tests/filter_latency.cc
test/core/end2end/tests/filter_status_code.cc
test/core/end2end/tests/graceful_server_shutdown.cc

@ -55,9 +55,9 @@ libs:
- test/core/end2end/tests/default_host.cc
- test/core/end2end/tests/disappearing_server.cc
- test/core/end2end/tests/empty_batch.cc
- test/core/end2end/tests/filter_call_init_fails.cc
- test/core/end2end/tests/filter_causes_close.cc
- test/core/end2end/tests/filter_context.cc
- test/core/end2end/tests/filter_init_fails.cc
- test/core/end2end/tests/filter_latency.cc
- test/core/end2end/tests/filter_status_code.cc
- test/core/end2end/tests/graceful_server_shutdown.cc
@ -165,9 +165,9 @@ libs:
- test/core/end2end/tests/default_host.cc
- test/core/end2end/tests/disappearing_server.cc
- test/core/end2end/tests/empty_batch.cc
- test/core/end2end/tests/filter_call_init_fails.cc
- test/core/end2end/tests/filter_causes_close.cc
- test/core/end2end/tests/filter_context.cc
- test/core/end2end/tests/filter_init_fails.cc
- test/core/end2end/tests/filter_latency.cc
- test/core/end2end/tests/filter_status_code.cc
- test/core/end2end/tests/graceful_server_shutdown.cc

@ -1961,9 +1961,9 @@ Pod::Spec.new do |s|
'test/core/end2end/tests/default_host.cc',
'test/core/end2end/tests/disappearing_server.cc',
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_init_fails.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',

@ -210,9 +210,9 @@
'test/core/end2end/tests/default_host.cc',
'test/core/end2end/tests/disappearing_server.cc',
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_init_fails.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',
@ -313,9 +313,9 @@
'test/core/end2end/tests/default_host.cc',
'test/core/end2end/tests/disappearing_server.cc',
'test/core/end2end/tests/empty_batch.cc',
'test/core/end2end/tests/filter_call_init_fails.cc',
'test/core/end2end/tests/filter_causes_close.cc',
'test/core/end2end/tests/filter_context.cc',
'test/core/end2end/tests/filter_init_fails.cc',
'test/core/end2end/tests/filter_latency.cc',
'test/core/end2end/tests/filter_status_code.cc',
'test/core/end2end/tests/graceful_server_shutdown.cc',

@ -49,9 +49,13 @@ class Chttp2InsecureClientChannelFactory : public ClientChannelFactory {
namespace {
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args) {
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args,
grpc_error** error) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
if (error != nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel target is NULL");
}
return nullptr;
}
// Add channel arg containing the server URI.
@ -62,8 +66,8 @@ grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args) {
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel* channel = grpc_channel_create(
target, new_args, GRPC_CLIENT_CHANNEL, nullptr, nullptr, error);
grpc_channel_args_destroy(new_args);
return channel;
}
@ -101,12 +105,20 @@ grpc_channel* grpc_insecure_channel_create(const char* target,
const char* arg_to_remove = arg.key;
grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
args, &arg_to_remove, 1, &arg, 1);
grpc_error* error = GRPC_ERROR_NONE;
// Create channel.
grpc_channel* channel = grpc_core::CreateChannel(target, new_args);
grpc_channel* channel = grpc_core::CreateChannel(target, new_args, &error);
// Clean up.
grpc_channel_args_destroy(new_args);
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
"Failed to create client channel");
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create client channel");
}
return channel;
}

@ -55,17 +55,27 @@ grpc_channel* grpc_insecure_channel_create_from_fd(
grpc_transport* transport =
grpc_create_chttp2_transport(final_args, client, true);
GPR_ASSERT(transport);
grpc_channel* channel = grpc_channel_create(
target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_error* error = nullptr;
grpc_channel* channel =
grpc_channel_create(target, final_args, GRPC_CLIENT_DIRECT_CHANNEL,
transport, nullptr, &error);
grpc_channel_args_destroy(final_args);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
if (channel != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
grpc_core::ExecCtx::Get()->Flush();
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create client channel");
}
grpc_core::ExecCtx::Get()->Flush();
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
"Failed to create client channel");
return channel;
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD

@ -127,9 +127,13 @@ class Chttp2SecureClientChannelFactory : public ClientChannelFactory {
namespace {
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args) {
grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args,
grpc_error** error) {
if (target == nullptr) {
gpr_log(GPR_ERROR, "cannot create channel with NULL target name");
if (error != nullptr) {
*error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel target is NULL");
}
return nullptr;
}
// Add channel arg containing the server URI.
@ -140,8 +144,8 @@ grpc_channel* CreateChannel(const char* target, const grpc_channel_args* args) {
const char* to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args* new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
grpc_channel* channel =
grpc_channel_create(target, new_args, GRPC_CLIENT_CHANNEL, nullptr);
grpc_channel* channel = grpc_channel_create(
target, new_args, GRPC_CLIENT_CHANNEL, nullptr, nullptr, error);
grpc_channel_args_destroy(new_args);
return channel;
}
@ -176,6 +180,7 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
4, ((void*)creds, target, (void*)args, (void*)reserved));
GPR_ASSERT(reserved == nullptr);
grpc_channel* channel = nullptr;
grpc_error* error = GRPC_ERROR_NONE;
if (creds != nullptr) {
// Add channel args containing the client channel factory and channel
// credentials.
@ -189,12 +194,19 @@ grpc_channel* grpc_secure_channel_create(grpc_channel_credentials* creds,
args, &arg_to_remove, 1, args_to_add, GPR_ARRAY_SIZE(args_to_add));
new_args = creds->update_arguments(new_args);
// Create channel.
channel = grpc_core::CreateChannel(target, new_args);
channel = grpc_core::CreateChannel(target, new_args, &error);
// Clean up.
grpc_channel_args_destroy(new_args);
}
return channel != nullptr ? channel
: grpc_lame_client_channel_create(
target, GRPC_STATUS_INTERNAL,
"Failed to create secure client channel");
if (channel == nullptr) {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
channel = grpc_lame_client_channel_create(
target, status, "Failed to create secure client channel");
}
return channel;
}

@ -227,31 +227,47 @@ void Chttp2ServerListener::ConnectionState::OnHandshakeDone(void* arg,
if (args->endpoint != nullptr) {
grpc_transport* transport = grpc_create_chttp2_transport(
args->args, args->endpoint, false, resource_user);
self->listener_->server_->SetupTransport(
grpc_error* channel_init_err = self->listener_->server_->SetupTransport(
transport, self->accepting_pollset_, args->args,
grpc_chttp2_transport_get_socket_node(transport), resource_user);
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
// Note: The reinterpret_cast<>s here are safe, because
// grpc_chttp2_transport is a C-style extension of
// grpc_transport, so this is morally equivalent of a
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->transport_ = reinterpret_cast<grpc_chttp2_transport*>(transport);
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self,
grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_);
grpc_channel_args_destroy(args->args);
self->Ref().release(); // Held by OnTimeout().
GRPC_CHTTP2_REF_TRANSPORT(
reinterpret_cast<grpc_chttp2_transport*>(transport),
"receive settings timeout");
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
if (channel_init_err == GRPC_ERROR_NONE) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
// Note: The reinterpret_cast<>s here are safe, because
// grpc_chttp2_transport is a C-style extension of
// grpc_transport, so this is morally equivalent of a
// static_cast<> to a derived class.
// TODO(roth): Change to static_cast<> when we C++-ify the
// transport API.
self->transport_ =
reinterpret_cast<grpc_chttp2_transport*>(transport);
self->Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings,
self, grpc_schedule_on_exec_ctx);
grpc_chttp2_transport_start_reading(transport, args->read_buffer,
&self->on_receive_settings_);
grpc_channel_args_destroy(args->args);
self->Ref().release(); // Held by OnTimeout().
GRPC_CHTTP2_REF_TRANSPORT(
reinterpret_cast<grpc_chttp2_transport*>(transport),
"receive settings timeout");
GRPC_CLOSURE_INIT(&self->on_timeout_, OnTimeout, self,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&self->timer_, self->deadline_, &self->on_timeout_);
} else {
// Failed to create channel from transport. Clean up.
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_error_string(channel_init_err));
GRPC_ERROR_UNREF(channel_init_err);
grpc_transport_destroy(transport);
grpc_slice_buffer_destroy_internal(args->read_buffer);
gpr_free(args->read_buffer);
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,
GRPC_RESOURCE_QUOTA_CHANNEL_SIZE);
}
grpc_channel_args_destroy(args->args);
}
} else {
if (resource_user != nullptr) {
grpc_resource_user_free(resource_user,

@ -51,12 +51,19 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server,
grpc_transport* transport = grpc_create_chttp2_transport(
server_args, server_endpoint, false /* is_client */);
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
grpc_error* error =
core_server->SetupTransport(transport, nullptr, server_args, nullptr);
if (error == GRPC_ERROR_NONE) {
for (grpc_pollset* pollset : core_server->pollsets()) {
grpc_endpoint_add_to_pollset(server_endpoint, pollset);
}
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
gpr_log(GPR_ERROR, "Failed to create channel: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
core_server->SetupTransport(transport, nullptr, server_args, nullptr);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
#else // !GPR_SUPPORT_CHANNELS_FROM_FD

@ -1292,10 +1292,43 @@ grpc_channel* grpc_inproc_channel_create(grpc_server* server,
client_args);
// TODO(ncteisen): design and support channelz GetSocket for inproc.
server->core_server->SetupTransport(server_transport, nullptr, server_args,
nullptr);
grpc_channel* channel = grpc_channel_create(
"inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
grpc_error* error = server->core_server->SetupTransport(
server_transport, nullptr, server_args, nullptr);
grpc_channel* channel = nullptr;
if (error == GRPC_ERROR_NONE) {
channel =
grpc_channel_create("inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL,
client_transport, nullptr, &error);
if (error != GRPC_ERROR_NONE) {
GPR_ASSERT(!channel);
gpr_log(GPR_ERROR, "Failed to create client channel: %s",
grpc_error_string(error));
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
// client_transport was destroyed when grpc_channel_create saw an error.
grpc_transport_destroy(server_transport);
channel = grpc_lame_client_channel_create(
nullptr, status, "Failed to create client channel");
}
} else {
GPR_ASSERT(!channel);
gpr_log(GPR_ERROR, "Failed to create server channel: %s",
grpc_error_string(error));
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(client_transport);
grpc_transport_destroy(server_transport);
channel = grpc_lame_client_channel_create(
nullptr, status, "Failed to create server channel");
}
// Free up created channel args
grpc_channel_args_destroy(server_args);

@ -58,7 +58,7 @@ static void destroy_channel(void* arg, grpc_error* error);
grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type) {
grpc_channel_stack_type channel_stack_type, grpc_error** error) {
char* target = gpr_strdup(grpc_channel_stack_builder_get_target(builder));
grpc_channel_args* args = grpc_channel_args_copy(
grpc_channel_stack_builder_get_channel_arguments(builder));
@ -70,16 +70,21 @@ grpc_channel* grpc_channel_create_with_builder(
} else {
GRPC_STATS_INC_CLIENT_CHANNELS_CREATED();
}
grpc_error* error = grpc_channel_stack_builder_finish(
grpc_error* builder_error = grpc_channel_stack_builder_finish(
builder, sizeof(grpc_channel), 1, destroy_channel, nullptr,
reinterpret_cast<void**>(&channel));
if (error != GRPC_ERROR_NONE) {
if (builder_error != GRPC_ERROR_NONE) {
gpr_log(GPR_ERROR, "channel stack builder failed: %s",
grpc_error_string(error));
GRPC_ERROR_UNREF(error);
grpc_error_string(builder_error));
GPR_ASSERT(channel == nullptr);
if (error != nullptr) {
*error = builder_error;
} else {
GRPC_ERROR_UNREF(builder_error);
}
gpr_free(target);
grpc_channel_args_destroy(args);
return channel;
return nullptr;
}
channel->target = target;
channel->resource_user = resource_user;
@ -219,7 +224,8 @@ grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* input_args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport,
grpc_resource_user* resource_user) {
grpc_resource_user* resource_user,
grpc_error** error) {
// We need to make sure that grpc_shutdown() does not shut things down
// until after the channel is destroyed. However, the channel may not
// actually be destroyed by the time grpc_channel_destroy() returns,
@ -268,7 +274,7 @@ grpc_channel* grpc_channel_create(const char* target,
CreateChannelzNode(builder);
}
grpc_channel* channel =
grpc_channel_create_with_builder(builder, channel_stack_type);
grpc_channel_create_with_builder(builder, channel_stack_type, error);
if (channel == nullptr) {
grpc_shutdown(); // Since we won't call destroy_channel().
}

@ -34,7 +34,8 @@ grpc_channel* grpc_channel_create(const char* target,
const grpc_channel_args* args,
grpc_channel_stack_type channel_stack_type,
grpc_transport* optional_transport,
grpc_resource_user* resource_user = nullptr);
grpc_resource_user* resource_user = nullptr,
grpc_error** error = nullptr);
/** The same as grpc_channel_destroy, but doesn't create an ExecCtx, and so
* is safe to use from within core. */
@ -42,7 +43,7 @@ void grpc_channel_destroy_internal(grpc_channel* channel);
grpc_channel* grpc_channel_create_with_builder(
grpc_channel_stack_builder* builder,
grpc_channel_stack_type channel_stack_type);
grpc_channel_stack_type channel_stack_type, grpc_error** error = nullptr);
/** Create a call given a grpc_channel, in order to call \a method.
Progress is tied to activity on \a pollset_set. The returned call object is

@ -579,14 +579,18 @@ void Server::Start() {
starting_cv_.Signal();
}
void Server::SetupTransport(
grpc_error* Server::SetupTransport(
grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
const RefCountedPtr<grpc_core::channelz::SocketNode>& socket_node,
grpc_resource_user* resource_user) {
// Create channel.
grpc_error* error = GRPC_ERROR_NONE;
grpc_channel* channel = grpc_channel_create(
nullptr, args, GRPC_SERVER_CHANNEL, transport, resource_user);
nullptr, args, GRPC_SERVER_CHANNEL, transport, resource_user, &error);
if (channel == nullptr) {
return error;
}
ChannelData* chand = static_cast<ChannelData*>(
grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
->channel_data);
@ -607,6 +611,7 @@ void Server::SetupTransport(
}
// Initialize chand.
chand->InitTransport(Ref(), channel, cq_idx, transport, channelz_socket_uuid);
return GRPC_ERROR_NONE;
}
bool Server::HasOpenConnections() {

@ -115,11 +115,11 @@ class Server : public InternallyRefCounted<Server> {
// Sets up a transport. Creates a channel stack and binds the transport to
// the server. Called from the listener when a new connection is accepted.
void SetupTransport(grpc_transport* transport,
grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
const RefCountedPtr<channelz::SocketNode>& socket_node,
grpc_resource_user* resource_user = nullptr);
grpc_error* SetupTransport(
grpc_transport* transport, grpc_pollset* accepting_pollset,
const grpc_channel_args* args,
const RefCountedPtr<channelz::SocketNode>& socket_node,
grpc_resource_user* resource_user = nullptr);
void RegisterCompletionQueue(grpc_completion_queue* cq);

@ -67,12 +67,12 @@ extern void disappearing_server(grpc_end2end_test_config config);
extern void disappearing_server_pre_init(void);
extern void empty_batch(grpc_end2end_test_config config);
extern void empty_batch_pre_init(void);
extern void filter_call_init_fails(grpc_end2end_test_config config);
extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void filter_context(grpc_end2end_test_config config);
extern void filter_context_pre_init(void);
extern void filter_init_fails(grpc_end2end_test_config config);
extern void filter_init_fails_pre_init(void);
extern void filter_latency(grpc_end2end_test_config config);
extern void filter_latency_pre_init(void);
extern void filter_status_code(grpc_end2end_test_config config);
@ -210,9 +210,9 @@ void grpc_end2end_tests_pre_init(void) {
default_host_pre_init();
disappearing_server_pre_init();
empty_batch_pre_init();
filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
filter_context_pre_init();
filter_init_fails_pre_init();
filter_latency_pre_init();
filter_status_code_pre_init();
graceful_server_shutdown_pre_init();
@ -298,9 +298,9 @@ void grpc_end2end_tests(int argc, char **argv,
default_host(config);
disappearing_server(config);
empty_batch(config);
filter_call_init_fails(config);
filter_causes_close(config);
filter_context(config);
filter_init_fails(config);
filter_latency(config);
filter_status_code(config);
graceful_server_shutdown(config);
@ -438,10 +438,6 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
continue;
}
if (0 == strcmp("filter_call_init_fails", argv[i])) {
filter_call_init_fails(config);
continue;
}
if (0 == strcmp("filter_causes_close", argv[i])) {
filter_causes_close(config);
continue;
@ -450,6 +446,10 @@ void grpc_end2end_tests(int argc, char **argv,
filter_context(config);
continue;
}
if (0 == strcmp("filter_init_fails", argv[i])) {
filter_init_fails(config);
continue;
}
if (0 == strcmp("filter_latency", argv[i])) {
filter_latency(config);
continue;

@ -69,12 +69,12 @@ extern void disappearing_server(grpc_end2end_test_config config);
extern void disappearing_server_pre_init(void);
extern void empty_batch(grpc_end2end_test_config config);
extern void empty_batch_pre_init(void);
extern void filter_call_init_fails(grpc_end2end_test_config config);
extern void filter_call_init_fails_pre_init(void);
extern void filter_causes_close(grpc_end2end_test_config config);
extern void filter_causes_close_pre_init(void);
extern void filter_context(grpc_end2end_test_config config);
extern void filter_context_pre_init(void);
extern void filter_init_fails(grpc_end2end_test_config config);
extern void filter_init_fails_pre_init(void);
extern void filter_latency(grpc_end2end_test_config config);
extern void filter_latency_pre_init(void);
extern void filter_status_code(grpc_end2end_test_config config);
@ -213,9 +213,9 @@ void grpc_end2end_tests_pre_init(void) {
default_host_pre_init();
disappearing_server_pre_init();
empty_batch_pre_init();
filter_call_init_fails_pre_init();
filter_causes_close_pre_init();
filter_context_pre_init();
filter_init_fails_pre_init();
filter_latency_pre_init();
filter_status_code_pre_init();
graceful_server_shutdown_pre_init();
@ -302,9 +302,9 @@ void grpc_end2end_tests(int argc, char **argv,
default_host(config);
disappearing_server(config);
empty_batch(config);
filter_call_init_fails(config);
filter_causes_close(config);
filter_context(config);
filter_init_fails(config);
filter_latency(config);
filter_status_code(config);
graceful_server_shutdown(config);
@ -446,10 +446,6 @@ void grpc_end2end_tests(int argc, char **argv,
empty_batch(config);
continue;
}
if (0 == strcmp("filter_call_init_fails", argv[i])) {
filter_call_init_fails(config);
continue;
}
if (0 == strcmp("filter_causes_close", argv[i])) {
filter_causes_close(config);
continue;
@ -458,6 +454,10 @@ void grpc_end2end_tests(int argc, char **argv,
filter_context(config);
continue;
}
if (0 == strcmp("filter_init_fails", argv[i])) {
filter_init_fails(config);
continue;
}
if (0 == strcmp("filter_latency", argv[i])) {
filter_latency(config);
continue;

@ -52,8 +52,14 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
f->server->core_server->SetupTransport(
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
}
typedef struct {
@ -68,9 +74,24 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
const_cast<char*>("test-authority"));
grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
cs->f->client = grpc_channel_create("socketpair-target", args,
GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_error* error = GRPC_ERROR_NONE;
cs->f->client =
grpc_channel_create("socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL,
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
grpc_transport_destroy(transport);
}
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -100,7 +121,6 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@ -114,7 +134,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {

@ -46,8 +46,14 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
f->server->core_server->SetupTransport(
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
}
typedef struct {
@ -63,9 +69,24 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
const_cast<char*>("test-authority"));
grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
cs->f->client = grpc_channel_create("socketpair-target", args,
GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_error* error = GRPC_ERROR_NONE;
cs->f->client =
grpc_channel_create("socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL,
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
grpc_transport_destroy(transport);
}
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -95,7 +116,6 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@ -109,7 +129,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {

@ -46,8 +46,14 @@ static void server_setup_transport(void* ts, grpc_transport* transport) {
grpc_core::ExecCtx exec_ctx;
grpc_endpoint_pair* sfd = static_cast<grpc_endpoint_pair*>(f->fixture_data);
grpc_endpoint_add_to_pollset(sfd->server, grpc_cq_pollset(f->cq));
f->server->core_server->SetupTransport(
grpc_error* error = f->server->core_server->SetupTransport(
transport, nullptr, f->server->core_server->channel_args(), nullptr);
if (error == GRPC_ERROR_NONE) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
GRPC_ERROR_UNREF(error);
grpc_transport_destroy(transport);
}
}
typedef struct {
@ -63,9 +69,24 @@ static void client_setup_transport(void* ts, grpc_transport* transport) {
const_cast<char*>("test-authority"));
grpc_channel_args* args =
grpc_channel_args_copy_and_add(cs->client_args, &authority_arg, 1);
cs->f->client = grpc_channel_create("socketpair-target", args,
GRPC_CLIENT_DIRECT_CHANNEL, transport);
grpc_error* error = GRPC_ERROR_NONE;
cs->f->client =
grpc_channel_create("socketpair-target", args, GRPC_CLIENT_DIRECT_CHANNEL,
transport, nullptr, &error);
grpc_channel_args_destroy(args);
if (cs->f->client != nullptr) {
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
} else {
intptr_t integer;
grpc_status_code status = GRPC_STATUS_INTERNAL;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &integer)) {
status = static_cast<grpc_status_code>(integer);
}
GRPC_ERROR_UNREF(error);
cs->f->client =
grpc_lame_client_channel_create(nullptr, status, "lame channel");
grpc_transport_destroy(transport);
}
}
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
@ -106,7 +127,6 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture* f,
transport = grpc_create_chttp2_transport(client_args, sfd->client, true);
client_setup_transport(&cs, transport);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
@ -120,7 +140,6 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture* f,
grpc_server_start(f->server);
transport = grpc_create_chttp2_transport(server_args, sfd->server, false);
server_setup_transport(f, transport);
grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture* f) {

@ -236,7 +236,7 @@ END2END_TESTS = {
"disappearing_server": _test_options(needs_fullstack = True, needs_names = True),
"empty_batch": _test_options(),
"filter_causes_close": _test_options(),
"filter_call_init_fails": _test_options(),
"filter_init_fails": _test_options(),
"filter_context": _test_options(),
"graceful_server_shutdown": _test_options(exclude_inproc = True),
"hpack_size": _test_options(

@ -36,6 +36,7 @@ enum { TIMEOUT = 200000 };
static bool g_enable_server_channel_filter = false;
static bool g_enable_client_channel_filter = false;
static bool g_enable_client_subchannel_filter = false;
static bool g_channel_filter_init_failure = false;
static void* tag(intptr_t t) { return (void*)t; }
@ -103,7 +104,7 @@ static void test_server_channel_filter(grpc_end2end_test_config config) {
grpc_byte_buffer* request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_end2end_test_fixture f =
begin_test(config, "filter_call_init_fails", nullptr, nullptr);
begin_test(config, "filter_init_fails", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
@ -168,8 +169,17 @@ static void test_server_channel_filter(grpc_end2end_test_config config) {
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
if (g_channel_filter_init_failure == true) {
// Inproc channel returns invalid_argument and other clients return
// unavailable.
// Windows with sockpair returns unknown.
GPR_ASSERT(status == GRPC_STATUS_UNKNOWN ||
status == GRPC_STATUS_UNAVAILABLE ||
status == GRPC_STATUS_INVALID_ARGUMENT);
} else {
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
@ -198,7 +208,7 @@ static void test_client_channel_filter(grpc_end2end_test_config config) {
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_from_now();
grpc_end2end_test_fixture f =
begin_test(config, "filter_call_init_fails", nullptr, nullptr);
begin_test(config, "filter_init_fails", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
@ -257,8 +267,12 @@ static void test_client_channel_filter(grpc_end2end_test_config config) {
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
if (g_channel_filter_init_failure) {
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
} else {
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
@ -287,7 +301,7 @@ static void test_client_subchannel_filter(grpc_end2end_test_config config) {
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_from_now();
grpc_end2end_test_fixture f =
begin_test(config, "filter_call_init_fails", nullptr, nullptr);
begin_test(config, "filter_init_fails", nullptr, nullptr);
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
@ -347,8 +361,12 @@ static void test_client_subchannel_filter(grpc_end2end_test_config config) {
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
if (g_channel_filter_init_failure) {
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
} else {
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
}
// Reset and create a new call. (The first call uses a different code
// path in client_channel.c than subsequent calls on the same channel,
@ -370,8 +388,12 @@ static void test_client_subchannel_filter(grpc_end2end_test_config config) {
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
if (g_channel_filter_init_failure) {
GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
} else {
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
}
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
@ -407,6 +429,11 @@ static void destroy_call_elem(grpc_call_element* /*elem*/,
static grpc_error* init_channel_elem(grpc_channel_element* /*elem*/,
grpc_channel_element_args* /*args*/) {
if (g_channel_filter_init_failure) {
return grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test channel filter init error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT);
}
return GRPC_ERROR_NONE;
}
@ -423,7 +450,7 @@ static const grpc_channel_filter test_filter = {
init_channel_elem,
destroy_channel_elem,
grpc_channel_next_get_info,
"filter_call_init_fails"};
"filter_init_fails"};
/*******************************************************************************
* Registration
@ -499,7 +526,7 @@ static void init_plugin(void) {
static void destroy_plugin(void) {}
void filter_call_init_fails(grpc_end2end_test_config config) {
static void filter_init_fails_internal(grpc_end2end_test_config config) {
gpr_log(GPR_INFO, "Testing SERVER_CHANNEL filter.");
g_enable_server_channel_filter = true;
test_server_channel_filter(config);
@ -524,6 +551,14 @@ void filter_call_init_fails(grpc_end2end_test_config config) {
}
}
void filter_call_init_fails_pre_init(void) {
void filter_init_fails(grpc_end2end_test_config config) {
filter_init_fails_internal(config);
gpr_log(GPR_INFO, "Testing with channel filter init error");
g_channel_filter_init_failure = true;
filter_init_fails_internal(config);
g_channel_filter_init_failure = false;
}
void filter_init_fails_pre_init(void) {
grpc_register_plugin(init_plugin, destroy_plugin);
}
Loading…
Cancel
Save