Merge branch 'master' into bm_cq_multi_threads

pull/10467/head
Sree Kuchibhotla 8 years ago
commit 137dd89534
  1. 20
      src/core/ext/filters/client_channel/client_channel.c
  2. 3
      src/core/ext/filters/client_channel/subchannel.c
  3. 4
      src/core/lib/iomgr/ev_epoll_linux.c
  4. 4
      src/core/lib/iomgr/ev_poll_posix.c
  5. 4
      src/core/lib/iomgr/pollset.h
  6. 4
      src/core/lib/iomgr/pollset_windows.c
  7. 5
      src/core/lib/surface/completion_queue.c
  8. 2
      src/csharp/global.json
  9. 4
      templates/tools/dockerfile/csharp_dotnetcli_deps.include
  10. 286
      test/core/end2end/tests/filter_call_init_fails.c
  11. 4
      tools/dockerfile/interoptest/grpc_interop_csharpcoreclr/Dockerfile
  12. 4
      tools/dockerfile/test/csharp_coreclr_x64/Dockerfile

@ -914,14 +914,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
.arena = calld->arena};
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, new_error);
} else {
retry_waiting_locked(exec_ctx, calld);
}
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, calld);
}
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@ -1152,16 +1152,16 @@ static void start_transport_stream_op_batch_locked_inner(
.arena = calld->arena};
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call);
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error));
grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error);
} else {
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
}
gpr_atm_rel_store(&calld->subchannel_call,
(gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, calld);
/* recurse to retry */
start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem);
/* early out */
return;
}

@ -769,7 +769,7 @@ grpc_error *grpc_connected_subchannel_create_call(
*call = gpr_arena_alloc(
args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
(*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
const grpc_call_element_args call_args = {.call_stack = callstk,
.server_transport_data = NULL,
.context = NULL,
@ -784,7 +784,6 @@ grpc_error *grpc_connected_subchannel_create_call(
gpr_log(GPR_ERROR, "error: %s", error_string);
return error;
}
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent);
return GRPC_ERROR_NONE;
}

@ -1717,7 +1717,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
worker.pt_id = pthread_self();
gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);
*worker_hdl = &worker;
if (worker_hdl) *worker_hdl = &worker;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
@ -1795,7 +1795,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->po.mu);
}
*worker_hdl = NULL;
if (worker_hdl) *worker_hdl = NULL;
gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

@ -871,7 +871,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
if (worker_hdl) *worker_hdl = &worker;
grpc_error *error = GRPC_ERROR_NONE;
/* Avoid malloc for small number of elements. */
@ -1092,7 +1092,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_mu_lock(&pollset->mu);
}
}
*worker_hdl = NULL;
if (worker_hdl) *worker_hdl = NULL;
GPR_TIMER_END("pollset_work", 0);
GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
return error;

@ -75,6 +75,10 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
and it is guaranteed that it will not be released by grpc_pollset_work
AFTER worker has been destroyed.
It's legal for worker to be NULL: in that case, this specific thread can not
be directly woken with a kick, but maybe be indirectly (with a kick against
the pollset as a whole).
Tries not to block past deadline.
May call grpc_closure_list_run on grpc_closure_list, without holding the
pollset

@ -120,7 +120,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
*worker_hdl = &worker;
if (worker_hdl) *worker_hdl = &worker;
int added_worker = 0;
worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
@ -185,7 +185,7 @@ done:
remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET);
}
gpr_cv_destroy(&worker.cv);
*worker_hdl = NULL;
if (worker_hdl) *worker_hdl = NULL;
return GRPC_ERROR_NONE;
}

@ -345,7 +345,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_pollset_worker *worker = NULL;
gpr_timespec now;
GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
@ -426,8 +425,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_mu_lock(cc->mu);
continue;
} else {
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc),
&worker, now, iteration_deadline);
grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL,
now, iteration_deadline);
if (err != GRPC_ERROR_NONE) {
gpr_mu_unlock(cc->mu);
const char *msg = grpc_error_string(err);

@ -1,5 +1,5 @@
{
"sdk": {
"version": "1.0.0-preview2-003121"
"version": "1.0.0-preview2-003131"
}
}

@ -1,7 +1,7 @@
# Install dotnet SDK based on https://www.microsoft.com/net/core#debian
RUN apt-get update && apt-get install -y curl libunwind8 gettext
# dotnet-dev-1.0.0-preview2-003121
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=809130
# dotnet-dev-1.0.0-preview2-003131
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=827530
RUN mkdir -p /opt/dotnet && tar zxf dotnet100.tar.gz -C /opt/dotnet
# dotnet-dev-1.0.1
RUN curl -sSL -o dotnet101.tar.gz https://go.microsoft.com/fwlink/?LinkID=843453

@ -49,7 +49,9 @@
enum { TIMEOUT = 200000 };
static bool g_enable_filter = false;
static bool g_enable_server_channel_filter = false;
static bool g_enable_client_channel_filter = false;
static bool g_enable_client_subchannel_filter = false;
static void *tag(intptr_t t) { return (void *)t; }
@ -105,9 +107,9 @@ static void end_test(grpc_end2end_test_fixture *f) {
grpc_completion_queue_destroy(f->cq);
}
// Simple request via a server filter that always fails to initialize
// the call.
static void test_request(grpc_end2end_test_config config) {
// Simple request via a SERVER_CHANNEL filter that always fails to
// initialize the call.
static void test_server_channel_filter(grpc_end2end_test_config config) {
grpc_call *c;
grpc_call *s;
grpc_slice request_payload_slice =
@ -201,6 +203,211 @@ static void test_request(grpc_end2end_test_config config) {
config.tear_down_data(&f);
}
// Simple request via a CLIENT_CHANNEL or CLIENT_DIRECT_CHANNEL filter
// that always fails to initialize the call.
static void test_client_channel_filter(grpc_end2end_test_config config) {
grpc_call *c;
grpc_slice request_payload_slice =
grpc_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_from_now();
grpc_end2end_test_fixture f =
begin_test(config, "filter_call_init_fails", NULL, NULL);
cq_verifier *cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_byte_buffer *request_payload_recv = NULL;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
c = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"),
get_host_override_slice("foo.test.google.fr:1234", config), deadline,
NULL);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->data.send_initial_metadata.metadata = NULL;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_destroy(c);
cq_verifier_destroy(cqv);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
end_test(&f);
config.tear_down_data(&f);
}
// Simple request via a CLIENT_SUBCHANNEL filter that always fails to
// initialize the call.
static void test_client_subchannel_filter(grpc_end2end_test_config config) {
grpc_call *c;
grpc_slice request_payload_slice =
grpc_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_from_now();
grpc_end2end_test_fixture f =
begin_test(config, "filter_call_init_fails", NULL, NULL);
cq_verifier *cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op *op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_byte_buffer *request_payload_recv = NULL;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
c = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"),
get_host_override_slice("foo.test.google.fr:1234", config), deadline,
NULL);
GPR_ASSERT(c);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->data.send_initial_metadata.metadata = NULL;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_MESSAGE;
op->data.send_message.send_message = request_payload;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = NULL;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = NULL;
op++;
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
// Reset and create a new call. (The first call uses a different code
// path in client_channel.c than subsequent calls on the same channel,
// and we need to test both.)
grpc_call_destroy(c);
status = GRPC_STATUS_OK;
grpc_slice_unref(details);
details = grpc_empty_slice();
c = grpc_channel_create_call(
f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"),
get_host_override_slice("foo.test.google.fr:1234", config), deadline,
NULL);
GPR_ASSERT(c);
error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(2), NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(2), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_PERMISSION_DENIED);
GPR_ASSERT(0 == grpc_slice_str_cmp(details, "access denied"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_destroy(c);
cq_verifier_destroy(cqv);
grpc_byte_buffer_destroy(request_payload);
grpc_byte_buffer_destroy(request_payload_recv);
end_test(&f);
config.tear_down_data(&f);
}
/*******************************************************************************
* Test filter - always fails to initialize a call
*/
@ -244,9 +451,30 @@ static const grpc_channel_filter test_filter = {
* Registration
*/
static bool maybe_add_filter(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder, void *arg) {
if (g_enable_filter) {
static bool maybe_add_server_channel_filter(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
void *arg) {
if (g_enable_server_channel_filter) {
// Want to add the filter as close to the end as possible, to make
// sure that all of the filters work well together. However, we
// can't add it at the very end, because the connected channel filter
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator *it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, &test_filter, NULL, NULL);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
} else {
return true;
}
}
static bool maybe_add_client_channel_filter(grpc_exec_ctx *exec_ctx,
grpc_channel_stack_builder *builder,
void *arg) {
if (g_enable_client_channel_filter) {
// Want to add the filter as close to the end as possible, to make
// sure that all of the filters work well together. However, we
// can't add it at the very end, because the connected channel filter
@ -263,17 +491,53 @@ static bool maybe_add_filter(grpc_exec_ctx *exec_ctx,
}
}
static bool maybe_add_client_subchannel_filter(
grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) {
if (g_enable_client_subchannel_filter) {
// Want to add the filter as close to the end as possible, to make
// sure that all of the filters work well together. However, we
// can't add it at the very end, because the client channel filter
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator *it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, &test_filter, NULL, NULL);
grpc_channel_stack_builder_iterator_destroy(it);
return retval;
} else {
return true;
}
}
static void init_plugin(void) {
grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
maybe_add_filter, NULL);
maybe_add_server_channel_filter, NULL);
grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
maybe_add_client_channel_filter, NULL);
grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
maybe_add_client_subchannel_filter, NULL);
grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
maybe_add_client_channel_filter, NULL);
}
static void destroy_plugin(void) {}
void filter_call_init_fails(grpc_end2end_test_config config) {
g_enable_filter = true;
test_request(config);
g_enable_filter = false;
gpr_log(GPR_INFO, "Testing SERVER_CHANNEL filter.");
g_enable_server_channel_filter = true;
test_server_channel_filter(config);
g_enable_server_channel_filter = false;
gpr_log(GPR_INFO, "Testing CLIENT_CHANNEL / CLIENT_DIRECT_CHANNEL filter.");
g_enable_client_channel_filter = true;
test_client_channel_filter(config);
g_enable_client_channel_filter = false;
if (config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL) {
gpr_log(GPR_INFO, "Testing CLIENT_SUBCHANNEL filter.");
g_enable_client_subchannel_filter = true;
test_client_subchannel_filter(config);
g_enable_client_subchannel_filter = false;
}
}
void filter_call_init_fails_pre_init(void) {

@ -99,8 +99,8 @@ RUN nuget update -self
# Install dotnet SDK based on https://www.microsoft.com/net/core#debian
RUN apt-get update && apt-get install -y curl libunwind8 gettext
# dotnet-dev-1.0.0-preview2-003121
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=809130
# dotnet-dev-1.0.0-preview2-003131
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=827530
RUN mkdir -p /opt/dotnet && tar zxf dotnet100.tar.gz -C /opt/dotnet
# dotnet-dev-1.0.1
RUN curl -sSL -o dotnet101.tar.gz https://go.microsoft.com/fwlink/?LinkID=843453

@ -99,8 +99,8 @@ RUN nuget update -self
# Install dotnet SDK based on https://www.microsoft.com/net/core#debian
RUN apt-get update && apt-get install -y curl libunwind8 gettext
# dotnet-dev-1.0.0-preview2-003121
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=809130
# dotnet-dev-1.0.0-preview2-003131
RUN curl -sSL -o dotnet100.tar.gz https://go.microsoft.com/fwlink/?LinkID=827530
RUN mkdir -p /opt/dotnet && tar zxf dotnet100.tar.gz -C /opt/dotnet
# dotnet-dev-1.0.1
RUN curl -sSL -o dotnet101.tar.gz https://go.microsoft.com/fwlink/?LinkID=843453

Loading…
Cancel
Save