From 9b83bd7c95b76c883e494214197ad06ceb3858b7 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 5 Jan 2016 09:50:04 -0800 Subject: [PATCH 1/3] Use specific ruby thread to handle auth metadata plugin callbacks --- src/ruby/ext/grpc/rb_call_credentials.c | 26 ++-- src/ruby/ext/grpc/rb_event_thread.c | 158 ++++++++++++++++++++++++ src/ruby/ext/grpc/rb_event_thread.h | 37 ++++++ 3 files changed, 209 insertions(+), 12 deletions(-) create mode 100644 src/ruby/ext/grpc/rb_event_thread.c create mode 100644 src/ruby/ext/grpc/rb_event_thread.h diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index acc54727990..17999404683 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,8 +38,10 @@ #include #include +#include #include "rb_call.h" +#include "rb_event_thread.h" #include "rb_grpc.h" /* grpc_rb_cCallCredentials is the ruby class that proxies @@ -87,7 +89,7 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args, return result; } -static void *grpc_rb_call_credentials_callback_with_gil(void *param) { +static void grpc_rb_call_credentials_callback_with_gil(void *param) { callback_params *const params = (callback_params *)param; VALUE auth_uri = rb_str_new_cstr(params->context.service_url); /* Pass the arguments to the proc in a hash, which currently only has they key @@ -113,21 +115,19 @@ static void *grpc_rb_call_credentials_callback_with_gil(void *param) { params->callback(params->user_data, md_ary.metadata, md_ary.count, status, error_details); grpc_metadata_array_destroy(&md_ary); - - return NULL; } static void grpc_rb_call_credentials_plugin_get_metadata( void *state, grpc_auth_metadata_context context, grpc_credentials_plugin_metadata_cb cb, void *user_data) { - callback_params params; - params.get_metadata = (VALUE)state; - params.context = context; - params.user_data = user_data; - params.callback = cb; - - rb_thread_call_with_gvl(grpc_rb_call_credentials_callback_with_gil, - (void*)(¶ms)); + callback_params *params = gpr_malloc(sizeof(callback_params)); + params->get_metadata = (VALUE)state; + params->context = context; + params->user_data = user_data; + params->callback = cb; + + grpc_rb_event_queue_enqueue(grpc_rb_call_credentials_callback_with_gil, + (void*)(params)); } static void grpc_rb_call_credentials_plugin_destroy(void *state) { @@ -300,6 +300,8 @@ void Init_grpc_call_credentials() { grpc_rb_call_credentials_compose, -1); id_callback = rb_intern("__callback"); + + grpc_rb_event_queue_thread_start(); } /* Gets the wrapped grpc_call_credentials from the ruby wrapper */ diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c new file mode 100644 index 00000000000..c5891306404 --- /dev/null +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -0,0 +1,158 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "rb_event_thread.h" + +#include + +#include +#include +#include +#include +#include +#include + +typedef struct grpc_rb_event { + // callback will be called with argument while holding the GVL + void (*callback)(void*); + void *argument; + + struct grpc_rb_event *next; +} grpc_rb_event; + +typedef struct grpc_rb_event_queue { + grpc_rb_event *head; + grpc_rb_event *tail; + + gpr_mu *mu; + gpr_cv *cv; + + // Indicates that the thread should stop waiting + bool abort; +} grpc_rb_event_queue; + +static grpc_rb_event_queue event_queue; + +void grpc_rb_event_queue_enqueue(void (*callback)(void*), + void *argument) { + grpc_rb_event *event = gpr_malloc(sizeof(grpc_rb_event)); + event->callback = callback; + event->argument = argument; + event->next = NULL; + gpr_mu_lock(event_queue.mu); + if (event_queue.tail == NULL) { + event_queue.head = event_queue.tail = event; + } else { + event_queue.tail->next = event; + event_queue.tail = event; + } + gpr_mu_unlock(event_queue.mu); + gpr_cv_signal(event_queue.cv); +} + +static grpc_rb_event *grpc_rb_event_queue_dequeue() { + grpc_rb_event *event; + if (event_queue.head == NULL) { + event = NULL; + } else { + event = event_queue.head; + if (event_queue.head->next == NULL) { + event_queue.head = event_queue.tail = NULL; + } else { + event_queue.head = event_queue.head->next; + } + } + return event; +} + +static void grpc_rb_event_queue_destroy() { + gpr_mu_destroy(event_queue.mu); + gpr_cv_destroy(event_queue.cv); + gpr_free(event_queue.mu); + gpr_free(event_queue.cv); +} + +static void *grpc_rb_wait_for_event_no_gil(void *param) { + grpc_rb_event *event = NULL; + gpr_mu_lock(event_queue.mu); + while ((event = grpc_rb_event_queue_dequeue()) == NULL) { + gpr_cv_wait(event_queue.cv, + event_queue.mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + if (event_queue.abort) { + gpr_mu_unlock(event_queue.mu); + return NULL; + } + } + gpr_mu_unlock(event_queue.mu); + return event; +} + +static void grpc_rb_event_unblocking_func(void *arg) { + gpr_mu_lock(event_queue.mu); + event_queue.abort = true; + gpr_mu_unlock(event_queue.mu); + gpr_cv_signal(event_queue.cv); +} + +/* This is the implementation of the thread that handles auth metadata plugin + * events */ +static VALUE grpc_rb_event_thread(VALUE arg) { + grpc_rb_event *event; + while(true) { + event = (grpc_rb_event*)rb_thread_call_without_gvl( + grpc_rb_wait_for_event_no_gil, NULL, + grpc_rb_event_unblocking_func, NULL); + if (event == NULL) { + // Indicates that the thread needs to shut down + break; + } else { + event->callback(event->argument); + gpr_free(event); + } + } + grpc_rb_event_queue_destroy(); + return Qnil; +} + +void grpc_rb_event_queue_thread_start() { + + event_queue.mu = gpr_malloc(sizeof(gpr_mu)); + event_queue.cv = gpr_malloc(sizeof(gpr_cv)); + event_queue.head = event_queue.tail = NULL; + event_queue.abort = false; + gpr_mu_init(event_queue.mu); + gpr_cv_init(event_queue.cv); + + rb_thread_create(grpc_rb_event_thread, NULL); +} diff --git a/src/ruby/ext/grpc/rb_event_thread.h b/src/ruby/ext/grpc/rb_event_thread.h new file mode 100644 index 00000000000..46638bfcf53 --- /dev/null +++ b/src/ruby/ext/grpc/rb_event_thread.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +void grpc_rb_event_queue_thread_start(); + +void grpc_rb_event_queue_enqueue(void (*callback)(void*), + void *argument); From f58404e1d5ee614e70d216d285f90970abfde5c8 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Tue, 5 Jan 2016 09:57:44 -0800 Subject: [PATCH 2/3] Free what we alloc --- src/ruby/ext/grpc/rb_call_credentials.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 17999404683..4d719d75417 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -115,6 +115,7 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) { params->callback(params->user_data, md_ary.metadata, md_ary.count, status, error_details); grpc_metadata_array_destroy(&md_ary); + gpr_free(params); } static void grpc_rb_call_credentials_plugin_get_metadata( From cc2b8d423b1ce3c8305e5c36496d299c7270cfe3 Mon Sep 17 00:00:00 2001 From: murgatroid99 Date: Thu, 7 Jan 2016 17:32:34 -0800 Subject: [PATCH 3/3] Fixed mutex and cond var usage --- src/ruby/ext/grpc/rb_event_thread.c | 39 +++++++++++++---------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c index c5891306404..95af091317c 100644 --- a/src/ruby/ext/grpc/rb_event_thread.c +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -54,8 +54,8 @@ typedef struct grpc_rb_event_queue { grpc_rb_event *head; grpc_rb_event *tail; - gpr_mu *mu; - gpr_cv *cv; + gpr_mu mu; + gpr_cv cv; // Indicates that the thread should stop waiting bool abort; @@ -69,15 +69,15 @@ void grpc_rb_event_queue_enqueue(void (*callback)(void*), event->callback = callback; event->argument = argument; event->next = NULL; - gpr_mu_lock(event_queue.mu); + gpr_mu_lock(&event_queue.mu); if (event_queue.tail == NULL) { event_queue.head = event_queue.tail = event; } else { event_queue.tail->next = event; event_queue.tail = event; } - gpr_mu_unlock(event_queue.mu); - gpr_cv_signal(event_queue.cv); + gpr_cv_signal(&event_queue.cv); + gpr_mu_unlock(&event_queue.mu); } static grpc_rb_event *grpc_rb_event_queue_dequeue() { @@ -96,33 +96,31 @@ static grpc_rb_event *grpc_rb_event_queue_dequeue() { } static void grpc_rb_event_queue_destroy() { - gpr_mu_destroy(event_queue.mu); - gpr_cv_destroy(event_queue.cv); - gpr_free(event_queue.mu); - gpr_free(event_queue.cv); + gpr_mu_destroy(&event_queue.mu); + gpr_cv_destroy(&event_queue.cv); } static void *grpc_rb_wait_for_event_no_gil(void *param) { grpc_rb_event *event = NULL; - gpr_mu_lock(event_queue.mu); + gpr_mu_lock(&event_queue.mu); while ((event = grpc_rb_event_queue_dequeue()) == NULL) { - gpr_cv_wait(event_queue.cv, - event_queue.mu, + gpr_cv_wait(&event_queue.cv, + &event_queue.mu, gpr_inf_future(GPR_CLOCK_REALTIME)); if (event_queue.abort) { - gpr_mu_unlock(event_queue.mu); + gpr_mu_unlock(&event_queue.mu); return NULL; } } - gpr_mu_unlock(event_queue.mu); + gpr_mu_unlock(&event_queue.mu); return event; } static void grpc_rb_event_unblocking_func(void *arg) { - gpr_mu_lock(event_queue.mu); + gpr_mu_lock(&event_queue.mu); event_queue.abort = true; - gpr_mu_unlock(event_queue.mu); - gpr_cv_signal(event_queue.cv); + gpr_cv_signal(&event_queue.cv); + gpr_mu_unlock(&event_queue.mu); } /* This is the implementation of the thread that handles auth metadata plugin @@ -146,13 +144,10 @@ static VALUE grpc_rb_event_thread(VALUE arg) { } void grpc_rb_event_queue_thread_start() { - - event_queue.mu = gpr_malloc(sizeof(gpr_mu)); - event_queue.cv = gpr_malloc(sizeof(gpr_cv)); event_queue.head = event_queue.tail = NULL; event_queue.abort = false; - gpr_mu_init(event_queue.mu); - gpr_cv_init(event_queue.cv); + gpr_mu_init(&event_queue.mu); + gpr_cv_init(&event_queue.cv); rb_thread_create(grpc_rb_event_thread, NULL); }