From df7ad5f91c0a32356e7648f88091e55e04a6f8a1 Mon Sep 17 00:00:00 2001 From: Juanli Shen Date: Mon, 19 Aug 2019 14:58:01 -0700 Subject: [PATCH 1/7] Repo stats tracking --- tools/gcp/github_stats_tracking/app.yaml | 13 +++ .../github_stats_tracking/appengine_config.py | 19 ++++ tools/gcp/github_stats_tracking/cron.yaml | 4 + tools/gcp/github_stats_tracking/fetch_data.py | 94 +++++++++++++++++++ tools/gcp/github_stats_tracking/main.py | 29 ++++++ 5 files changed, 159 insertions(+) create mode 100644 tools/gcp/github_stats_tracking/app.yaml create mode 100644 tools/gcp/github_stats_tracking/appengine_config.py create mode 100644 tools/gcp/github_stats_tracking/cron.yaml create mode 100644 tools/gcp/github_stats_tracking/fetch_data.py create mode 100644 tools/gcp/github_stats_tracking/main.py diff --git a/tools/gcp/github_stats_tracking/app.yaml b/tools/gcp/github_stats_tracking/app.yaml new file mode 100644 index 00000000000..b0fa5573649 --- /dev/null +++ b/tools/gcp/github_stats_tracking/app.yaml @@ -0,0 +1,13 @@ +runtime: python27 +api_version: 1 +threadsafe: true + +service: github-stats-tracking + +handlers: +- url: /.* + script: main.app + +libraries: +- name: ssl + version: latest diff --git a/tools/gcp/github_stats_tracking/appengine_config.py b/tools/gcp/github_stats_tracking/appengine_config.py new file mode 100644 index 00000000000..086be2aefff --- /dev/null +++ b/tools/gcp/github_stats_tracking/appengine_config.py @@ -0,0 +1,19 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# appengine_config.py +from google.appengine.ext import vendor + +# Add any libraries install in the "lib" folder. +vendor.add('lib') diff --git a/tools/gcp/github_stats_tracking/cron.yaml b/tools/gcp/github_stats_tracking/cron.yaml new file mode 100644 index 00000000000..b5b36be92c6 --- /dev/null +++ b/tools/gcp/github_stats_tracking/cron.yaml @@ -0,0 +1,4 @@ +cron: +- description: "daily github stats tracking job" + url: /daily + schedule: every 24 hours diff --git a/tools/gcp/github_stats_tracking/fetch_data.py b/tools/gcp/github_stats_tracking/fetch_data.py new file mode 100644 index 00000000000..ed183a15a25 --- /dev/null +++ b/tools/gcp/github_stats_tracking/fetch_data.py @@ -0,0 +1,94 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from github import Github, Label +from datetime import datetime, timedelta +from time import time +from google.cloud import bigquery + +ACCESS_TOKEN = "" + + +def get_stats_from_github(): + # Please set the access token properly before deploying. + assert ACCESS_TOKEN + g = Github(ACCESS_TOKEN) + print g.rate_limiting + repo = g.get_repo('grpc/grpc') + + LABEL_LANG = set(label for label in repo.get_labels() + if label.name.split('/')[0] == 'lang') + LABEL_KIND_BUG = repo.get_label('kind/bug') + LABEL_PRIORITY_P0 = repo.get_label('priority/P0') + LABEL_PRIORITY_P1 = repo.get_label('priority/P1') + LABEL_PRIORITY_P2 = repo.get_label('priority/P2') + + def is_untriaged(issue): + key_labels = set() + for label in issue.labels: + label_kind = label.name.split('/')[0] + if label_kind in ('lang', 'kind', 'priority'): + key_labels.add(label_kind) + return len(key_labels) < 3 + + untriaged_open_issues = [ + issue for issue in repo.get_issues(state='open') + if issue.pull_request is None and is_untriaged(issue) + ] + total_bugs = [ + issue + for issue in repo.get_issues(state='all', labels=[LABEL_KIND_BUG]) + if issue.pull_request is None + ] + + lang_to_stats = {} + for lang in LABEL_LANG: + lang_bugs = filter(lambda bug: lang in bug.labels, total_bugs) + closed_bugs = filter(lambda bug: bug.state == 'closed', lang_bugs) + open_bugs = filter(lambda bug: bug.state == 'open', lang_bugs) + open_p0_bugs = filter(lambda bug: LABEL_PRIORITY_P0 in bug.labels, + open_bugs) + open_p1_bugs = filter(lambda bug: LABEL_PRIORITY_P1 in bug.labels, + open_bugs) + open_p2_bugs = filter(lambda bug: LABEL_PRIORITY_P2 in bug.labels, + open_bugs) + lang_to_stats[lang] = [ + len(lang_bugs), + len(closed_bugs), + len(open_bugs), + len(open_p0_bugs), + len(open_p1_bugs), + len(open_p2_bugs) + ] + return len(untriaged_open_issues), lang_to_stats + + +def insert_stats_to_db(untriaged_open_issues, lang_to_stats): + timestamp = time() + client = bigquery.Client() + dataset_ref = client.dataset('github_issues') + table_ref = dataset_ref.table('untriaged_issues') + table = client.get_table(table_ref) + errors = client.insert_rows(table, [(timestamp, untriaged_open_issues)]) + table_ref = dataset_ref.table('bug_stats') + table = client.get_table(table_ref) + rows = [] + for lang, stats in lang_to_stats.iteritems(): + rows.append((timestamp, lang.name[5:]) + tuple(stats)) + errors = client.insert_rows(table, rows) + + +def fetch(): + untriaged_open_issues, lang_to_stats = get_stats_from_github() + insert_stats_to_db(untriaged_open_issues, lang_to_stats) diff --git a/tools/gcp/github_stats_tracking/main.py b/tools/gcp/github_stats_tracking/main.py new file mode 100644 index 00000000000..f1e7ca6d981 --- /dev/null +++ b/tools/gcp/github_stats_tracking/main.py @@ -0,0 +1,29 @@ +# Copyright 2019 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import webapp2 +from fetch_data import fetch + + +class DailyCron(webapp2.RequestHandler): + + def get(self): + fetch() + self.response.status = 204 + + +app = webapp2.WSGIApplication( + [ + ('/daily', DailyCron), + ], debug=True) From 257737f2c63b85ec760e0375f430dccb6e71420d Mon Sep 17 00:00:00 2001 From: Hope Casey-Allen Date: Tue, 20 Aug 2019 11:18:45 -0700 Subject: [PATCH 2/7] Fix warning in client_idle_filter.cc to support gcc8 --- src/core/ext/filters/client_idle/client_idle_filter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/filters/client_idle/client_idle_filter.cc b/src/core/ext/filters/client_idle/client_idle_filter.cc index 13c35ae3730..0e598b0649a 100644 --- a/src/core/ext/filters/client_idle/client_idle_filter.cc +++ b/src/core/ext/filters/client_idle/client_idle_filter.cc @@ -370,7 +370,7 @@ void ChannelData::EnterIdle() { // Hold a ref to the channel stack for the transport op. GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op"); // Initialize the transport op. - memset(&idle_transport_op_, 0, sizeof(idle_transport_op_)); + idle_transport_op_ = {}; idle_transport_op_.disconnect_with_error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"), GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE); From 5bf71fa4b77083bf4c48155bcddaf73e33b5b650 Mon Sep 17 00:00:00 2001 From: Hope Casey-Allen Date: Tue, 20 Aug 2019 11:50:58 -0700 Subject: [PATCH 3/7] Add a developer trick to the installation doc, reorganize a bit --- BUILDING.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/BUILDING.md b/BUILDING.md index 615b371db6e..e4269a25956 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -14,6 +14,7 @@ If you plan to build from source and run tests, install the following as well: $ [sudo] apt-get install libgflags-dev libgtest-dev $ [sudo] apt-get install clang libc++-dev ``` +Lastly, see the Protoc section below if you do not yet have the protoc compiler installed. ## MacOS @@ -46,6 +47,7 @@ installed by `brew` is being used: ```sh $ LIBTOOL=glibtool LIBTOOLIZE=glibtoolize make ``` +Lastly, see the Protoc section below if you do not yet have the protoc compiler. ## Windows @@ -112,6 +114,12 @@ From the grpc repository root ```sh $ make ``` +NOTE: if you get an error on linux such as 'aclocal-1.15: command not found', which can happen if you ran 'make' before installing the pre-reqs, try the following: +```sh +$ git clean -f -d -x && git submodule foreach --recursive git clean -f -d -x +$ [sudo] apt-get install build-essential autoconf libtool pkg-config +$ make +``` ## bazel From 09a270d6ad5632538b4f0e11a9cda9898b3491d1 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 20 Aug 2019 13:38:55 -0700 Subject: [PATCH 4/7] Gracefully handle errors from callbacks. In https://github.com/grpc/grpc/issues/19910, it was pointed out that raising an exception from a Future callback would cause the channel spin thread to terminate. If there are outstanding events on the channel, this will cause calls to Channel.close() to block indefinitely. This commit ensures that the channel spin thread does not die. Instead, exceptions will be logged at ERROR level. --- src/python/grpcio/grpc/__init__.py | 3 + src/python/grpcio/grpc/_channel.py | 11 ++- .../tests/unit/_channel_close_test.py | 67 ++++++++++++++++--- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 7dae90c89e8..e14db7906da 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -192,6 +192,9 @@ class Future(six.with_metaclass(abc.ABCMeta)): If the computation has already completed, the callback will be called immediately. + Exceptions raised in the callback will be logged at ERROR level, but + will not terminate any threads of execution. + Args: fn: A callable taking this Future object as its single parameter. """ diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 0bf8e03b5ce..b19c64d3a6e 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -159,7 +159,14 @@ def _event_handler(state, response_deserializer): state.condition.notify_all() done = not state.due for callback in callbacks: - callback() + # TODO(gnossen): Are these *only* user callbacks? + try: + callback() + except Exception as e: # pylint: disable=broad-except + # NOTE(rbellevi): We suppress but log errors here so as not to + # kill the channel spin thread. + logging.error('Exception in callback %s: %s', repr( + callback.func), repr(e)) return done and state.fork_epoch >= cygrpc.get_fork_epoch() return handle_event @@ -338,7 +345,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): # pylint: disable=too def add_done_callback(self, fn): with self._state.condition: if self._state.code is None: - self._state.callbacks.append(lambda: fn(self)) + self._state.callbacks.append(functools.partial(fn, self)) return fn(self) diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py index 82fa1657109..571504c6e3f 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_close_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -27,8 +27,11 @@ _BEAT = 0.5 _SOME_TIME = 5 _MORE_TIME = 10 +_STREAM_URI = 'Meffod' +_UNARY_URI = 'MeffodMan' -class _MethodHandler(grpc.RpcMethodHandler): + +class _StreamingMethodHandler(grpc.RpcMethodHandler): request_streaming = True response_streaming = True @@ -40,13 +43,28 @@ class _MethodHandler(grpc.RpcMethodHandler): yield request * 2 -_METHOD_HANDLER = _MethodHandler() +class _UnaryMethodHandler(grpc.RpcMethodHandler): + + request_streaming = False + response_streaming = False + request_deserializer = None + response_serializer = None + + def unary_unary(self, request, servicer_context): + return request * 2 + + +_STREAMING_METHOD_HANDLER = _StreamingMethodHandler() +_UNARY_METHOD_HANDLER = _UnaryMethodHandler() class _GenericHandler(grpc.GenericRpcHandler): def service(self, handler_call_details): - return _METHOD_HANDLER + if handler_call_details.method == _STREAM_URI: + return _STREAMING_METHOD_HANDLER + else: + return _UNARY_METHOD_HANDLER _GENERIC_HANDLER = _GenericHandler() @@ -94,6 +112,24 @@ class _Pipe(object): self.close() +class EndlessIterator(object): + + def __init__(self, msg): + self._msg = msg + + def __iter__(self): + return self + + def _next(self): + return self._msg + + def __next__(self): + return self._next() + + def next(self): + return self._next() + + class ChannelCloseTest(unittest.TestCase): def setUp(self): @@ -108,7 +144,7 @@ class ChannelCloseTest(unittest.TestCase): def test_close_immediately_after_call_invocation(self): channel = grpc.insecure_channel('localhost:{}'.format(self._port)) - multi_callable = channel.stream_stream('Meffod') + multi_callable = channel.stream_stream(_STREAM_URI) request_iterator = _Pipe(()) response_iterator = multi_callable(request_iterator) channel.close() @@ -118,7 +154,7 @@ class ChannelCloseTest(unittest.TestCase): def test_close_while_call_active(self): channel = grpc.insecure_channel('localhost:{}'.format(self._port)) - multi_callable = channel.stream_stream('Meffod') + multi_callable = channel.stream_stream(_STREAM_URI) request_iterator = _Pipe((b'abc',)) response_iterator = multi_callable(request_iterator) next(response_iterator) @@ -130,7 +166,7 @@ class ChannelCloseTest(unittest.TestCase): def test_context_manager_close_while_call_active(self): with grpc.insecure_channel('localhost:{}'.format( self._port)) as channel: # pylint: disable=bad-continuation - multi_callable = channel.stream_stream('Meffod') + multi_callable = channel.stream_stream(_STREAM_URI) request_iterator = _Pipe((b'abc',)) response_iterator = multi_callable(request_iterator) next(response_iterator) @@ -141,7 +177,7 @@ class ChannelCloseTest(unittest.TestCase): def test_context_manager_close_while_many_calls_active(self): with grpc.insecure_channel('localhost:{}'.format( self._port)) as channel: # pylint: disable=bad-continuation - multi_callable = channel.stream_stream('Meffod') + multi_callable = channel.stream_stream(_STREAM_URI) request_iterators = tuple( _Pipe((b'abc',)) for _ in range(test_constants.THREAD_CONCURRENCY)) @@ -158,7 +194,7 @@ class ChannelCloseTest(unittest.TestCase): def test_many_concurrent_closes(self): channel = grpc.insecure_channel('localhost:{}'.format(self._port)) - multi_callable = channel.stream_stream('Meffod') + multi_callable = channel.stream_stream(_STREAM_URI) request_iterator = _Pipe((b'abc',)) response_iterator = multi_callable(request_iterator) next(response_iterator) @@ -181,6 +217,21 @@ class ChannelCloseTest(unittest.TestCase): self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + def test_exception_in_callback(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: + stream_multi_callable = channel.stream_stream(_STREAM_URI) + request_iterator = (str(i).encode('ascii') for i in range(9999)) + endless_iterator = EndlessIterator(b'abc') + stream_response_iterator = stream_multi_callable(endless_iterator) + future = channel.unary_unary(_UNARY_URI).future(b'abc') + + def on_done_callback(future): + raise Exception("This should not cause a deadlock.") + + future.add_done_callback(on_done_callback) + future.result() + if __name__ == '__main__': logging.basicConfig() From ee99f9aa4c45e9d29c7649ac5a4e0690c26534f8 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 20 Aug 2019 14:25:14 -0700 Subject: [PATCH 5/7] Remove TODO --- src/python/grpcio/grpc/_channel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index b19c64d3a6e..f1584940a86 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -159,7 +159,6 @@ def _event_handler(state, response_deserializer): state.condition.notify_all() done = not state.due for callback in callbacks: - # TODO(gnossen): Are these *only* user callbacks? try: callback() except Exception as e: # pylint: disable=broad-except From d273fdf41d557f126fe227a0c8858397c0effca7 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 20 Aug 2019 14:26:22 -0700 Subject: [PATCH 6/7] Remove line of dead code --- src/python/grpcio_tests/tests/unit/_channel_close_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py index 571504c6e3f..6004c25cc30 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_close_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -221,7 +221,6 @@ class ChannelCloseTest(unittest.TestCase): with grpc.insecure_channel('localhost:{}'.format( self._port)) as channel: stream_multi_callable = channel.stream_stream(_STREAM_URI) - request_iterator = (str(i).encode('ascii') for i in range(9999)) endless_iterator = EndlessIterator(b'abc') stream_response_iterator = stream_multi_callable(endless_iterator) future = channel.unary_unary(_UNARY_URI).future(b'abc') From 662919cf901287618c5d56f2a216cb386607d831 Mon Sep 17 00:00:00 2001 From: Richard Belleville Date: Tue, 20 Aug 2019 14:49:57 -0700 Subject: [PATCH 7/7] Simplify with itertools --- .../tests/unit/_channel_close_test.py | 21 ++----------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py index 6004c25cc30..47f52b4890e 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_close_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests server and client side compression.""" +import itertools import logging import threading import time @@ -112,24 +113,6 @@ class _Pipe(object): self.close() -class EndlessIterator(object): - - def __init__(self, msg): - self._msg = msg - - def __iter__(self): - return self - - def _next(self): - return self._msg - - def __next__(self): - return self._next() - - def next(self): - return self._next() - - class ChannelCloseTest(unittest.TestCase): def setUp(self): @@ -221,7 +204,7 @@ class ChannelCloseTest(unittest.TestCase): with grpc.insecure_channel('localhost:{}'.format( self._port)) as channel: stream_multi_callable = channel.stream_stream(_STREAM_URI) - endless_iterator = EndlessIterator(b'abc') + endless_iterator = itertools.repeat(b'abc') stream_response_iterator = stream_multi_callable(endless_iterator) future = channel.unary_unary(_UNARY_URI).future(b'abc')