From ccdea1900fdad3d507617c8b1b639c7f5914d06b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 16 Feb 2016 08:06:46 -0800 Subject: [PATCH 1/9] Separate timer checking from pollsets --- BUILD | 3 - build.yaml | 1 - gRPC.podspec | 2 - grpc.gemspec | 1 - package.json | 1 - src/core/iomgr/iocp_windows.c | 2 +- src/core/iomgr/iomgr.c | 2 +- src/core/iomgr/pollset_posix.c | 11 ---- src/core/iomgr/pollset_windows.c | 4 -- src/core/iomgr/timer.c | 1 - src/core/iomgr/timer.h | 20 ++++++ src/core/iomgr/timer_internal.h | 61 ------------------- src/core/surface/completion_queue.c | 28 ++++++++- test/core/iomgr/timer_list_test.c | 1 - tools/doxygen/Doxyfile.core.internal | 1 - tools/run_tests/sources_and_headers.json | 4 -- vsprojects/vcxproj/grpc/grpc.vcxproj | 1 - vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 3 - .../grpc_unsecure/grpc_unsecure.vcxproj | 1 - .../grpc_unsecure.vcxproj.filters | 3 - 20 files changed, 48 insertions(+), 103 deletions(-) delete mode 100644 src/core/iomgr/timer_internal.h diff --git a/BUILD b/BUILD index 72d5caa8d43..24e3e540661 100644 --- a/BUILD +++ b/BUILD @@ -231,7 +231,6 @@ cc_library( "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", @@ -533,7 +532,6 @@ cc_library( "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", @@ -1490,7 +1488,6 @@ objc_library( "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", diff --git a/build.yaml b/build.yaml index 7f33ef3f0e5..f8fc4883832 100644 --- a/build.yaml +++ b/build.yaml @@ -307,7 +307,6 @@ filegroups: - src/core/iomgr/time_averaged_stats.h - src/core/iomgr/timer.h - src/core/iomgr/timer_heap.h - - src/core/iomgr/timer_internal.h - src/core/iomgr/udp_server.h - src/core/iomgr/wakeup_fd_pipe.h - src/core/iomgr/wakeup_fd_posix.h diff --git a/gRPC.podspec b/gRPC.podspec index 5b4d24e4820..13c303a8c7b 100644 --- a/gRPC.podspec +++ b/gRPC.podspec @@ -235,7 +235,6 @@ Pod::Spec.new do |s| 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/timer.h', 'src/core/iomgr/timer_heap.h', - 'src/core/iomgr/timer_internal.h', 'src/core/iomgr/udp_server.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', @@ -541,7 +540,6 @@ Pod::Spec.new do |s| 'src/core/iomgr/time_averaged_stats.h', 'src/core/iomgr/timer.h', 'src/core/iomgr/timer_heap.h', - 'src/core/iomgr/timer_internal.h', 'src/core/iomgr/udp_server.h', 'src/core/iomgr/wakeup_fd_pipe.h', 'src/core/iomgr/wakeup_fd_posix.h', diff --git a/grpc.gemspec b/grpc.gemspec index 32fe4932a9f..4485b440d6a 100755 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -231,7 +231,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/iomgr/time_averaged_stats.h ) s.files += %w( src/core/iomgr/timer.h ) s.files += %w( src/core/iomgr/timer_heap.h ) - s.files += %w( src/core/iomgr/timer_internal.h ) s.files += %w( src/core/iomgr/udp_server.h ) s.files += %w( src/core/iomgr/wakeup_fd_pipe.h ) s.files += %w( src/core/iomgr/wakeup_fd_posix.h ) diff --git a/package.json b/package.json index 8cbfb290559..3042c91680e 100644 --- a/package.json +++ b/package.json @@ -176,7 +176,6 @@ "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 759340e00ef..807729708ea 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -42,7 +42,7 @@ #include #include -#include "src/core/iomgr/timer_internal.h" +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_windows.h" diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 212ce5534dd..3283b586b06 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,7 +43,7 @@ #include #include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/timer_internal.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" static gpr_mu g_mu; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 19ee6650f00..1063727248a 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,7 +42,6 @@ #include #include -#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/fd_posix.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_utils_posix.h" @@ -274,16 +273,6 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); goto done; } - /* Check alarms - these are a global resource so we just ping - each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ - if (grpc_timer_check(exec_ctx, now, &deadline)) { - GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0); - gpr_mu_unlock(&pollset->mu); - locked = 0; - goto done; - } /* If we're shutting down then we don't execute any extended work */ if (pollset->shutting_down) { GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 02c66783631..35a956b27fd 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -38,7 +38,6 @@ #include #include -#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" @@ -136,9 +135,6 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker->kicked = 0; worker->pollset = pollset; gpr_cv_init(&worker->cv); - if (grpc_timer_check(exec_ctx, now, &deadline)) { - goto done; - } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index a33d8f63a05..5e7fadb7906 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -34,7 +34,6 @@ #include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer_heap.h" -#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/time_averaged_stats.h" #include #include diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 720c9d5ab94..2f74b6e5d3d 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -86,4 +86,24 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, Requires: cancel() must happen after add() on a given timer */ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); +/* iomgr internal api for dealing with timers */ + +/* Check for timers to be run, and run them. + Return non zero if timer callbacks were executed. + Drops drop_mu if it is non-null before executing callbacks. + If next is non-null, TRY to update *next with the next running timer + IF that timer occurs before *next current value. + *next is never guaranteed to be updated on any given execution; however, + with high probability at least one thread in the system will see an update + at any time slice. */ + +int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, + gpr_timespec* next); +void grpc_timer_list_init(gpr_timespec now); +void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); + +/* the following must be implemented by each iomgr implementation */ + +void grpc_kick_poller(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_H */ diff --git a/src/core/iomgr/timer_internal.h b/src/core/iomgr/timer_internal.h deleted file mode 100644 index f182e737646..00000000000 --- a/src/core/iomgr/timer_internal.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H -#define GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H - -#include "src/core/iomgr/exec_ctx.h" -#include -#include - -/* iomgr internal api for dealing with timers */ - -/* Check for timers to be run, and run them. - Return non zero if timer callbacks were executed. - Drops drop_mu if it is non-null before executing callbacks. - If next is non-null, TRY to update *next with the next running timer - IF that timer occurs before *next current value. - *next is never guaranteed to be updated on any given execution; however, - with high probability at least one thread in the system will see an update - at any time slice. */ - -int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, - gpr_timespec* next); -void grpc_timer_list_init(gpr_timespec now); -void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); - -/* the following must be implemented by each iomgr implementation */ - -void grpc_kick_poller(void); - -#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 75298eb795c..6597c83cdca 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -323,7 +323,19 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } first_loop = 0; - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline); + /* Check alarms - these are a global resource so we just ping + each time through on every pollset. + May update deadline to ensure timely wakeups. + TODO(ctiller): can this work be localized? */ + gpr_timespec iteration_deadline = deadline; + if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { + GPR_TIMER_MARK("alarm_triggered", 0); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_exec_ctx_flush(&exec_ctx); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + continue; + } + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -427,7 +439,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, break; } first_loop = 0; - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline); + /* Check alarms - these are a global resource so we just ping + each time through on every pollset. + May update deadline to ensure timely wakeups. + TODO(ctiller): can this work be localized? */ + gpr_timespec iteration_deadline = deadline; + if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { + GPR_TIMER_MARK("alarm_triggered", 0); + gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_exec_ctx_flush(&exec_ctx); + gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + continue; + } + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); del_plucker(cc, tag, &worker); } done: diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 15de87c5a1a..487527fbf5d 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -35,7 +35,6 @@ #include -#include "src/core/iomgr/timer_internal.h" #include #include "test/core/util/test_config.h" diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index b6268432335..ffc40dfc19b 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -850,7 +850,6 @@ src/core/iomgr/tcp_windows.h \ src/core/iomgr/time_averaged_stats.h \ src/core/iomgr/timer.h \ src/core/iomgr/timer_heap.h \ -src/core/iomgr/timer_internal.h \ src/core/iomgr/udp_server.h \ src/core/iomgr/wakeup_fd_pipe.h \ src/core/iomgr/wakeup_fd_posix.h \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 6538ddc37e1..fdba0417ca6 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -3032,7 +3032,6 @@ "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", @@ -3251,7 +3250,6 @@ "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.c", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.c", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_eventfd.c", @@ -3557,7 +3555,6 @@ "src/core/iomgr/time_averaged_stats.h", "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_pipe.h", "src/core/iomgr/wakeup_fd_posix.h", @@ -3760,7 +3757,6 @@ "src/core/iomgr/timer.h", "src/core/iomgr/timer_heap.c", "src/core/iomgr/timer_heap.h", - "src/core/iomgr/timer_internal.h", "src/core/iomgr/udp_server.c", "src/core/iomgr/udp_server.h", "src/core/iomgr/wakeup_fd_eventfd.c", diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj index 76975322be8..8e8f29aee99 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj @@ -359,7 +359,6 @@ - diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters index 4660572f979..e55c6673cea 100644 --- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters +++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters @@ -734,9 +734,6 @@ src\core\iomgr - - src\core\iomgr - src\core\iomgr diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj index 541000af404..af89435885c 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj @@ -335,7 +335,6 @@ - diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters index 48814f997e1..809ea59c5f4 100644 --- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters @@ -629,9 +629,6 @@ src\core\iomgr - - src\core\iomgr - src\core\iomgr From eb62c943389d4b34e8a246a8fe63783aed885a01 Mon Sep 17 00:00:00 2001 From: yang-g Date: Wed, 17 Feb 2016 15:37:25 -0800 Subject: [PATCH 2/9] Add a way to override channel arguments for server creation --- include/grpc++/server.h | 6 ++++-- src/cpp/server/server.cc | 14 ++++++-------- src/cpp/server/server_builder.cc | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/grpc++/server.h b/include/grpc++/server.h index 2a71073a7e2..c177805236b 100644 --- a/include/grpc++/server.h +++ b/include/grpc++/server.h @@ -79,6 +79,8 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary { class GlobalCallbacks { public: virtual ~GlobalCallbacks() {} + /// Called before server is created. + virtual void UpdateArguments(ChannelArguments* args) {} /// Called before application callback for each synchronous server request virtual void PreSynchronousRequest(ServerContext* context) = 0; /// Called after application callback for each synchronous server request @@ -108,7 +110,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary { /// \param max_message_size Maximum message length that the channel can /// receive. Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size, const ChannelArguments& args); + int max_message_size, ChannelArguments* args); /// Register a service. This call does not take ownership of the service. /// The service must exist for the lifetime of the Server instance. @@ -177,7 +179,7 @@ class Server GRPC_FINAL : public ServerInterface, private GrpcLibrary { bool has_generic_service_; // Pointer to the c grpc server. - grpc_server* const server_; + grpc_server* server_; ThreadPoolInterface* thread_pool_; // Whether the thread pool is created and owned by the server. diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 0d31140924b..6d31a608c80 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -272,27 +272,25 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -static grpc_server* CreateServer(const ChannelArguments& args) { - grpc_channel_args channel_args; - args.SetChannelArgs(&channel_args); - return grpc_server_create(&channel_args, nullptr); -} - static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size, const ChannelArguments& args) + int max_message_size, ChannelArguments* args) : max_message_size_(max_message_size), started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list), has_generic_service_(false), - server_(CreateServer(args)), + server_(nullptr), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; + global_callbacks_->UpdateArguments(args); + grpc_channel_args channel_args; + args->SetChannelArgs(&channel_args); + server_ = grpc_server_create(&channel_args, nullptr); grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c54cf6474f1..134e5f1d5ff 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -103,7 +103,7 @@ std::unique_ptr ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, compression_options_.enabled_algorithms_bitset); std::unique_ptr server( - new Server(thread_pool.release(), true, max_message_size_, args)); + new Server(thread_pool.release(), true, max_message_size_, &args)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); From 311445fd32956e9383823400c82ce3fcd71b2b31 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Feb 2016 07:31:39 -0800 Subject: [PATCH 3/9] Fix tcp_client_posix_test --- src/core/iomgr/timer.c | 4 ++-- src/core/iomgr/timer.h | 8 ++++---- src/core/surface/completion_queue.c | 6 ++++-- test/core/iomgr/tcp_client_posix_test.c | 26 +++++++++++++++++-------- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 5e7fadb7906..8379fffad02 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -335,8 +335,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, return (int)n; } -int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { +bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_timers( exec_ctx, now, next, diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 2f74b6e5d3d..906255ddfb9 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -89,7 +89,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); /* iomgr internal api for dealing with timers */ /* Check for timers to be run, and run them. - Return non zero if timer callbacks were executed. + Return true if timer callbacks were executed. Drops drop_mu if it is non-null before executing callbacks. If next is non-null, TRY to update *next with the next running timer IF that timer occurs before *next current value. @@ -97,10 +97,10 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); with high probability at least one thread in the system will see an update at any time slice. */ -int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, - gpr_timespec* next); +bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); -void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); +void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); /* the following must be implemented by each iomgr implementation */ diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 6597c83cdca..de295ab9410 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -335,7 +335,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); continue; } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + iteration_deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -451,7 +452,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); continue; } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, iteration_deadline); + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + iteration_deadline); del_plucker(cc, tag, &worker); } done: diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 9725d8a3b64..b57478059f0 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -45,6 +45,7 @@ #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/iomgr/timer.h" #include "test/core/util/test_config.h" static grpc_pollset_set g_pollset_set; @@ -125,11 +126,13 @@ void test_succeeds(void) { gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + + grpc_exec_ctx_finish(&exec_ctx); } void test_fails(void) { @@ -159,14 +162,18 @@ void test_fails(void) { /* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), test_deadline()); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec polling_deadline = test_deadline(); + if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(&exec_ctx); } void test_times_out(void) { @@ -243,15 +250,18 @@ void test_times_out(void) { GPR_ASSERT(g_connections_complete == connections_complete_before + is_after_deadline); } - grpc_pollset_work(&exec_ctx, &g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10); + if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); + } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(&exec_ctx); + close(svr_fd); for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { close(client_fd[i]); From cb6c7f66d3b08eab81c3d51de4e489fe24c63e4c Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Feb 2016 08:02:45 -0800 Subject: [PATCH 4/9] Fix test --- test/core/iomgr/timer_list_test.c | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/test/core/iomgr/timer_list_test.c b/test/core/iomgr/timer_list_test.c index 487527fbf5d..7a21fdd5c10 100644 --- a/test/core/iomgr/timer_list_test.c +++ b/test/core/iomgr/timer_list_test.c @@ -71,20 +71,19 @@ static void add_test(void) { } /* collect timers. Only the first batch should be ready. */ - GPR_ASSERT(10 == grpc_timer_check(&exec_ctx, - gpr_time_add(start, gpr_time_from_millis( - 500, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(grpc_timer_check( + &exec_ctx, gpr_time_add(start, gpr_time_from_millis(500, GPR_TIMESPAN)), + NULL)); grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 20; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); GPR_ASSERT(cb_called[i][0] == 0); } - GPR_ASSERT(0 == grpc_timer_check(&exec_ctx, - gpr_time_add(start, gpr_time_from_millis( - 600, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(!grpc_timer_check( + &exec_ctx, + gpr_time_add(start, gpr_time_from_millis(600, GPR_TIMESPAN)), + NULL)); grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); @@ -92,20 +91,19 @@ static void add_test(void) { } /* collect the rest of the timers */ - GPR_ASSERT(10 == grpc_timer_check( - &exec_ctx, gpr_time_add(start, gpr_time_from_millis( - 1500, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(grpc_timer_check( + &exec_ctx, gpr_time_add(start, gpr_time_from_millis(1500, GPR_TIMESPAN)), + NULL)); grpc_exec_ctx_finish(&exec_ctx); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); } - GPR_ASSERT(0 == grpc_timer_check(&exec_ctx, - gpr_time_add(start, gpr_time_from_millis( - 1600, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(!grpc_timer_check( + &exec_ctx, + gpr_time_add(start, gpr_time_from_millis(1600, GPR_TIMESPAN)), + NULL)); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); From f315bc198009eff04a53e48cec9b9506c85a2417 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 18 Feb 2016 08:25:15 -0800 Subject: [PATCH 5/9] Fix copyrights --- src/core/iomgr/iomgr.c | 2 +- src/core/iomgr/timer.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 3283b586b06..04580150f3a 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 906255ddfb9..9ad1e92f42e 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without From 23a329838588eb3dc7bcfee365007c5194288912 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sun, 21 Feb 2016 22:43:21 -0800 Subject: [PATCH 6/9] Fix plucking problem --- src/core/surface/completion_queue.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index de295ab9410..0a80680f02e 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -333,10 +333,10 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - continue; + } else { + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + iteration_deadline); } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, - iteration_deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); GRPC_CQ_INTERNAL_UNREF(cc, "next"); @@ -450,10 +450,10 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - continue; + } else { + grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + iteration_deadline); } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, - iteration_deadline); del_plucker(cc, tag, &worker); } done: From ad0df7bf1fa9a2ad6302016cfbe0b86793f810c5 Mon Sep 17 00:00:00 2001 From: yang-g Date: Mon, 22 Feb 2016 10:00:20 -0800 Subject: [PATCH 7/9] Discard the read buffer on stream error --- src/core/transport/chttp2_transport.c | 5 +++++ test/cpp/end2end/async_end2end_test.cc | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 617d98875c3..c3efc36cc5d 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -1019,6 +1019,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { + while (stream_global->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + grpc_byte_stream_destroy(exec_ctx, bs); + } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a15cbd7ee2e..9ca3bf98f85 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -989,6 +989,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { if (server_try_cancel == CANCEL_AFTER_PROCESSING) { ServerTryCancel(&srv_ctx); + + // Client reads may fail bacause it is notified that the stream is + // cancelled. + ignore_cq_result = true; } // Client attemts to read the three messages from the server From 276e32d0fbbfab490fd26dcbfb4e65c3c87f31ae Mon Sep 17 00:00:00 2001 From: yang-g Date: Mon, 22 Feb 2016 13:15:30 -0800 Subject: [PATCH 8/9] Fix race between add_writing_stalled and destroy stream --- src/core/transport/chttp2/internal.h | 9 +++++++-- src/core/transport/chttp2/stream_lists.c | 22 ++++++++++++++++++---- src/core/transport/chttp2/writing.c | 10 +++++----- src/core/transport/chttp2_transport.c | 2 +- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 0e1e2c42650..d76d31be23f 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -485,7 +485,8 @@ struct grpc_chttp2_stream { /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, +int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing, int is_parsing); void grpc_chttp2_perform_writes( @@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, bool is_window_available); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, + bool is_window_available); +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 2f31a47cb3d..b284c788183 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), + grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { + GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); + } + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, bool is_window_available) { grpc_chttp2_stream *stream; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); @@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( if (is_window_available) { grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); } else { - stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, + "chttp2_writing_stalled"); } } +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing) { + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); +} + int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index cafecf10465..356fd8174a7 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_unlocking_check_writes( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, int is_parsing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; @@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing, - is_window_available); + grpc_chttp2_list_flush_writing_stalled_by_transport( + exec_ctx, transport_writing, is_window_available); /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } else { - grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); } } if (stream_global->send_trailing_metadata) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 617d98875c3..89e64876af7 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_BEGIN("unlock", 0); if (!t->writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(&t->global, &t->writing, + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, t->parsing_active)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); From 5fc09525ed01c8f889247e61efe9a4d9fb8ef2b5 Mon Sep 17 00:00:00 2001 From: Kailash Sethuraman Date: Mon, 22 Feb 2016 17:28:12 -0800 Subject: [PATCH 9/9] Add initial set of SoC ideas --- summerofcode/ideas.md | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/summerofcode/ideas.md b/summerofcode/ideas.md index 073262339b1..b14d3f7b6ad 100644 --- a/summerofcode/ideas.md +++ b/summerofcode/ideas.md @@ -1,4 +1,22 @@ -Google Summer of Code 2016 gRPC Ideas -===================================== +# gRPC Summer of Code Project Ideas -(Skeleton for now.) +C Core: + +1. Port gRPC to one of (Free, Net, Open) BSD platforms and create packages for them. Add kqueue support in the process. +2. Fix gRPC C-core's URI parser. The current parser does not qualify as a standard parser according to [RFC3986]( https://tools.ietf.org/html/rfc3986). Write test suites to verify this and make changes necessary to make the URI parser compliant. +3. HPACK compression efficiency evaluation - Figure out how to benchmark gRPC's compression efficiency (both in terms of bytes on the wire and cpu cycles). Implement benchmarks. Potentially extend this to other standalone implementations -- Java and Go. + + +gRPC Python: + + 1. Evaluate the port of gRPC's Python implementation to PyPy. Investigate the state of [Cython support](http://docs.cython.org/src/userguide/pypy.html) to do this or potentially explore cffi + 2. Develop and test Python 3.5 Support for gRPC. Make necessary changes to port gRPC and package it for supported platforms. + +gRPC Ruby/Java: + +1. jRuby support for gRPC. Develop a jRuby wrapper for gRPC based on grpc-java and ensure that it is API compatible with the existing Ruby implementation and passes all tests. + + +Other: + +1. Develop a Wireshark plugin for the gRPC protocol. Provide documentation and tutorials for this plugin. Bonus: consider set-up and use with the mobile clients.