From 16b39d6e6a558ff0723e9db582b71df1bfc3db10 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 6 Dec 2016 12:18:28 -0800 Subject: [PATCH 01/17] Cache poll threads in the poll-cv engine. This decreases the number of threads spawned by several orders of magnitude in some scenarios. --- src/core/lib/iomgr/ev_poll_posix.c | 398 +++++++++++++++++++++-------- src/core/lib/iomgr/wakeup_fd_cv.h | 6 +- 2 files changed, 296 insertions(+), 108 deletions(-) diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 21b28e5554e..e2de59dc13e 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -56,6 +56,7 @@ #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" +#include "src/core/lib/support/murmur_hash.h" /******************************************************************************* * FD declarations @@ -253,22 +254,43 @@ struct grpc_pollset_set { * condition variable polling definitions */ +#define POLLCV_THREAD_GRACE_MS 1000 #define CV_POLL_PERIOD_MS 1000 #define CV_DEFAULT_TABLE_SIZE 16 -typedef enum poll_status_t { INPROGRESS, COMPLETED, CANCELLED } poll_status_t; - -typedef struct poll_args { +typedef struct poll_result { gpr_refcount refcount; - gpr_cv *cv; + cv_node *watchers; + int watchcount; struct pollfd *fds; nfds_t nfds; - int timeout; int retval; int err; - gpr_atm status; + int completed; +} poll_result; + +typedef struct poll_args { + gpr_cv trigger; + int trigger_set; + struct pollfd *fds; + nfds_t nfds; + poll_result *result; + struct poll_args *next; + struct poll_args *prev; } poll_args; +// This is a 2-tiered cash, we mantain a hash table +// of active poll calls, so we can wait on the result +// of that call. We also maintain a freelist of innactive +// poll threads. +typedef struct poll_hash_table { + poll_args *free_pollers; + poll_args **active_pollers; + unsigned int size; + unsigned int count; +} poll_hash_table; + +poll_hash_table poll_cache; cv_fd_table g_cvfds; /******************************************************************************* @@ -1298,43 +1320,205 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, * Condition Variable polling extensions */ -static void decref_poll_args(poll_args *args) { - if (gpr_unref(&args->refcount)) { - gpr_free(args->fds); - gpr_cv_destroy(args->cv); - gpr_free(args->cv); - gpr_free(args); +static void run_poll(void *args); +static void cache_poller_locked(poll_args *args); + +static void cache_insert_locked(poll_args *args) { + uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd), + 0xDEADBEEF); + key = key % poll_cache.size; + if (poll_cache.active_pollers[key]) { + poll_cache.active_pollers[key]->prev = args; } + args->next = poll_cache.active_pollers[key]; + args->prev = NULL; + poll_cache.active_pollers[key] = args; + poll_cache.count++; } -// Poll in a background thread -static void run_poll(void *arg) { - int timeout, retval; - poll_args *pargs = (poll_args *)arg; - while (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - if (pargs->timeout < 0) { - timeout = CV_POLL_PERIOD_MS; - } else { - timeout = GPR_MIN(CV_POLL_PERIOD_MS, pargs->timeout); - pargs->timeout -= timeout; +void init_result(poll_args *pargs) { + pargs->result = gpr_malloc(sizeof(poll_result)); + gpr_ref_init(&pargs->result->refcount, 1); + pargs->result->watchers = NULL; + pargs->result->watchcount = 0; + pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds); + memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds); + pargs->result->nfds = pargs->nfds; + pargs->result->retval = 0; + pargs->result->err = 0; + pargs->result->completed = 0; +} + +// Creates a poll_args object for a given arguments to poll(). +// This object may return a poll_args in the cache. +static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) { + uint32_t key = + gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + poll_args *curr = poll_cache.active_pollers[key]; + while (curr) { + if (curr->nfds == count && + memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) { + gpr_free(fds); + return curr; } - retval = g_cvfds.poll(pargs->fds, pargs->nfds, timeout); - if (retval != 0 || pargs->timeout == 0) { - pargs->retval = retval; - pargs->err = errno; - break; + curr = curr->next; + } + + if (poll_cache.free_pollers) { + poll_args *pargs = poll_cache.free_pollers; + poll_cache.free_pollers = pargs->next; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = NULL; } + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + init_result(pargs); + cache_poller_locked(pargs); + return pargs; + } + + poll_args *pargs = gpr_malloc(sizeof(struct poll_args)); + gpr_cv_init(&pargs->trigger); + pargs->fds = fds; + pargs->nfds = count; + pargs->next = NULL; + pargs->prev = NULL; + pargs->trigger_set = 0; + init_result(pargs); + cache_poller_locked(pargs); + gpr_thd_id t_id; + gpr_thd_options opt = gpr_thd_options_default(); + gpr_ref(&g_cvfds.pollcount); + gpr_thd_options_set_detached(&opt); + gpr_thd_new(&t_id, &run_poll, pargs, &opt); + return pargs; +} + +static void cache_delete_locked(poll_args *args) { + if (!args->prev) { + uint32_t key = gpr_murmur_hash3( + args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF); + key = key % poll_cache.size; + GPR_ASSERT(poll_cache.active_pollers[key] == args); + poll_cache.active_pollers[key] = args->next; + } else { + args->prev->next = args->next; } - gpr_mu_lock(&g_cvfds.mu); - if (gpr_atm_no_barrier_load(&pargs->status) == INPROGRESS) { - // Signal main thread that the poll completed - gpr_atm_no_barrier_store(&pargs->status, COMPLETED); - gpr_cv_signal(pargs->cv); + + if (args->next) { + args->next->prev = args->prev; } - decref_poll_args(pargs); - g_cvfds.pollcount--; - if (g_cvfds.shutdown && g_cvfds.pollcount == 0) { - gpr_cv_signal(&g_cvfds.shutdown_complete); + + poll_cache.count--; + if (poll_cache.free_pollers) { + poll_cache.free_pollers->prev = args; + } + args->prev = NULL; + args->next = poll_cache.free_pollers; + gpr_free(args->fds); + poll_cache.free_pollers = args; +} + +static void cache_poller_locked(poll_args *args) { + if (poll_cache.count + 1 > poll_cache.size / 2) { + poll_args **old_active_pollers = poll_cache.active_pollers; + poll_cache.size = poll_cache.size * 2; + poll_cache.count = 0; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + for (unsigned int i = 0; i < poll_cache.size / 2; i++) { + poll_args *curr = old_active_pollers[i]; + poll_args *next = NULL; + while (curr) { + next = curr->next; + cache_insert_locked(curr); + curr = next; + } + } + gpr_free(old_active_pollers); + } + + cache_insert_locked(args); +} + +static void cache_destroy_locked(poll_args *args) { + if (args->next) { + args->next->prev = args->prev; + } + + if (args->prev) { + args->prev->next = args->next; + } else { + poll_cache.free_pollers = args->next; + } + + gpr_free(args); +} + +static void decref_poll_result(poll_result *res) { + if (gpr_unref(&res->refcount)) { + GPR_ASSERT(!res->watchers); + gpr_free(res->fds); + gpr_free(res); + } +} + +void remove_cvn(cv_node **head, cv_node *target) { + if (target->next) { + target->next->prev = target->prev; + } + + if (target->prev) { + target->prev->next = target->next; + } else { + *head = target->next; + } +} + +gpr_timespec thread_grace; + +// Poll in a background thread +static void run_poll(void *args) { + poll_args *pargs = (poll_args *)args; + while (1) { + poll_result *result = pargs->result; + int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS); + gpr_mu_lock(&g_cvfds.mu); + if (retval != 0) { + result->completed = 1; + result->retval = retval; + result->err = errno; + cv_node *watcher = result->watchers; + while (watcher) { + gpr_cv_signal(watcher->cv); + watcher = watcher->next; + } + } + if (result->watchcount == 0 || result->completed) { + cache_delete_locked(pargs); + decref_poll_result(result); + // Leave this polling thread alive for a grace period to do another poll() + // op + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + deadline = gpr_time_add(deadline, thread_grace); + pargs->trigger_set = 0; + gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline); + if (!pargs->trigger_set) { + cache_destroy_locked(pargs); + break; + } + } + gpr_mu_unlock(&g_cvfds.mu); + } + + // We still have the lock here + if (gpr_unref(&g_cvfds.pollcount)) { + gpr_cv_signal(&g_cvfds.shutdown_cv); } gpr_mu_unlock(&g_cvfds.mu); } @@ -1343,24 +1527,29 @@ static void run_poll(void *arg) { static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { unsigned int i; int res, idx; - gpr_cv *pollcv; - cv_node *cvn, *prev; + cv_node *pollcv; int skip_poll = 0; nfds_t nsockfds = 0; - gpr_thd_id t_id; - gpr_thd_options opt; - poll_args *pargs = NULL; + poll_result *result = NULL; gpr_mu_lock(&g_cvfds.mu); - pollcv = gpr_malloc(sizeof(gpr_cv)); - gpr_cv_init(pollcv); + pollcv = gpr_malloc(sizeof(cv_node)); + pollcv->next = NULL; + gpr_cv pollcv_cv; + gpr_cv_init(&pollcv_cv); + pollcv->cv = &pollcv_cv; + cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node)); + for (i = 0; i < nfds; i++) { fds[i].revents = 0; if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { idx = FD_TO_IDX(fds[i].fd); - cvn = gpr_malloc(sizeof(cv_node)); - cvn->cv = pollcv; - cvn->next = g_cvfds.cvfds[idx].cvs; - g_cvfds.cvfds[idx].cvs = cvn; + fd_cvs[i].cv = &pollcv_cv; + fd_cvs[i].prev = NULL; + fd_cvs[i].next = g_cvfds.cvfds[idx].cvs; + if (g_cvfds.cvfds[idx].cvs) { + g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]); + } + g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]); // Don't bother polling if a wakeup fd is ready if (g_cvfds.cvfds[idx].is_set) { skip_poll = 1; @@ -1370,81 +1559,68 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { } } + gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); + if (timeout < 0) { + deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + } else { + deadline = + gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); + } + res = 0; if (!skip_poll && nsockfds > 0) { - pargs = gpr_malloc(sizeof(struct poll_args)); - // Both the main thread and calling thread get a reference - gpr_ref_init(&pargs->refcount, 2); - pargs->cv = pollcv; - pargs->fds = gpr_malloc(sizeof(struct pollfd) * nsockfds); - pargs->nfds = nsockfds; - pargs->timeout = timeout; - pargs->retval = 0; - pargs->err = 0; - gpr_atm_no_barrier_store(&pargs->status, INPROGRESS); + struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd >= 0) { - pargs->fds[idx].fd = fds[i].fd; - pargs->fds[idx].events = fds[i].events; - pargs->fds[idx].revents = 0; + pollfds[idx].fd = fds[i].fd; + pollfds[idx].events = fds[i].events; + pollfds[idx].revents = 0; idx++; } } - g_cvfds.pollcount++; - opt = gpr_thd_options_default(); - gpr_thd_options_set_detached(&opt); - gpr_thd_new(&t_id, &run_poll, pargs, &opt); - // We want the poll() thread to trigger the deadline, so wait forever here - gpr_cv_wait(pollcv, &g_cvfds.mu, gpr_inf_future(GPR_CLOCK_MONOTONIC)); - if (gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - res = pargs->retval; - errno = pargs->err; - } else { - errno = 0; - gpr_atm_no_barrier_store(&pargs->status, CANCELLED); + poll_args *pargs = get_poller_locked(pollfds, nsockfds); + result = pargs->result; + pollcv->next = result->watchers; + pollcv->prev = NULL; + if (result->watchers) { + result->watchers->prev = pollcv; } + result->watchers = pollcv; + result->watchcount++; + gpr_ref(&result->refcount); + + pargs->trigger_set = 1; + gpr_cv_signal(&pargs->trigger); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); + res = result->retval; + errno = result->err; + result->watchcount--; + remove_cvn(&result->watchers, pollcv); } else if (!skip_poll) { - gpr_timespec deadline = gpr_now(GPR_CLOCK_REALTIME); - deadline = - gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN)); - gpr_cv_wait(pollcv, &g_cvfds.mu, deadline); + gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline); } idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd < 0 && (fds[i].events & POLLIN)) { - cvn = g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs; - prev = NULL; - while (cvn->cv != pollcv) { - prev = cvn; - cvn = cvn->next; - GPR_ASSERT(cvn); - } - if (!prev) { - g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs = cvn->next; - } else { - prev->next = cvn->next; - } - gpr_free(cvn); - + remove_cvn(&g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i])); if (g_cvfds.cvfds[FD_TO_IDX(fds[i].fd)].is_set) { fds[i].revents = POLLIN; if (res >= 0) res++; } - } else if (!skip_poll && fds[i].fd >= 0 && - gpr_atm_no_barrier_load(&pargs->status) == COMPLETED) { - fds[i].revents = pargs->fds[idx].revents; + } else if (!skip_poll && fds[i].fd >= 0 && result->completed) { + fds[i].revents = result->fds[idx].revents; idx++; } } - if (pargs) { - decref_poll_args(pargs); - } else { - gpr_cv_destroy(pollcv); - gpr_free(pollcv); + gpr_free(fd_cvs); + gpr_free(pollcv); + if (result) { + decref_poll_result(result); } + gpr_mu_unlock(&g_cvfds.mu); return res; @@ -1453,12 +1629,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { static void global_cv_fd_table_init() { gpr_mu_init(&g_cvfds.mu); gpr_mu_lock(&g_cvfds.mu); - gpr_cv_init(&g_cvfds.shutdown_complete); - g_cvfds.shutdown = 0; - g_cvfds.pollcount = 0; + gpr_cv_init(&g_cvfds.shutdown_cv); + gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = NULL; + thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = NULL; @@ -1468,23 +1644,35 @@ static void global_cv_fd_table_init() { // Override the poll function with one that supports cvfds g_cvfds.poll = grpc_poll_function; grpc_poll_function = &cvfd_poll; + + // Initialize the cache + poll_cache.size = 32; + poll_cache.count = 0; + poll_cache.free_pollers = NULL; + poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32); + for (unsigned int i = 0; i < poll_cache.size; i++) { + poll_cache.active_pollers[i] = NULL; + } + gpr_mu_unlock(&g_cvfds.mu); } static void global_cv_fd_table_shutdown() { gpr_mu_lock(&g_cvfds.mu); - g_cvfds.shutdown = 1; // Attempt to wait for all abandoned poll() threads to terminate // Not doing so will result in reported memory leaks - if (g_cvfds.pollcount > 0) { - int res = gpr_cv_wait(&g_cvfds.shutdown_complete, &g_cvfds.mu, + if (!gpr_unref(&g_cvfds.pollcount)) { + int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(3, GPR_TIMESPAN))); GPR_ASSERT(res == 0); } - gpr_cv_destroy(&g_cvfds.shutdown_complete); + gpr_cv_destroy(&g_cvfds.shutdown_cv); grpc_poll_function = g_cvfds.poll; gpr_free(g_cvfds.cvfds); + + gpr_free(poll_cache.active_pollers); + gpr_mu_unlock(&g_cvfds.mu); gpr_mu_destroy(&g_cvfds.mu); } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.h b/src/core/lib/iomgr/wakeup_fd_cv.h index ac16be1750a..5c93b28ef03 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.h +++ b/src/core/lib/iomgr/wakeup_fd_cv.h @@ -58,6 +58,7 @@ typedef struct cv_node { gpr_cv* cv; struct cv_node* next; + struct cv_node* prev; } cv_node; typedef struct fd_node { @@ -68,9 +69,8 @@ typedef struct fd_node { typedef struct cv_fd_table { gpr_mu mu; - int pollcount; - int shutdown; - gpr_cv shutdown_complete; + gpr_refcount pollcount; + gpr_cv shutdown_cv; fd_node* cvfds; fd_node* free_fds; unsigned int size; From 4ea8ce19ea2c6d2f9629e06955b0ce62e82266da Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 7 Aug 2017 16:26:02 +0200 Subject: [PATCH 02/17] add GrpcEnvironment.ShuttingDown event --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 28 +++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 0663ee92150..a466c9fa376 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -43,13 +43,15 @@ namespace Grpc.Core static readonly HashSet registeredChannels = new HashSet(); static readonly HashSet registeredServers = new HashSet(); + static EventHandler shuttingDown; + static ILogger logger = new NullLogger(); readonly GrpcThreadPool threadPool; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); - bool isClosed; + bool isShutdown; /// /// Returns a reference-counted instance of initialized gRPC environment. @@ -237,6 +239,21 @@ namespace Grpc.Core } } + /// + /// Occurs when GrpcEnvironment is about the start the shutdown logic. + /// + public static event EventHandler ShuttingDown + { + add + { + shuttingDown += value; + } + remove + { + shuttingDown -= value; + } + } + /// /// Creates gRPC environment. /// @@ -311,13 +328,16 @@ namespace Grpc.Core /// private async Task ShutdownAsync() { - if (isClosed) + if (isShutdown) { - throw new InvalidOperationException("Close has already been called"); + throw new InvalidOperationException("ShutdownAsync has already been called"); } + + await Task.Run(() => shuttingDown.Invoke(this, null)); + await threadPool.StopAsync().ConfigureAwait(false); GrpcNativeShutdown(); - isClosed = true; + isShutdown = true; debugStats.CheckOK(); } From ae607078fcd00d7b11eacca4ea1937b428547304 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 10:58:15 +0200 Subject: [PATCH 03/17] address comments --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index a466c9fa376..9fad5c6a54e 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -43,8 +43,6 @@ namespace Grpc.Core static readonly HashSet registeredChannels = new HashSet(); static readonly HashSet registeredServers = new HashSet(); - static EventHandler shuttingDown; - static ILogger logger = new NullLogger(); readonly GrpcThreadPool threadPool; @@ -242,17 +240,7 @@ namespace Grpc.Core /// /// Occurs when GrpcEnvironment is about the start the shutdown logic. /// - public static event EventHandler ShuttingDown - { - add - { - shuttingDown += value; - } - remove - { - shuttingDown -= value; - } - } + public static event EventHandler ShuttingDown; /// /// Creates gRPC environment. @@ -333,7 +321,7 @@ namespace Grpc.Core throw new InvalidOperationException("ShutdownAsync has already been called"); } - await Task.Run(() => shuttingDown.Invoke(this, null)); + await Task.Run(() => ShuttingDown?.Invoke(this, null)); await threadPool.StopAsync().ConfigureAwait(false); GrpcNativeShutdown(); From a7ca8668872aab7ff302c464b929a06d29b8ddf9 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 13:58:59 +0200 Subject: [PATCH 04/17] add test for shuttingdown event --- src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 0f3a82c605f..fc9d5599f21 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -18,6 +18,7 @@ using System; using System.Linq; +using System.Threading; using Grpc.Core; using NUnit.Framework; @@ -75,5 +76,19 @@ namespace Grpc.Core.Tests var parts = coreVersion.Split('.'); Assert.AreEqual(3, parts.Length); } + + [Test] + public void ShuttingDownEventIsFired() + { + var cts = new CancellationTokenSource(); + var handler = new EventHandler((sender, args) => { cts.Cancel(); }); + + GrpcEnvironment.ShuttingDown += handler; + var env = GrpcEnvironment.AddRef(); + GrpcEnvironment.ReleaseAsync().Wait(); + GrpcEnvironment.ShuttingDown -= handler; + + Assert.IsTrue(cts.Token.IsCancellationRequested); + } } } From 5250e06890ce737801b01ee7db756c2511d202cc Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 14:00:09 +0200 Subject: [PATCH 05/17] add ConfigureAwait --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 9fad5c6a54e..cf5e47e3d56 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -321,7 +321,7 @@ namespace Grpc.Core throw new InvalidOperationException("ShutdownAsync has already been called"); } - await Task.Run(() => ShuttingDown?.Invoke(this, null)); + await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); await threadPool.StopAsync().ConfigureAwait(false); GrpcNativeShutdown(); From e64825c302e1eacac43d2b21049fddaa25e11902 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 16:10:23 +0200 Subject: [PATCH 06/17] Populate trailers in RpcException --- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 8 +++---- src/csharp/Grpc.Core/RpcException.cs | 28 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 6e6ca7cd53c..17109de587b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -329,7 +329,7 @@ namespace Grpc.Core.Internal protected override Exception GetRpcExceptionClientOnly() { - return new RpcException(finishedStatus.Value.Status); + return new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers); } protected override Task CheckSendAllowedOrEarlyResult() @@ -348,7 +348,7 @@ namespace Grpc.Core.Internal // Writing after the call has finished is not a programming error because server can close // the call anytime, so don't throw directly, but let the write task finish with an error. var tcs = new TaskCompletionSource(); - tcs.SetException(new RpcException(finishedStatus.Value.Status)); + tcs.SetException(new RpcException(finishedStatus.Value.Status, finishedStatus.Value.Trailers)); return tcs.Task; } @@ -468,7 +468,7 @@ namespace Grpc.Core.Internal var status = receivedStatus.Status; if (status.StatusCode != StatusCode.OK) { - unaryResponseTcs.SetException(new RpcException(status)); + unaryResponseTcs.SetException(new RpcException(status, receivedStatus.Trailers)); return; } @@ -506,7 +506,7 @@ namespace Grpc.Core.Internal var status = receivedStatus.Status; if (status.StatusCode != StatusCode.OK) { - streamingResponseCallFinishedTcs.SetException(new RpcException(status)); + streamingResponseCallFinishedTcs.SetException(new RpcException(status, receivedStatus.Trailers)); return; } diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs index 01b9e4fb1a1..d2c912e73af 100644 --- a/src/csharp/Grpc.Core/RpcException.cs +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -17,6 +17,7 @@ #endregion using System; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -26,6 +27,7 @@ namespace Grpc.Core public class RpcException : Exception { private readonly Status status; + private readonly Metadata trailers; /// /// Creates a new RpcException associated with given status. @@ -34,6 +36,7 @@ namespace Grpc.Core public RpcException(Status status) : base(status.ToString()) { this.status = status; + this.trailers = Metadata.Empty; } /// @@ -44,6 +47,18 @@ namespace Grpc.Core public RpcException(Status status, string message) : base(message) { this.status = status; + this.trailers = Metadata.Empty; + } + + /// + /// Creates a new RpcException associated with given status and trailing response metadata. + /// + /// Resulting status of a call. + /// Response trailing metadata. + public RpcException(Status status, Metadata trailers) : base(status.ToString()) + { + this.status = status; + this.trailers = GrpcPreconditions.CheckNotNull(trailers); } /// @@ -56,5 +71,18 @@ namespace Grpc.Core return status; } } + + /// + /// Gets the call trailing metadata. + /// Trailers only have meaningful content for client-side calls (in which case they represent the trailing metadata sent by the server when closing the call). + /// Instances of RpcException thrown by the server-side part of the stack will have trailers always set to empty. + /// + public Metadata Trailers + { + get + { + return trailers; + } + } } } From 2565e732f79da9392cc8656229ea97dd045fbb4f Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 16:53:31 +0200 Subject: [PATCH 07/17] merging of trailers on server-side --- .../Grpc.Core/Internal/ServerCallHandler.cs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 36702a3fab4..638a8d8a1fe 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -76,7 +76,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e); + status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); } try { @@ -133,7 +133,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e); + status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); } try @@ -191,7 +191,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e); + status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); } try @@ -247,7 +247,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e); + status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); } try { @@ -292,11 +292,20 @@ namespace Grpc.Core.Internal internal static class HandlerUtils { - public static Status StatusFromException(Exception e) + public static Status StatusFromException(Exception e, Metadata callContextResponseTrailers) { var rpcException = e as RpcException; if (rpcException != null) { + // There are two sources of metadata entries on the server-side: + // 1. serverCallContext.ResponseTrailers + // 2. trailers in RpcException thrown by user code in server side handler. + // As metadata allows duplicate keys, the logical thing to do is + // to just merge trailers from RpcException into serverCallContext.ResponseTrailers. + foreach (var entry in rpcException.Trailers) + { + callContextResponseTrailers.Add(entry); + } // use the status thrown by handler. return rpcException.Status; } From a87f6d8239bed7f539e62a6e726a85b698f7bfcd Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Tue, 8 Aug 2017 16:35:51 +0200 Subject: [PATCH 08/17] add tests --- .../Grpc.Core.Tests/ClientServerTest.cs | 74 ++++++++++++++++++- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index c74d04c8294..0cb9288131d 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -92,9 +92,33 @@ namespace Grpc.Core.Tests var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); + Assert.AreEqual(0, ex.Trailers.Count); var ex2 = Assert.ThrowsAsync(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); + Assert.AreEqual(0, ex.Trailers.Count); + } + + [Test] + public void UnaryCall_ServerHandlerThrowsRpcExceptionWithTrailers() + { + helper.UnaryHandler = new UnaryServerMethod((request, context) => + { + var trailers = new Metadata { {"xyz", "xyz-value"} }; + throw new RpcException(new Status(StatusCode.Unauthenticated, ""), trailers); + }); + + var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); + Assert.AreEqual(1, ex.Trailers.Count); + Assert.AreEqual("xyz", ex.Trailers[0].Key); + Assert.AreEqual("xyz-value", ex.Trailers[0].Value); + + var ex2 = Assert.ThrowsAsync(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); + Assert.AreEqual(1, ex2.Trailers.Count); + Assert.AreEqual("xyz", ex2.Trailers[0].Key); + Assert.AreEqual("xyz-value", ex2.Trailers[0].Value); } [Test] @@ -108,9 +132,34 @@ namespace Grpc.Core.Tests var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); + Assert.AreEqual(0, ex.Trailers.Count); + + var ex2 = Assert.ThrowsAsync(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); + Assert.AreEqual(0, ex2.Trailers.Count); + } + + [Test] + public void UnaryCall_ServerHandlerSetsStatusAndTrailers() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + context.Status = new Status(StatusCode.Unauthenticated, ""); + context.ResponseTrailers.Add("xyz", "xyz-value"); + return ""; + }); + + var ex = Assert.Throws(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "abc")); + Assert.AreEqual(StatusCode.Unauthenticated, ex.Status.StatusCode); + Assert.AreEqual(1, ex.Trailers.Count); + Assert.AreEqual("xyz", ex.Trailers[0].Key); + Assert.AreEqual("xyz-value", ex.Trailers[0].Value); var ex2 = Assert.ThrowsAsync(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "abc")); Assert.AreEqual(StatusCode.Unauthenticated, ex2.Status.StatusCode); + Assert.AreEqual(1, ex2.Trailers.Count); + Assert.AreEqual("xyz", ex2.Trailers[0].Key); + Assert.AreEqual("xyz-value", ex2.Trailers[0].Value); } [Test] @@ -148,7 +197,7 @@ namespace Grpc.Core.Tests CollectionAssert.AreEqual(new string[] { "A", "B", "C" }, await call.ResponseStream.ToListAsync()); Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); - Assert.IsNotNull("xyz", call.GetTrailers()[0].Key); + Assert.AreEqual("xyz", call.GetTrailers()[0].Key); } [Test] @@ -182,6 +231,27 @@ namespace Grpc.Core.Tests Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode); } + [Test] + public async Task ServerStreamingCall_TrailersFromMultipleSourcesGetConcatenated() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) => + { + context.ResponseTrailers.Add("xyz", "xyz-value"); + throw new RpcException(new Status(StatusCode.InvalidArgument, ""), new Metadata { {"abc", "abc-value"} }); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + + var ex = Assert.ThrowsAsync(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode); + Assert.AreEqual(2, call.GetTrailers().Count); + Assert.AreEqual(2, ex.Trailers.Count); + Assert.AreEqual("xyz", ex.Trailers[0].Key); + Assert.AreEqual("xyz-value", ex.Trailers[0].Value); + Assert.AreEqual("abc", ex.Trailers[1].Key); + Assert.AreEqual("abc-value", ex.Trailers[1].Value); + } + [Test] public async Task DuplexStreamingCall() { @@ -199,7 +269,7 @@ namespace Grpc.Core.Tests CollectionAssert.AreEqual(new string[] { "A", "B", "C" }, await call.ResponseStream.ToListAsync()); Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode); - Assert.IsNotNull("xyz-value", call.GetTrailers()[0].Value); + Assert.AreEqual("xyz-value", call.GetTrailers()[0].Value); } [Test] From 40ea2d3a046c7ad76685183b546be7b1b94fa501 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 9 Aug 2017 10:23:15 +0200 Subject: [PATCH 09/17] improve examples in CustomErrorDetails test --- .../CustomErrorDetailsTest.cs | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs b/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs index be996f91e03..374c6fc23f0 100644 --- a/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/CustomErrorDetailsTest.cs @@ -65,7 +65,7 @@ namespace Grpc.IntegrationTesting } [Test] - public async Task UnaryCall() + public async Task ErrorDetailsFromCallObject() { var call = client.UnaryCallAsync(new SimpleRequest { ResponseSize = 10 }); @@ -83,7 +83,24 @@ namespace Grpc.IntegrationTesting } } - private DebugInfo GetDebugInfo(Metadata trailers) + [Test] + public async Task ErrorDetailsFromRpcException() + { + try + { + await client.UnaryCallAsync(new SimpleRequest { ResponseSize = 10 }); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + var debugInfo = GetDebugInfo(e.Trailers); + Assert.AreEqual(debugInfo.Detail, ExceptionDetail); + Assert.IsNotEmpty(debugInfo.StackEntries); + } + } + + private static DebugInfo GetDebugInfo(Metadata trailers) { var entry = trailers.First((e) => e.Key == DebugInfoTrailerName); return DebugInfo.Parser.ParseFrom(entry.ValueBytes); From a0a7b57ec08b0697892555930aaf700da8517252 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 9 Aug 2017 10:33:07 -0700 Subject: [PATCH 10/17] Fix use-after-free in oauth2_credentials --- .../security/credentials/oauth2/oauth2_credentials.c | 12 +++++++++++- .../security/credentials/oauth2/oauth2_credentials.h | 2 ++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index ffa941bb9e9..c59e55136cb 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -109,6 +109,8 @@ static void oauth2_token_fetcher_destruct(grpc_exec_ctx *exec_ctx, (grpc_oauth2_token_fetcher_credentials *)creds; GRPC_MDELEM_UNREF(exec_ctx, c->access_token_md); gpr_mu_destroy(&c->mu); + grpc_pollset_set_destroy(exec_ctx, + grpc_polling_entity_pollset_set(&c->pollent)); grpc_httpcli_context_destroy(exec_ctx, &c->httpcli_context); } @@ -238,6 +240,9 @@ static void on_oauth2_token_fetcher_http_response(grpc_exec_ctx *exec_ctx, "Error occured when fetching oauth2 token.", &error, 1); } GRPC_CLOSURE_SCHED(exec_ctx, pending_request->on_request_metadata, error); + grpc_polling_entity_del_from_pollset_set( + exec_ctx, pending_request->pollent, + grpc_polling_entity_pollset_set(&c->pollent)); grpc_oauth2_pending_get_request_metadata *prev = pending_request; pending_request = pending_request->next; gpr_free(prev); @@ -278,6 +283,9 @@ static bool oauth2_token_fetcher_get_request_metadata( sizeof(*pending_request)); pending_request->md_array = md_array; pending_request->on_request_metadata = on_request_metadata; + pending_request->pollent = pollent; + grpc_polling_entity_add_to_pollset_set( + exec_ctx, pollent, grpc_polling_entity_pollset_set(&c->pollent)); pending_request->next = c->pending_requests; c->pending_requests = pending_request; bool start_fetch = false; @@ -289,7 +297,7 @@ static bool oauth2_token_fetcher_get_request_metadata( if (start_fetch) { grpc_call_credentials_ref(creds); c->fetch_func(exec_ctx, grpc_credentials_metadata_request_create(creds), - &c->httpcli_context, pollent, + &c->httpcli_context, &c->pollent, on_oauth2_token_fetcher_http_response, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold)); } @@ -334,6 +342,8 @@ static void init_oauth2_token_fetcher(grpc_oauth2_token_fetcher_credentials *c, gpr_mu_init(&c->mu); c->token_expiration = gpr_inf_past(GPR_CLOCK_REALTIME); c->fetch_func = fetch_func; + c->pollent = + grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); grpc_httpcli_context_init(&c->httpcli_context); } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index 9d041a20eaf..d9ad6691b84 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -62,6 +62,7 @@ typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx, typedef struct grpc_oauth2_pending_get_request_metadata { grpc_credentials_mdelem_array *md_array; grpc_closure *on_request_metadata; + grpc_polling_entity *pollent; struct grpc_oauth2_pending_get_request_metadata *next; } grpc_oauth2_pending_get_request_metadata; @@ -74,6 +75,7 @@ typedef struct { grpc_oauth2_pending_get_request_metadata *pending_requests; grpc_httpcli_context httpcli_context; grpc_fetch_oauth2_func fetch_func; + grpc_polling_entity pollent; } grpc_oauth2_token_fetcher_credentials; // Google refresh token credentials. From 9dc182161dfb35ed8f7bb45150bb53e097fcebe1 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 9 Aug 2017 19:36:50 +0200 Subject: [PATCH 11/17] better name for StatusFromException --- src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 638a8d8a1fe..6019f8e7934 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -76,7 +76,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); + status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } try { @@ -133,7 +133,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); + status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } try @@ -191,7 +191,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); + status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } try @@ -247,7 +247,7 @@ namespace Grpc.Core.Internal { Logger.Warning(e, "Exception occured in handler."); } - status = HandlerUtils.StatusFromException(e, context.ResponseTrailers); + status = HandlerUtils.GetStatusFromExceptionAndMergeTrailers(e, context.ResponseTrailers); } try { @@ -292,7 +292,7 @@ namespace Grpc.Core.Internal internal static class HandlerUtils { - public static Status StatusFromException(Exception e, Metadata callContextResponseTrailers) + public static Status GetStatusFromExceptionAndMergeTrailers(Exception e, Metadata callContextResponseTrailers) { var rpcException = e as RpcException; if (rpcException != null) From 25d6fd82d13ea8363ce09bc482e1f3365084d2a2 Mon Sep 17 00:00:00 2001 From: jiangtaoli2016 Date: Wed, 9 Aug 2017 11:15:04 -0700 Subject: [PATCH 12/17] split tsi library into two --- BUILD | 26 +++++++++++----- CMakeLists.txt | 4 +-- Makefile | 4 +-- binding.gyp | 2 +- build.yaml | 21 +++++++++---- config.m4 | 2 +- config.w32 | 2 +- gRPC-Core.podspec | 6 ++-- grpc.gemspec | 4 +-- package.xml | 4 +-- src/python/grpcio/grpc_core_dependencies.py | 2 +- .../generated/sources_and_headers.json | 30 ++++++++++++++----- vsprojects/vcxproj/grpc/grpc.vcxproj | 6 ++-- vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 12 ++++---- 14 files changed, 81 insertions(+), 44 deletions(-) diff --git a/BUILD b/BUILD index 19d019bc51d..49c340d070c 100644 --- a/BUILD +++ b/BUILD @@ -1411,14 +1411,30 @@ grpc_cc_library( ], ) +grpc_cc_library( + name = "tsi_interface", + srcs = [ + "src/core/tsi/transport_security.c", + "src/core/tsi/transport_security_adapter.c", + ], + hdrs = [ + "src/core/tsi/transport_security.h", + "src/core/tsi/transport_security_adapter.h", + "src/core/tsi/transport_security_interface.h", + ], + language = "c", + deps = [ + "gpr", + "grpc_trace", + ], +) + grpc_cc_library( name = "tsi", srcs = [ "src/core/tsi/fake_transport_security.c", "src/core/tsi/gts_transport_security.c", "src/core/tsi/ssl_transport_security.c", - "src/core/tsi/transport_security.c", - "src/core/tsi/transport_security_adapter.c", "src/core/tsi/transport_security_grpc.c", ], hdrs = [ @@ -1426,19 +1442,15 @@ grpc_cc_library( "src/core/tsi/gts_transport_security.h", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", - "src/core/tsi/transport_security.h", - "src/core/tsi/transport_security_adapter.h", "src/core/tsi/transport_security_grpc.h", - "src/core/tsi/transport_security_interface.h", ], external_deps = [ "libssl", ], language = "c", deps = [ - "gpr", "grpc_base", - "grpc_trace", + "tsi_interface", ], ) diff --git a/CMakeLists.txt b/CMakeLists.txt index 128f44c277d..8dc4758d238 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1132,9 +1132,9 @@ add_library(grpc src/core/tsi/fake_transport_security.c src/core/tsi/gts_transport_security.c src/core/tsi/ssl_transport_security.c + src/core/tsi/transport_security_grpc.c src/core/tsi/transport_security.c src/core/tsi/transport_security_adapter.c - src/core/tsi/transport_security_grpc.c src/core/ext/transport/chttp2/server/chttp2_server.c src/core/ext/transport/chttp2/client/secure/secure_channel_create.c src/core/ext/filters/client_channel/channel_connectivity.c @@ -1504,9 +1504,9 @@ add_library(grpc_cronet src/core/tsi/fake_transport_security.c src/core/tsi/gts_transport_security.c src/core/tsi/ssl_transport_security.c + src/core/tsi/transport_security_grpc.c src/core/tsi/transport_security.c src/core/tsi/transport_security_adapter.c - src/core/tsi/transport_security_grpc.c src/core/ext/transport/chttp2/client/chttp2_connector.c src/core/ext/filters/load_reporting/load_reporting.c src/core/ext/filters/load_reporting/load_reporting_filter.c diff --git a/Makefile b/Makefile index 0d5dad2a4b1..74f05f592dc 100644 --- a/Makefile +++ b/Makefile @@ -3079,9 +3079,9 @@ LIBGRPC_SRC = \ src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ - src/core/tsi/transport_security_grpc.c \ src/core/ext/transport/chttp2/server/chttp2_server.c \ src/core/ext/transport/chttp2/client/secure/secure_channel_create.c \ src/core/ext/filters/client_channel/channel_connectivity.c \ @@ -3449,9 +3449,9 @@ LIBGRPC_CRONET_SRC = \ src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ - src/core/tsi/transport_security_grpc.c \ src/core/ext/transport/chttp2/client/chttp2_connector.c \ src/core/ext/filters/load_reporting/load_reporting.c \ src/core/ext/filters/load_reporting/load_reporting_filter.c \ diff --git a/binding.gyp b/binding.gyp index 226e745844c..bbefd05c20b 100644 --- a/binding.gyp +++ b/binding.gyp @@ -841,9 +841,9 @@ 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', - 'src/core/tsi/transport_security_grpc.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', 'src/core/ext/filters/client_channel/channel_connectivity.c', diff --git a/build.yaml b/build.yaml index 0c326fee028..a459b9d6883 100644 --- a/build.yaml +++ b/build.yaml @@ -925,24 +925,33 @@ filegroups: - src/core/tsi/gts_transport_security.h - src/core/tsi/ssl_transport_security.h - src/core/tsi/ssl_types.h - - src/core/tsi/transport_security.h - - src/core/tsi/transport_security_adapter.h - src/core/tsi/transport_security_grpc.h - - src/core/tsi/transport_security_interface.h src: - src/core/tsi/fake_transport_security.c - src/core/tsi/gts_transport_security.c - src/core/tsi/ssl_transport_security.c - - src/core/tsi/transport_security.c - - src/core/tsi/transport_security_adapter.c - src/core/tsi/transport_security_grpc.c deps: - gpr plugin: grpc_tsi_gts secure: true uses: - - grpc_trace + - tsi_interface - grpc_base + - grpc_trace +- name: tsi_interface + headers: + - src/core/tsi/transport_security.h + - src/core/tsi/transport_security_adapter.h + - src/core/tsi/transport_security_interface.h + src: + - src/core/tsi/transport_security.c + - src/core/tsi/transport_security_adapter.c + deps: + - gpr + secure: true + uses: + - grpc_trace - name: grpc++_codegen_base language: c++ public_headers: diff --git a/config.m4 b/config.m4 index 066339423c2..f6f8531b2fd 100644 --- a/config.m4 +++ b/config.m4 @@ -270,9 +270,9 @@ if test "$PHP_GRPC" != "no"; then src/core/tsi/fake_transport_security.c \ src/core/tsi/gts_transport_security.c \ src/core/tsi/ssl_transport_security.c \ + src/core/tsi/transport_security_grpc.c \ src/core/tsi/transport_security.c \ src/core/tsi/transport_security_adapter.c \ - src/core/tsi/transport_security_grpc.c \ src/core/ext/transport/chttp2/server/chttp2_server.c \ src/core/ext/transport/chttp2/client/secure/secure_channel_create.c \ src/core/ext/filters/client_channel/channel_connectivity.c \ diff --git a/config.w32 b/config.w32 index 12db12cae87..1d1a0a4b63f 100644 --- a/config.w32 +++ b/config.w32 @@ -247,9 +247,9 @@ if (PHP_GRPC != "no") { "src\\core\\tsi\\fake_transport_security.c " + "src\\core\\tsi\\gts_transport_security.c " + "src\\core\\tsi\\ssl_transport_security.c " + + "src\\core\\tsi\\transport_security_grpc.c " + "src\\core\\tsi\\transport_security.c " + "src\\core\\tsi\\transport_security_adapter.c " + - "src\\core\\tsi\\transport_security_grpc.c " + "src\\core\\ext\\transport\\chttp2\\server\\chttp2_server.c " + "src\\core\\ext\\transport\\chttp2\\client\\secure\\secure_channel_create.c " + "src\\core\\ext\\filters\\client_channel\\channel_connectivity.c " + diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 0815e83467b..4b1a8f38a34 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -290,9 +290,9 @@ Pod::Spec.new do |s| 'src/core/tsi/gts_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/ssl_types.h', + 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_adapter.h', - 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security_interface.h', 'src/core/ext/transport/chttp2/server/chttp2_server.h', 'src/core/ext/filters/client_channel/client_channel.h', @@ -649,9 +649,9 @@ Pod::Spec.new do |s| 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', - 'src/core/tsi/transport_security_grpc.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', 'src/core/ext/filters/client_channel/channel_connectivity.c', @@ -784,9 +784,9 @@ Pod::Spec.new do |s| 'src/core/tsi/gts_transport_security.h', 'src/core/tsi/ssl_transport_security.h', 'src/core/tsi/ssl_types.h', + 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security.h', 'src/core/tsi/transport_security_adapter.h', - 'src/core/tsi/transport_security_grpc.h', 'src/core/tsi/transport_security_interface.h', 'src/core/ext/transport/chttp2/server/chttp2_server.h', 'src/core/ext/filters/client_channel/client_channel.h', diff --git a/grpc.gemspec b/grpc.gemspec index 089d6d32143..f04a14167b1 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -222,9 +222,9 @@ Gem::Specification.new do |s| s.files += %w( src/core/tsi/gts_transport_security.h ) s.files += %w( src/core/tsi/ssl_transport_security.h ) s.files += %w( src/core/tsi/ssl_types.h ) + s.files += %w( src/core/tsi/transport_security_grpc.h ) s.files += %w( src/core/tsi/transport_security.h ) s.files += %w( src/core/tsi/transport_security_adapter.h ) - s.files += %w( src/core/tsi/transport_security_grpc.h ) s.files += %w( src/core/tsi/transport_security_interface.h ) s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.h ) s.files += %w( src/core/ext/filters/client_channel/client_channel.h ) @@ -581,9 +581,9 @@ Gem::Specification.new do |s| s.files += %w( src/core/tsi/fake_transport_security.c ) s.files += %w( src/core/tsi/gts_transport_security.c ) s.files += %w( src/core/tsi/ssl_transport_security.c ) + s.files += %w( src/core/tsi/transport_security_grpc.c ) s.files += %w( src/core/tsi/transport_security.c ) s.files += %w( src/core/tsi/transport_security_adapter.c ) - s.files += %w( src/core/tsi/transport_security_grpc.c ) s.files += %w( src/core/ext/transport/chttp2/server/chttp2_server.c ) s.files += %w( src/core/ext/transport/chttp2/client/secure/secure_channel_create.c ) s.files += %w( src/core/ext/filters/client_channel/channel_connectivity.c ) diff --git a/package.xml b/package.xml index 09f549832d4..4e288b3c682 100644 --- a/package.xml +++ b/package.xml @@ -236,9 +236,9 @@ + - @@ -595,9 +595,9 @@ + - diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index bf831d063bf..dc4d28f95bd 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -246,9 +246,9 @@ CORE_SOURCE_FILES = [ 'src/core/tsi/fake_transport_security.c', 'src/core/tsi/gts_transport_security.c', 'src/core/tsi/ssl_transport_security.c', + 'src/core/tsi/transport_security_grpc.c', 'src/core/tsi/transport_security.c', 'src/core/tsi/transport_security_adapter.c', - 'src/core/tsi/transport_security_grpc.c', 'src/core/ext/transport/chttp2/server/chttp2_server.c', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', 'src/core/ext/filters/client_channel/channel_connectivity.c', diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index c1d68d273f5..30fef6fc0a5 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9095,17 +9095,15 @@ "deps": [ "gpr", "grpc_base", - "grpc_trace" + "grpc_trace", + "tsi_interface" ], "headers": [ "src/core/tsi/fake_transport_security.h", "src/core/tsi/gts_transport_security.h", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", - "src/core/tsi/transport_security.h", - "src/core/tsi/transport_security_adapter.h", - "src/core/tsi/transport_security_grpc.h", - "src/core/tsi/transport_security_interface.h" + "src/core/tsi/transport_security_grpc.h" ], "is_filegroup": true, "language": "c", @@ -9118,12 +9116,30 @@ "src/core/tsi/ssl_transport_security.c", "src/core/tsi/ssl_transport_security.h", "src/core/tsi/ssl_types.h", + "src/core/tsi/transport_security_grpc.c", + "src/core/tsi/transport_security_grpc.h" + ], + "third_party": false, + "type": "filegroup" + }, + { + "deps": [ + "gpr", + "grpc_trace" + ], + "headers": [ + "src/core/tsi/transport_security.h", + "src/core/tsi/transport_security_adapter.h", + "src/core/tsi/transport_security_interface.h" + ], + "is_filegroup": true, + "language": "c", + "name": "tsi_interface", + "src": [ "src/core/tsi/transport_security.c", "src/core/tsi/transport_security.h", "src/core/tsi/transport_security_adapter.c", "src/core/tsi/transport_security_adapter.h", - "src/core/tsi/transport_security_grpc.c", - "src/core/tsi/transport_security_grpc.h", "src/core/tsi/transport_security_interface.h" ], "third_party": false, diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 6eab48c62b4..8a659280a46 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -347,9 +347,9 @@ + - @@ -893,12 +893,12 @@ + + - - diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index eaae1c6693a..6879047f55b 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -556,13 +556,13 @@ src\core\tsi - + src\core\tsi - + src\core\tsi - + src\core\tsi @@ -1013,13 +1013,13 @@ src\core\tsi - + src\core\tsi - + src\core\tsi - + src\core\tsi From 309c9de041c0c0d492bbbe3bf4dd5a46830a77b1 Mon Sep 17 00:00:00 2001 From: Yuchen Zeng Date: Wed, 9 Aug 2017 16:12:57 -0700 Subject: [PATCH 13/17] Update credentials_test --- test/core/security/credentials_test.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index e60e398767e..441c4311353 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -311,6 +311,7 @@ typedef struct { grpc_credentials_mdelem_array md_array; grpc_closure on_request_metadata; grpc_call_credentials *creds; + grpc_polling_entity pollent; } request_metadata_state; static void check_metadata(const expected_md *expected, @@ -355,6 +356,8 @@ static void check_request_metadata(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(state->md_array.size == state->expected_size); check_metadata(state->expected, &state->md_array); grpc_credentials_mdelem_array_destroy(exec_ctx, &state->md_array); + grpc_pollset_set_destroy(exec_ctx, + grpc_polling_entity_pollset_set(&state->pollent)); gpr_free(state); } @@ -365,6 +368,8 @@ static request_metadata_state *make_request_metadata_state( state->expected_error = expected_error; state->expected = expected; state->expected_size = expected_size; + state->pollent = + grpc_polling_entity_create_from_pollset_set(grpc_pollset_set_create()); GRPC_CLOSURE_INIT(&state->on_request_metadata, check_request_metadata, state, grpc_schedule_on_exec_ctx); return state; @@ -376,7 +381,7 @@ static void run_request_metadata_test(grpc_exec_ctx *exec_ctx, request_metadata_state *state) { grpc_error *error = GRPC_ERROR_NONE; if (grpc_call_credentials_get_request_metadata( - exec_ctx, creds, NULL, auth_md_ctx, &state->md_array, + exec_ctx, creds, &state->pollent, auth_md_ctx, &state->md_array, &state->on_request_metadata, &error)) { // Synchronous result. Invoke the callback directly. check_request_metadata(exec_ctx, state, error); From e1418e4ab10344469bc975c32f340f07a3becb45 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 9 Aug 2017 22:03:16 -0700 Subject: [PATCH 14/17] Switch to idiomatic Slice API --- test/cpp/qps/server_async.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 122976c397f..8b00bcfeaf5 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -550,8 +550,7 @@ static Status ProcessGenericRPC(const PayloadConfig &payload_config, ByteBuffer *response) { int resp_size = payload_config.bytebuf_params().resp_size(); std::unique_ptr buf(new char[resp_size]); - grpc_slice s = grpc_slice_from_copied_buffer(buf.get(), resp_size); - Slice slice(s, Slice::STEAL_REF); + Slice slice(buf.get(), resp_size); *response = ByteBuffer(&slice, 1); return Status::OK; } From bd34477e6ea1ca1d320c1921d3d90f220c89e2b4 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Wed, 9 Aug 2017 23:00:47 -0700 Subject: [PATCH 15/17] Address PR feedback --- src/core/lib/iomgr/ev_poll_posix.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 73e9c5203d4..9472a8e5208 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -265,9 +265,9 @@ typedef struct poll_args { struct poll_args *prev; } poll_args; -// This is a 2-tiered cash, we mantain a hash table +// This is a 2-tiered cache, we mantain a hash table // of active poll calls, so we can wait on the result -// of that call. We also maintain a freelist of innactive +// of that call. We also maintain a freelist of inactive // poll threads. typedef struct poll_hash_table { poll_args *free_pollers; @@ -1315,7 +1315,7 @@ static void cache_insert_locked(poll_args *args) { poll_cache.count++; } -void init_result(poll_args *pargs) { +static void init_result(poll_args *pargs) { pargs->result = gpr_malloc(sizeof(poll_result)); gpr_ref_init(&pargs->result->refcount, 1); pargs->result->watchers = NULL; From f602869c9719d7de1cd09ef1f9c140df1ef15eef Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 10 Aug 2017 10:14:39 +0200 Subject: [PATCH 16/17] improve comment --- src/csharp/Grpc.Core/GrpcEnvironment.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index cf5e47e3d56..cbc7d2078c3 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -239,6 +239,7 @@ namespace Grpc.Core /// /// Occurs when GrpcEnvironment is about the start the shutdown logic. + /// If GrpcEnvironment is later initialized and shutdown, the event will be fired again (unless unregistered first). /// public static event EventHandler ShuttingDown; From 7fb682f3bd1d41b437a31f05b5e26b8e273ddf40 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 10 Aug 2017 13:39:35 +0200 Subject: [PATCH 17/17] add windows portability build_only job --- ...c_portability_master.cfg => grpc_portability_build_only.cfg} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename tools/internal_ci/windows/{grpc_portability_master.cfg => grpc_portability_build_only.cfg} (93%) diff --git a/tools/internal_ci/windows/grpc_portability_master.cfg b/tools/internal_ci/windows/grpc_portability_build_only.cfg similarity index 93% rename from tools/internal_ci/windows/grpc_portability_master.cfg rename to tools/internal_ci/windows/grpc_portability_build_only.cfg index c395cb4a949..b2b58ece2d7 100644 --- a/tools/internal_ci/windows/grpc_portability_master.cfg +++ b/tools/internal_ci/windows/grpc_portability_build_only.cfg @@ -26,5 +26,5 @@ action { env_vars { key: "RUN_TESTS_FLAGS" - value: "-f portability windows -j 1 --inner_jobs 8 --internal_ci" + value: "-f portability windows --internal_ci --build_only" }