From 29f2cb868741967ed34fcb293451784f7effe2d8 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Sat, 21 May 2016 13:55:03 +0000 Subject: [PATCH 1/6] Add a Cython-level cancel-many-calls test --- src/python/grpcio/tests/tests.json | 1 + .../unit/_cython/_cancel_many_calls_test.py | 222 ++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 691062f25ac..1beb619f879 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -5,6 +5,7 @@ "_base_interface_test.SyncPeasyTest", "_beta_features_test.BetaFeaturesTest", "_beta_features_test.ContextManagementAndLifecycleTest", + "_cancel_many_calls_test.CancelManyCallsTest", "_channel_test.ChannelTest", "_connectivity_channel_test.ChannelConnectivityTest", "_core_over_links_base_interface_test.AsyncEasyTest", diff --git a/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py new file mode 100644 index 00000000000..c1de7790145 --- /dev/null +++ b/src/python/grpcio/tests/unit/_cython/_cancel_many_calls_test.py @@ -0,0 +1,222 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test making many calls and immediately cancelling most of them.""" + +import threading +import unittest + +from grpc._cython import cygrpc +from grpc.framework.foundation import logging_pool +from tests.unit.framework.common import test_constants + +_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) +_EMPTY_FLAGS = 0 +_EMPTY_METADATA = cygrpc.Metadata(()) + +_SERVER_SHUTDOWN_TAG = 'server_shutdown' +_REQUEST_CALL_TAG = 'request_call' +_RECEIVE_CLOSE_ON_SERVER_TAG = 'receive_close_on_server' +_RECEIVE_MESSAGE_TAG = 'receive_message' +_SERVER_COMPLETE_CALL_TAG = 'server_complete_call' + +_SUCCESS_CALL_FRACTION = 1.0 / 8.0 + + +class _State(object): + + def __init__(self): + self.condition = threading.Condition() + self.handlers_released = False + self.parked_handlers = 0 + self.handled_rpcs = 0 + + +def _is_cancellation_event(event): + return ( + event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and + event.batch_operations[0].received_cancelled) + + +class _Handler(object): + + def __init__(self, state, completion_queue, rpc_event): + self._state = state + self._lock = threading.Lock() + self._completion_queue = completion_queue + self._call = rpc_event.operation_call + + def __call__(self): + with self._state.condition: + self._state.parked_handlers += 1 + if self._state.parked_handlers == test_constants.THREAD_CONCURRENCY: + self._state.condition.notify_all() + while not self._state.handlers_released: + self._state.condition.wait() + + with self._lock: + self._call.start_batch( + cygrpc.Operations( + (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),)), + _RECEIVE_CLOSE_ON_SERVER_TAG) + self._call.start_batch( + cygrpc.Operations((cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + _RECEIVE_MESSAGE_TAG) + first_event = self._completion_queue.poll() + if _is_cancellation_event(first_event): + self._completion_queue.poll() + else: + with self._lock: + operations = ( + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', + _EMPTY_FLAGS), + ) + self._call.start_batch( + cygrpc.Operations(operations), _SERVER_COMPLETE_CALL_TAG) + self._completion_queue.poll() + self._completion_queue.poll() + + +def _serve(state, server, server_completion_queue, thread_pool): + for _ in range(test_constants.RPC_CONCURRENCY): + call_completion_queue = cygrpc.CompletionQueue() + server.request_call( + call_completion_queue, server_completion_queue, _REQUEST_CALL_TAG) + rpc_event = server_completion_queue.poll() + thread_pool.submit(_Handler(state, call_completion_queue, rpc_event)) + with state.condition: + state.handled_rpcs += 1 + if test_constants.RPC_CONCURRENCY <= state.handled_rpcs: + state.condition.notify_all() + server_completion_queue.poll() + + +class _QueueDriver(object): + + def __init__(self, condition, completion_queue, due): + self._condition = condition + self._completion_queue = completion_queue + self._due = due + self._events = [] + self._returned = False + + def start(self): + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events.append(event) + self._due.remove(event.tag) + self._condition.notify_all() + if not self._due: + self._returned = True + return + thread = threading.Thread(target=in_thread) + thread.start() + + def events(self, at_least): + with self._condition: + while len(self._events) < at_least: + self._condition.wait() + return tuple(self._events) + + +class CancelManyCallsTest(unittest.TestCase): + + def testCancelManyCalls(self): + server_thread_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + + server_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server() + server.register_completion_queue(server_completion_queue) + port = server.add_http2_port('[::]:0') + server.start() + channel = cygrpc.Channel('localhost:{}'.format(port)) + + state = _State() + + server_thread_args = ( + state, server, server_completion_queue, server_thread_pool,) + server_thread = threading.Thread(target=_serve, args=server_thread_args) + server_thread.start() + + client_condition = threading.Condition() + client_due = set() + client_completion_queue = cygrpc.CompletionQueue() + client_driver = _QueueDriver( + client_condition, client_completion_queue, client_due) + client_driver.start() + + with client_condition: + client_calls = [] + for index in range(test_constants.RPC_CONCURRENCY): + client_call = channel.create_call( + None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, + _INFINITE_FUTURE) + operations = ( + cygrpc.operation_send_initial_metadata( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), + cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + ) + tag = 'client_complete_call_{0:04d}_tag'.format(index) + client_call.start_batch(cygrpc.Operations(operations), tag) + client_due.add(tag) + client_calls.append(client_call) + + with state.condition: + while True: + if state.parked_handlers < test_constants.THREAD_CONCURRENCY: + state.condition.wait() + elif state.handled_rpcs < test_constants.RPC_CONCURRENCY: + state.condition.wait() + else: + state.handlers_released = True + state.condition.notify_all() + break + + client_driver.events( + test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) + with client_condition: + for client_call in client_calls: + client_call.cancel() + + with state.condition: + server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG) + + +if __name__ == '__main__': + unittest.main(verbosity=2) From de5039d0846dfd1b3dd46e2748b3a170868ef175 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 11:55:53 -0700 Subject: [PATCH 2/6] Fix sanity --- include/grpc/impl/codegen/log.h | 4 ---- src/core/lib/iomgr/error.c | 4 ++++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/grpc/impl/codegen/log.h b/include/grpc/impl/codegen/log.h index c3f30a91479..aa86fc4c179 100644 --- a/include/grpc/impl/codegen/log.h +++ b/include/grpc/impl/codegen/log.h @@ -43,10 +43,6 @@ extern "C" { #endif -#ifdef GPR_WIN32 -#include -#endif - /* GPR log API. Usage (within grpc): diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 6da0b005825..b38584100df 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -42,6 +42,10 @@ #include #include +#ifdef GPR_WIN32 +#include +#endif + static void destroy_integer(void *key) {} static void *copy_integer(void *key) { return key; } From 85861205dd22cafec2f7808dfbc14c88a193ff3f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:12:34 -0700 Subject: [PATCH 3/6] Fix sanity --- .../lib/security/credentials/composite/composite_credentials.h | 3 ++- src/core/lib/security/credentials/fake/fake_credentials.h | 2 +- .../credentials/google_default/google_default_credentials.h | 3 ++- src/core/lib/security/credentials/iam/iam_credentials.h | 2 +- src/core/lib/security/credentials/jwt/jwt_credentials.h | 2 +- src/core/lib/security/credentials/oauth2/oauth2_credentials.h | 2 +- src/core/lib/security/credentials/plugin/plugin_credentials.h | 2 +- src/core/lib/security/util/json_util.h | 2 +- 8 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/core/lib/security/credentials/composite/composite_credentials.h b/src/core/lib/security/credentials/composite/composite_credentials.h index 43974a96236..0d8966f464d 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.h +++ b/src/core/lib/security/credentials/composite/composite_credentials.h @@ -68,4 +68,5 @@ typedef struct { grpc_call_credentials_array inner; } grpc_composite_call_credentials; -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_COMPOSITE_COMPOSITE_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_COMPOSITE_COMPOSITE_CREDENTIALS_H \ + */ diff --git a/src/core/lib/security/credentials/fake/fake_credentials.h b/src/core/lib/security/credentials/fake/fake_credentials.h index adeba94bdb7..9cf38084a3d 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.h +++ b/src/core/lib/security/credentials/fake/fake_credentials.h @@ -53,4 +53,4 @@ typedef struct { int is_async; } grpc_md_only_test_credentials; -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_FAKE_FAKE_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_FAKE_FAKE_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.h b/src/core/lib/security/credentials/google_default/google_default_credentials.h index 0d14cbfdad4..fac4377e2c8 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.h +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.h @@ -42,4 +42,5 @@ void grpc_flush_cached_google_default_credentials(void); -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_GOOGLE_DEFAULT_GOOGLE_DEFAULT_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_GOOGLE_DEFAULT_GOOGLE_DEFAULT_CREDENTIALS_H \ + */ diff --git a/src/core/lib/security/credentials/iam/iam_credentials.h b/src/core/lib/security/credentials/iam/iam_credentials.h index e989bfc28e3..af54faa5868 100644 --- a/src/core/lib/security/credentials/iam/iam_credentials.h +++ b/src/core/lib/security/credentials/iam/iam_credentials.h @@ -41,4 +41,4 @@ typedef struct { grpc_credentials_md_store *iam_md; } grpc_google_iam_credentials; -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_IAM_IAM_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_IAM_IAM_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.h b/src/core/lib/security/credentials/jwt/jwt_credentials.h index dfa4a1ebd9d..d5726061794 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.h +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.h @@ -59,4 +59,4 @@ grpc_call_credentials * grpc_service_account_jwt_access_credentials_create_from_auth_json_key( grpc_auth_json_key key, gpr_timespec token_lifetime); -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_JWT_JWT_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_JWT_JWT_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h index be7bc81626b..7a56668d2af 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h @@ -106,4 +106,4 @@ grpc_oauth2_token_fetcher_credentials_parse_server_response( const struct grpc_http_response *response, grpc_credentials_md_store **token_md, gpr_timespec *token_lifetime); -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_OAUTH2_OAUTH2_CREDENTIALS_H */ diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.h b/src/core/lib/security/credentials/plugin/plugin_credentials.h index b5dd3ad5e2c..89073cb3d1b 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.h +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.h @@ -42,4 +42,4 @@ typedef struct { grpc_credentials_md_store *plugin_md; } grpc_plugin_credentials; -#endif // GRPC_CORE_LIB_SECURITY_CREDENTIALS_PLUGIN_PLUGIN_CREDENTIALS_H +#endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_PLUGIN_PLUGIN_CREDENTIALS_H */ diff --git a/src/core/lib/security/util/json_util.h b/src/core/lib/security/util/json_util.h index 5959626a5fe..137900593f6 100644 --- a/src/core/lib/security/util/json_util.h +++ b/src/core/lib/security/util/json_util.h @@ -52,4 +52,4 @@ const char *grpc_json_get_string_property(const grpc_json *json, bool grpc_copy_json_string_property(const grpc_json *json, const char *prop_name, char **copied_value); -#endif // GRPC_CORE_LIB_SECURITY_UTIL_JSON_UTIL_H +#endif /* GRPC_CORE_LIB_SECURITY_UTIL_JSON_UTIL_H */ From 081f1a3dbc321abf5d9c975730ccfb713985217d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:16:42 -0700 Subject: [PATCH 4/6] Update comments --- src/core/lib/iomgr/closure.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 344b7d3c3ea..08e59a168e1 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -53,8 +53,8 @@ typedef struct grpc_closure_list { /** gRPC Callback definition. * * \param arg Arbitrary input. - * \param success An indication on the state of the iomgr. On false, cleanup - * actions should be taken (eg, shutdown). */ + * \param error GRPC_ERROR_NONE if no error occurred, otherwise some grpc_error + * describing what went wrong */ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -66,8 +66,11 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; + /** Once queued, the result of the closure. Before then: scratch space */ grpc_error *error; + /** Once queued, next indicates the next queued closure; before then, scratch + * space */ union { grpc_closure *next; uintptr_t scratch; @@ -84,8 +87,8 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } -/** add \a closure to the end of \a list and set \a closure's success to \a - * success */ +/** add \a closure to the end of \a list + and set \a closure's result to \a error */ void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, grpc_error *error); From 332f1b35d534f6910093729e9ecc2cacf8bb9688 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:21:21 -0700 Subject: [PATCH 5/6] Rename functions --- src/core/ext/client_config/client_channel.c | 20 +++++------ src/core/ext/client_config/subchannel.c | 2 +- .../client_config/subchannel_call_holder.c | 4 +-- .../ext/lb_policy/pick_first/pick_first.c | 26 +++++++------- .../ext/lb_policy/round_robin/round_robin.c | 12 +++---- .../ext/resolver/dns/native/dns_resolver.c | 4 +-- .../ext/resolver/sockaddr/sockaddr_resolver.c | 4 +-- .../chttp2/server/insecure/server_chttp2.c | 2 +- .../chttp2/transport/chttp2_transport.c | 36 +++++++++---------- .../ext/transport/chttp2/transport/writing.c | 2 +- .../cronet/transport/cronet_transport.c | 4 +-- src/core/lib/http/httpcli.c | 2 +- src/core/lib/iomgr/ev_poll_posix.c | 10 +++--- src/core/lib/iomgr/exec_ctx.c | 2 +- src/core/lib/iomgr/exec_ctx.h | 6 ++-- src/core/lib/iomgr/pollset_windows.c | 4 +-- src/core/lib/iomgr/resolve_address_posix.c | 2 +- src/core/lib/iomgr/resolve_address_windows.c | 2 +- src/core/lib/iomgr/socket_windows.c | 4 +-- src/core/lib/iomgr/tcp_client_posix.c | 10 +++--- src/core/lib/iomgr/tcp_client_windows.c | 4 +-- src/core/lib/iomgr/tcp_posix.c | 8 ++--- src/core/lib/iomgr/tcp_server_posix.c | 2 +- src/core/lib/iomgr/tcp_server_windows.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 16 ++++----- src/core/lib/iomgr/timer.c | 8 ++--- src/core/lib/iomgr/workqueue.h | 4 +-- src/core/lib/iomgr/workqueue_posix.c | 2 +- .../lib/security/transport/secure_endpoint.c | 4 +-- .../security/transport/server_auth_filter.c | 6 ++-- src/core/lib/surface/call.c | 6 ++-- src/core/lib/surface/lame_client.c | 6 ++-- src/core/lib/surface/server.c | 22 ++++++------ src/core/lib/transport/connectivity_state.c | 10 +++--- src/core/lib/transport/transport.c | 8 ++--- test/core/end2end/fuzzers/api_fuzzer.c | 12 +++---- test/core/end2end/tests/filter_causes_close.c | 2 +- test/core/internal_api_canaries/iomgr.c | 2 +- test/core/iomgr/workqueue_test.c | 4 +-- test/core/security/credentials_test.c | 12 +++---- test/core/security/jwt_verifier_test.c | 10 +++--- test/core/util/mock_endpoint.c | 8 ++--- test/core/util/passthru_endpoint.c | 12 +++---- 43 files changed, 164 insertions(+), 164 deletions(-) diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index d00b9668122..7da998e5e24 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -277,7 +277,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -296,9 +296,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_exec_ctx_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing"), + NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -354,11 +354,11 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); } @@ -387,8 +387,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE("Pick cancelled"), NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -423,8 +423,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_push(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), + NULL); } gpr_mu_unlock(&chand->mu_config); return 0; diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index dd61caea94d..1e1b30c4c08 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -290,7 +290,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(subchannel_destroy, c), + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 07b7813482a..14022e90954 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -222,8 +222,8 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), + GRPC_ERROR_NONE, NULL); } static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index b5c2c426dea..deb3f519fa8 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -121,7 +121,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); pp = next; } @@ -140,8 +140,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -166,8 +166,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -306,16 +306,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_push(exec_ctx, - grpc_closure_create(destroy_subchannels, p), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, + grpc_closure_create(destroy_subchannels, p), + GRPC_ERROR_NONE, NULL); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -368,8 +368,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + NULL); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -421,8 +421,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), + NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 95305e9cdd3..76d9d83c9b3 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -239,7 +239,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } @@ -267,7 +267,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +293,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -412,7 +412,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -466,7 +466,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } @@ -521,7 +521,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_push(exec_ctx, closure, + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index adb9c3af843..e35aaba815f 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -111,7 +111,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } @@ -227,7 +227,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 90da6a93a01..1f7cce2f43a 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -92,7 +92,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -133,7 +133,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index bf4c817e2bf..c06d3eb60ae 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -75,7 +75,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5264b9e1f85..be517154e62 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -190,8 +190,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_push(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed"), NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -643,7 +643,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_exec_ctx_push(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } check_read_ops(exec_ctx, &t->global); @@ -907,7 +907,7 @@ void grpc_chttp2_complete_closure_step( stream_global->collecting_stats); stream_global->collecting_stats = NULL; } - grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); } *pclosure = NULL; } @@ -1127,7 +1127,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_push(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1160,7 +1160,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1234,8 +1234,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { @@ -1248,13 +1248,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } } @@ -1643,7 +1643,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_push(exec_ctx, &t->parsing_action, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { post_reading_action_locked(exec_ctx, t, s_unused, arg); } @@ -1870,10 +1870,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), - NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { bs->on_next = arg->on_complete; bs->next = arg->slice; @@ -1930,7 +1930,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { *bs->next = arg->slice; - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, arg->slice); @@ -1968,7 +1968,7 @@ static void incoming_byte_stream_finished_failed_locked( grpc_chttp2_incoming_byte_stream *bs = a->bs; grpc_error *error = a->error; gpr_free(a); - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ca56debc841..ed6536b3c71 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -187,7 +187,7 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_push(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index ea0f2bcba4c..c5318f092e0 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -155,11 +155,11 @@ static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void enqueue_callbacks(grpc_closure *callback_list[]) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (callback_list[0]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); callback_list[0] = NULL; } if (callback_list[1]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); callback_list[1] = NULL; } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 4bda734bfaa..bcd9c8c8213 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -102,7 +102,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); - grpc_exec_ctx_push(exec_ctx, req->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 52efb0d6b29..e5eb16c3bea 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -373,7 +373,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { if (!fd->released) { close(fd->fd); } - grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -438,8 +438,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -462,7 +462,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); + grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -811,7 +811,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } static void work_combine_error(grpc_error **composite, grpc_error *error) { diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index fcf39ffd54d..9652f826c1a 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -82,7 +82,7 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 617f48f1b8f..38f27d9b136 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -94,9 +94,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null); /** Returns true if we'd like to leave this execution context as soon as possible: useful for deciding whether to do something more or not depending on outside context */ diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 5882d8d71db..d9f0f05ba6e 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { pollset->on_shutdown = closure; } @@ -167,7 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_push(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, NULL); pollset->on_shutdown = NULL; } diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 9a320dc9520..4e9f978584a 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -163,7 +163,7 @@ typedef struct { static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { request *r = rp; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out), NULL); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 22a9feb72db..fac4d13a275 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, } else { GRPC_ERROR_REF(error); } - grpc_exec_ctx_push(exec_ctx, r->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 8919452751f..fbc1bb5301a 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -123,7 +123,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { info->closure = closure; } @@ -146,7 +146,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 9c4ca1617f0..078696c1269 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -221,7 +221,7 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -250,7 +250,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } if (dsmode == GRPC_DSMODE_IPV4) { @@ -260,7 +260,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_len = sizeof(addr4_copy); } if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } @@ -276,13 +276,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index dbf116422f7..6134d4476bd 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -121,7 +121,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect_unlock_and_cleanup(ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - grpc_exec_ctx_push(exec_ctx, on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); } /* Tries to issue one async connection, then schedules both an IOCP @@ -225,7 +225,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_exec_ctx_push(exec_ctx, on_done, final_error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 48a19bed84b..2f7c50ce358 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -175,7 +175,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } #define MAX_READ_IOVEC 4 @@ -277,7 +277,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } @@ -410,7 +410,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); return; } tcp->outgoing_buffer = buf; @@ -422,7 +422,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index e5c6a3c5498..6b10d3ebebb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -154,7 +154,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 481111c9f92..b4f28a4c3b1 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -121,7 +121,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } /* Now that the accepts have been aborted, we can destroy the sockets. diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 88c9354f6e5..383af9f12b1 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -183,7 +183,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -197,7 +197,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -222,7 +222,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); return; } @@ -235,7 +235,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } @@ -267,7 +267,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } /* Initiates a write. */ @@ -285,7 +285,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -317,7 +317,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); return; } @@ -334,7 +334,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; } diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index b46c6df287d..7b0ef550a7b 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -186,7 +186,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, &timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization"), NULL); @@ -195,7 +195,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); return; } @@ -247,7 +247,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -313,7 +313,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 6e8b1218beb..7b6402e3a42 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -78,7 +78,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error); +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 297e8d31022..d2e3b5acb3a 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -137,7 +137,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } } -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 23b51f33f60..072d4684150 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -138,7 +138,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, ep->read_cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } @@ -319,7 +319,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, cb, grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Wrap failed"), result), NULL); diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index d45ec4020c6..acbce769fb5 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -128,7 +128,7 @@ static void on_md_processing_done( grpc_metadata_batch_filter(calld->recv_initial_metadata, remove_consumed_md, elem); grpc_metadata_array_destroy(&calld->md); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); } else { gpr_slice message; grpc_transport_stream_op close_op; @@ -146,7 +146,7 @@ static void on_md_processing_done( calld->transport_op.send_trailing_metadata = NULL; grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE(error_details), GRPC_ERROR_INT_GRPC_STATUS, status), NULL); @@ -169,7 +169,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, return; } } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), NULL); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3915ba57a28..09167efb5c0 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -743,7 +743,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_push(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); return GRPC_CALL_OK; } @@ -970,7 +970,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ - grpc_exec_ctx_push(exec_ctx, bctl->notify_tag, bctl->error, NULL); + grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -1129,7 +1129,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_push(exec_ctx, saved_rsr_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); } gpr_mu_unlock(&call->mu); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 5c15b676f8f..8a0dcdcc077 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -94,14 +94,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; - grpc_exec_ctx_push(exec_ctx, op->on_connectivity_state_change, + grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, + grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index f2613217507..eae70717a5a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -344,8 +344,8 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } @@ -526,7 +526,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -571,8 +571,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -757,8 +757,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv_initial_metadata, error, - NULL); + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, + NULL); } static void server_mutate_op(grpc_call_element *elem, @@ -794,8 +794,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -1339,8 +1339,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 3286af9fb2f..f714e385dea 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -79,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); } - grpc_exec_ctx_push(exec_ctx, w->notify, error, NULL); + grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL); gpr_free(w); } GRPC_ERROR_UNREF(tracker->current_error); @@ -114,7 +114,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -122,7 +122,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -133,7 +133,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_push(exec_ctx, notify, + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); @@ -179,7 +179,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_push(exec_ctx, w->notify, + grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index fdf0f4b2aab..10d58153afd 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -60,7 +60,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_push(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); } } @@ -146,11 +146,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { - grpc_exec_ctx_push(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->recv_initial_metadata_ready, + grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->on_complete, error, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index aa715c841cf..d00a67ffc94 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -200,9 +200,9 @@ static void finish_resolve(grpc_exec_ctx *exec_ctx, void *arg, addrs->addrs = gpr_malloc(sizeof(*addrs->addrs)); addrs->addrs[0].len = 0; *r->addrs = addrs; - grpc_exec_ctx_push(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); } else { - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, GRPC_ERROR_CREATE_REFERENCING("Resolution failed", &error, 1), NULL); } @@ -247,7 +247,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { future_connect *fc = arg; if (error != GRPC_ERROR_NONE) { *fc->ep = NULL; - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); } else if (g_server != NULL) { grpc_endpoint *client; grpc_endpoint *server; @@ -259,7 +259,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_server_setup_transport(exec_ctx, g_server, transport, NULL, NULL); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); } else { sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline); } @@ -270,8 +270,8 @@ static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline) { if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) { *ep = NULL; - grpc_exec_ctx_push(exec_ctx, closure, - GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); + grpc_exec_ctx_sched(exec_ctx, closure, + GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); return; } diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 8fd847b878b..71c0dbfb581 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -216,7 +216,7 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, &message); grpc_call_next_op(exec_ctx, elem, &op); } - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, calld->recv_im_ready, GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1), NULL); } diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 71a1bb14369..5e86c423095 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -72,7 +72,7 @@ static void test_code(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); - grpc_exec_ctx_push(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); + grpc_exec_ctx_sched(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); grpc_exec_ctx_enqueue_list(&exec_ctx, &closure_list, NULL); /* endpoint.h */ diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 9a359cd7992..76ecfae74b8 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -73,7 +73,7 @@ static void test_add_closure(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_workqueue_push(&exec_ctx, wq, &c, GRPC_ERROR_NONE); + grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); gpr_mu_lock(g_mu); @@ -103,7 +103,7 @@ static void test_flush(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_exec_ctx_push(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); grpc_workqueue_flush(&exec_ctx, wq); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index ec417b84dca..e703dbdeb61 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -560,7 +560,7 @@ static int compute_engine_httpcli_get_success_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -570,7 +570,7 @@ static int compute_engine_httpcli_get_failure_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -663,7 +663,7 @@ static int refresh_token_httpcli_post_success( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -673,7 +673,7 @@ static int refresh_token_httpcli_post_failure( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -915,7 +915,7 @@ static int default_creds_gce_detection_httpcli_get_success_override( response->hdrs = headers; GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -973,7 +973,7 @@ static int default_creds_gce_detection_httpcli_get_failure_override( GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); *response = http_response(200, ""); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index 23b46958f4b..36b331a777b 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -294,7 +294,7 @@ static int httpcli_get_google_keys_for_email( "/robot/v1/metadata/x509/" "777-abaslkan11hlb6nmim3bpspl31ud@developer." "gserviceaccount.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -338,7 +338,7 @@ static int httpcli_get_custom_keys_for_email( GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/jwk/foo@bar.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -372,7 +372,7 @@ static int httpcli_get_jwk_set(grpc_exec_ctx *exec_ctx, GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/oauth2/v3/certs") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -387,7 +387,7 @@ static int httpcli_get_openid_config(grpc_exec_ctx *exec_ctx, GPR_ASSERT(strcmp(request->http.path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0); grpc_httpcli_set_override(httpcli_get_jwk_set, httpcli_post_should_not_be_called); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -427,7 +427,7 @@ static int httpcli_get_bad_json(grpc_exec_ctx *exec_ctx, grpc_httpcli_response *response) { *response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}")); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index deef68ef596..e3df5b1841c 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -51,7 +51,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -65,7 +65,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); } - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -78,7 +78,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } @@ -116,7 +116,7 @@ void grpc_mock_endpoint_put_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->on_read != NULL) { gpr_slice_buffer_add(m->on_read_out, slice); - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { gpr_slice_buffer_add(&m->read_buffer, slice); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 93753be2519..bf897d8f7e3 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -59,11 +59,11 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -87,7 +87,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { gpr_slice_buffer_add(m->on_read_out, gpr_slice_ref(slices->slices[i])); } - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { for (size_t i = 0; i < slices->count; i++) { @@ -95,7 +95,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } gpr_mu_unlock(&m->parent->mu); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -109,13 +109,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } From 77c983dc87dcf19b3e869247f90063ef4c10af3e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:23:14 -0700 Subject: [PATCH 6/6] clang-format --- src/core/ext/client_config/subchannel.c | 2 +- src/core/ext/lb_policy/round_robin/round_robin.c | 12 +++++++----- src/core/ext/resolver/dns/native/dns_resolver.c | 2 +- src/core/ext/transport/chttp2/transport/writing.c | 2 +- src/core/lib/iomgr/exec_ctx.c | 4 ++-- src/core/lib/iomgr/pollset_windows.c | 2 +- src/core/lib/iomgr/tcp_client_posix.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 8 ++++---- src/core/lib/iomgr/workqueue_posix.c | 2 +- src/core/lib/security/transport/server_auth_filter.c | 8 ++++---- src/core/lib/surface/lame_client.c | 4 ++-- src/core/lib/transport/connectivity_state.c | 4 ++-- src/core/lib/transport/transport.c | 4 ++-- test/core/util/mock_endpoint.c | 2 +- test/core/util/passthru_endpoint.c | 6 +++--- 15 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 1e1b30c4c08..c6c7b7a3a0b 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -291,7 +291,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 76d9d83c9b3..42356d8b0bd 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -240,7 +240,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Channel Shutdown"), NULL); + GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } grpc_connectivity_state_set( @@ -267,7 +267,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +294,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -467,7 +469,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + NULL); gpr_free(pp); } } else { @@ -522,7 +524,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, - GRPC_ERROR_CREATE("Round Robin not connected"), NULL); + GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index e35aaba815f..d6e7c89ef81 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -112,7 +112,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_sched(exec_ctx, r->next_completion, - GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); + GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ed6536b3c71..add76781827 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes( &transport_writing->done_cb); } else { grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, - NULL); + NULL); } } diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 9652f826c1a..c44aafcddf0 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -83,8 +83,8 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null) { + grpc_error *error, + grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index d9f0f05ba6e..626dd784b30 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -168,7 +168,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down && pollset->on_shutdown != NULL) { grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, - NULL); + NULL); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 078696c1269..80c7a3f1287 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -283,7 +283,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), - NULL); + NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 383af9f12b1..b2003675be4 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -198,7 +198,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -236,7 +236,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, - GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); + GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } } @@ -286,7 +286,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -335,7 +335,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), - NULL); + NULL); return; } } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index d2e3b5acb3a..45e0f6063b4 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -138,7 +138,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { + grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index acbce769fb5..34120aba421 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -147,9 +147,9 @@ static void on_md_processing_done( grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status), - NULL); + grpc_error_set_int(GRPC_ERROR_CREATE(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status), + NULL); } grpc_exec_ctx_finish(&exec_ctx); @@ -170,7 +170,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } } grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), - NULL); + NULL); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 8a0dcdcc077..6c94de555aa 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -95,14 +95,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel"), NULL); + GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index f714e385dea..062d0b3742a 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -134,7 +134,7 @@ int grpc_connectivity_state_notify_on_state_change( if (tracker->current_state != *current) { *current = tracker->current_state; grpc_exec_ctx_sched(exec_ctx, notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -180,7 +180,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, *w->current = tracker->current_state; tracker->watchers = w->next; grpc_exec_ctx_sched(exec_ctx, w->notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 10d58153afd..ec6833e323c 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -147,9 +147,9 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), - NULL); + NULL); grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error), NULL); + GRPC_ERROR_REF(error), NULL); grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index e3df5b1841c..ed9545e9df2 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -79,7 +79,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->mu); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, - GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); + GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } gpr_mu_unlock(&m->mu); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index bf897d8f7e3..a39f3dd66e2 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -60,7 +60,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), - NULL); + NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); @@ -110,13 +110,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { m->parent->shutdown = true; if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } gpr_mu_unlock(&m->parent->mu);