From 71094e25c5355190eed4463cf3b1e48db053c6e0 Mon Sep 17 00:00:00 2001 From: Lidi Zheng Date: Tue, 18 Dec 2018 11:43:53 -0800 Subject: [PATCH 01/16] Remove dependency of grpc.framework.foundation.callable_util * Used in _channel.py, _server.py, and _utilities.py * This API can trace back to 4 years ago * The code change ensures the logging info is exactly the same --- src/python/grpcio/grpc/_channel.py | 9 +++++---- src/python/grpcio/grpc/_server.py | 7 ++++--- src/python/grpcio/grpc/_utilities.py | 16 +++++++++++----- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 96118badada..e8279db51fd 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -22,7 +22,6 @@ import grpc from grpc import _common from grpc import _grpcio_metadata from grpc._cython import cygrpc -from grpc.framework.foundation import callable_util _LOGGER = logging.getLogger(__name__) @@ -871,9 +870,11 @@ def _deliver(state, initial_connectivity, initial_callbacks): while True: for callback in callbacks: cygrpc.block_if_fork_in_progress(state) - callable_util.call_logging_exceptions( - callback, _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE, - connectivity) + try: + callback(connectivity) + except Exception: # pylint: disable=broad-except + _LOGGER.exception( + _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE) with state.lock: callbacks = _deliveries(state) if callbacks: diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index eb750ef1a82..83ccf38232f 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -25,7 +25,6 @@ import grpc from grpc import _common from grpc import _interceptor from grpc._cython import cygrpc -from grpc.framework.foundation import callable_util _LOGGER = logging.getLogger(__name__) @@ -748,8 +747,10 @@ def _process_event_and_continue(state, event): else: rpc_state, callbacks = event.tag(event) for callback in callbacks: - callable_util.call_logging_exceptions(callback, - 'Exception calling callback!') + try: + callback() + except Exception: # pylint: disable=broad-except + _LOGGER.exception('Exception calling callback!') if rpc_state is not None: with state.lock: state.rpc_states.remove(rpc_state) diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index d90b34bcbd4..d1f465a83a6 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -16,12 +16,14 @@ import collections import threading import time +import logging import six import grpc from grpc import _common -from grpc.framework.foundation import callable_util + +_LOGGER = logging.getLogger(__name__) _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = ( 'Exception calling connectivity future "done" callback!') @@ -98,8 +100,10 @@ class _ChannelReadyFuture(grpc.Future): return for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + try: + done_callback(self) + except Exception: # pylint: disable=broad-except + _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) def cancel(self): with self._condition: @@ -113,8 +117,10 @@ class _ChannelReadyFuture(grpc.Future): return False for done_callback in done_callbacks: - callable_util.call_logging_exceptions( - done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + try: + done_callback(self) + except Exception: # pylint: disable=broad-except + _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE) return True From 6e94552a306e9cfcff834e39bc833bcb8055e6fe Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 10 Jan 2019 17:00:49 -0800 Subject: [PATCH 02/16] Add a caching interceptor to the keyvaluestore example --- examples/BUILD | 3 +- .../cpp/keyvaluestore/caching_interceptor.h | 128 ++++++++++++++++++ examples/cpp/keyvaluestore/client.cc | 19 ++- 3 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 examples/cpp/keyvaluestore/caching_interceptor.h diff --git a/examples/BUILD b/examples/BUILD index 4fee663bd9e..0a1ca94a649 100644 --- a/examples/BUILD +++ b/examples/BUILD @@ -101,7 +101,8 @@ cc_binary( cc_binary( name = "keyvaluestore_client", - srcs = ["cpp/keyvaluestore/client.cc"], + srcs = ["cpp/keyvaluestore/caching_interceptor.h", + "cpp/keyvaluestore/client.cc"], defines = ["BAZEL_BUILD"], deps = [":keyvaluestore", "//:grpc++"], ) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h new file mode 100644 index 00000000000..393212b83bb --- /dev/null +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -0,0 +1,128 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include + +#ifdef BAZEL_BUILD +#include "examples/protos/keyvaluestore.grpc.pb.h" +#else +#include "keyvaluestore.grpc.pb.h" +#endif + +// This is a naive implementation of a cache. A new cache is for each call. For +// each new key request, the key is first searched in the map and if found. Only +// if the key is not found in the cache do we make a request. +class CachingInterceptor : public grpc::experimental::Interceptor { + public: + CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} + + void Intercept( + ::grpc::experimental::InterceptorBatchMethods* methods) override { + bool hijack = false; + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints:: + PRE_SEND_INITIAL_METADATA)) { + // Hijack all calls + hijack = true; + // Create a stream on which this interceptor can make requests + stub_ = keyvaluestore::KeyValueStore::NewStub( + methods->GetInterceptedChannel()); + stream_ = stub_->GetValues(&context_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { + // We know that clients perform a Read and a Write in a loop, so we don't + // need to maintain a list of the responses. + std::string requested_key; + const keyvaluestore::Request* req_msg = + static_cast(methods->GetSendMessage()); + if (req_msg != nullptr) { + requested_key = req_msg->key(); + } else { + // The non-serialized form would not be available in certain scenarios, + // so add a fallback + keyvaluestore::Request req_msg; + auto* buffer = methods->GetSerializedSendMessage(); + auto copied_buffer = *buffer; + GPR_ASSERT( + grpc::SerializationTraits::Deserialize( + &copied_buffer, &req_msg) + .ok()); + requested_key = req_msg.key(); + } + + // Check if the key is present in the map + auto search = cached_map_.find(requested_key); + if (search != cached_map_.end()) { + std::cout << "Key " << requested_key << "found in map"; + response_ = search->second; + } else { + std::cout << "Key " << requested_key << "not found in cache"; + // Key was not found in the cache, so make a request + keyvaluestore::Request req; + req.set_key(requested_key); + stream_->Write(req); + keyvaluestore::Response resp; + stream_->Read(&resp); + response_ = resp.value(); + // Insert the pair in the cache for future requests + cached_map_.insert({requested_key, response_}); + } + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { + stream_->WritesDone(); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) { + keyvaluestore::Response* resp = + static_cast(methods->GetRecvMessage()); + resp->set_value(response_); + } + if (methods->QueryInterceptionHookPoint( + grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { + auto* status = methods->GetRecvStatus(); + *status = grpc::Status::OK; + } + if (hijack) { + methods->Hijack(); + } else { + methods->Proceed(); + } + } + + private: + grpc::ClientContext context_; + std::unique_ptr stub_; + std::unique_ptr< + grpc::ClientReaderWriter> + stream_; + std::map cached_map_; + std::string response_; +}; + +class CachingInterceptorFactory + : public grpc::experimental::ClientInterceptorFactoryInterface { + public: + grpc::experimental::Interceptor* CreateClientInterceptor( + grpc::experimental::ClientRpcInfo* info) override { + return new CachingInterceptor(info); + } +}; \ No newline at end of file diff --git a/examples/cpp/keyvaluestore/client.cc b/examples/cpp/keyvaluestore/client.cc index 17e407c273b..57c451cadf3 100644 --- a/examples/cpp/keyvaluestore/client.cc +++ b/examples/cpp/keyvaluestore/client.cc @@ -23,6 +23,8 @@ #include +#include "caching_interceptor.h" + #ifdef BAZEL_BUILD #include "examples/protos/keyvaluestore.grpc.pb.h" #else @@ -77,9 +79,20 @@ int main(int argc, char** argv) { // are created. This channel models a connection to an endpoint (in this case, // localhost at port 50051). We indicate that the channel isn't authenticated // (use of InsecureChannelCredentials()). - KeyValueStoreClient client(grpc::CreateChannel( - "localhost:50051", grpc::InsecureChannelCredentials())); - std::vector keys = {"key1", "key2", "key3", "key4", "key5"}; + // In this example, we are using a cache which has been added in as an + // interceptor. + grpc::ChannelArguments args; + std::vector< + std::unique_ptr> + interceptor_creators; + interceptor_creators.push_back(std::unique_ptr( + new CachingInterceptorFactory())); + auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( + "localhost:50051", grpc::InsecureChannelCredentials(), args, + std::move(interceptor_creators)); + KeyValueStoreClient client(channel); + std::vector keys = {"key1", "key2", "key3", "key4", + "key5", "key1", "key2", "key4"}; client.GetValues(keys); return 0; From 817fb588af4174fe5095a316e60a481f3be220df Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 10 Jan 2019 17:06:41 -0800 Subject: [PATCH 03/16] Adding a new line at the end of the file --- examples/cpp/keyvaluestore/caching_interceptor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h index 393212b83bb..a5d130da8dd 100644 --- a/examples/cpp/keyvaluestore/caching_interceptor.h +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -125,4 +125,4 @@ class CachingInterceptorFactory grpc::experimental::ClientRpcInfo* info) override { return new CachingInterceptor(info); } -}; \ No newline at end of file +}; From 9c51ff9b331a07938525c49c3146a7ebbe1d0e57 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 10 Jan 2019 22:32:06 +0100 Subject: [PATCH 04/16] Make C# ServerCallContext implementation agnostic --- .../TestServerCallContext.cs | 21 +--- .../Grpc.Core/Internal/CallSafeHandle.cs | 1 + .../Internal/IServerResponseStream.cs | 38 +++++++ .../Internal/ServerCallContextExtraData.cs | 97 +++++++++++++++++ .../Grpc.Core/Internal/ServerCallHandler.cs | 10 +- .../Internal/ServerResponseStream.cs | 2 +- src/csharp/Grpc.Core/ServerCallContext.cs | 101 +++++++----------- 7 files changed, 182 insertions(+), 88 deletions(-) create mode 100644 src/csharp/Grpc.Core/Internal/IServerResponseStream.cs create mode 100644 src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs diff --git a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs index 5418417d7ed..d72e98e75a2 100644 --- a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs +++ b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs @@ -37,22 +37,11 @@ namespace Grpc.Core.Testing Func writeHeadersFunc, Func writeOptionsGetter, Action writeOptionsSetter) { return new ServerCallContext(null, method, host, deadline, requestHeaders, cancellationToken, - writeHeadersFunc, new WriteOptionsHolder(writeOptionsGetter, writeOptionsSetter), - () => peer, () => authContext, () => contextPropagationToken); - } - - private class WriteOptionsHolder : IHasWriteOptions - { - Func writeOptionsGetter; - Action writeOptionsSetter; - - public WriteOptionsHolder(Func writeOptionsGetter, Action writeOptionsSetter) - { - this.writeOptionsGetter = writeOptionsGetter; - this.writeOptionsSetter = writeOptionsSetter; - } - - public WriteOptions WriteOptions { get => writeOptionsGetter(); set => writeOptionsSetter(value); } + (ctx, extraData, headers) => writeHeadersFunc(headers), + (ctx, extraData) => writeOptionsGetter(), + (ctx, extraData, options) => writeOptionsSetter(options), + (ctx, extraData) => peer, (ctx, callHandle) => authContext, + (ctx, callHandle, options) => contextPropagationToken); } } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index a3ef3e61ee1..7154ddae30b 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -18,6 +18,7 @@ using System; using System.Diagnostics; using System.Runtime.InteropServices; using System.Text; +using System.Threading; using Grpc.Core; using Grpc.Core.Utils; using Grpc.Core.Profiling; diff --git a/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs new file mode 100644 index 00000000000..874aae703a2 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IServerResponseStream.cs @@ -0,0 +1,38 @@ +#region Copyright notice and license +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#endregion + +using System; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// + /// Exposes non-generic members of ServerReponseStream. + /// + internal interface IServerResponseStream + { + /// + /// Asynchronously sends response headers for the current call to the client. See ServerCallContext.WriteResponseHeadersAsync for exact semantics. + /// + Task WriteResponseHeadersAsync(Metadata responseHeaders); + + /// + /// Gets or sets the write options. + /// + WriteOptions WriteOptions { get; set; } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs b/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs new file mode 100644 index 00000000000..97b95e66df9 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs @@ -0,0 +1,97 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal +{ + /// + /// Additional state for ServerCallContext. + /// Storing the extra state outside of ServerCallContext allows it to be implementation-agnostic. + /// + internal class ServerCallContextExtraData + { + readonly CallSafeHandle callHandle; + readonly IServerResponseStream serverResponseStream; + readonly Lazy cachedAuthContext; + + public ServerCallContextExtraData(CallSafeHandle callHandle, IServerResponseStream serverResponseStream) + { + this.callHandle = callHandle; + this.serverResponseStream = serverResponseStream; + // TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object. + this.cachedAuthContext = new Lazy(GetAuthContextEager); + } + + public ServerCallContext NewServerCallContext(ServerRpcNew newRpc, CancellationToken cancellationToken) + { + DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); + + return new ServerCallContext(this, newRpc.Method, newRpc.Host, realtimeDeadline, + newRpc.RequestMetadata, cancellationToken, + ServerCallContext_WriteHeadersFunc, ServerCallContext_WriteOptionsGetter, ServerCallContext_WriteOptionsSetter, + ServerCallContext_PeerGetter, ServerCallContext_AuthContextGetter, ServerCallContext_ContextPropagationTokenFactory); + } + + private AuthContext GetAuthContextEager() + { + using (var authContextNative = callHandle.GetAuthContext()) + { + return authContextNative.ToAuthContext(); + } + } + + // Implementors of ServerCallContext's members are pre-allocated to avoid unneccessary delegate allocations. + readonly static Func ServerCallContext_WriteHeadersFunc = (ctx, extraData, headers) => + { + return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteResponseHeadersAsync(headers); + }; + + readonly static Func ServerCallContext_WriteOptionsGetter = (ctx, extraData) => + { + + return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions; + }; + + readonly static Action ServerCallContext_WriteOptionsSetter = (ctx, extraData, options) => + { + ((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions = options; + }; + + readonly static Func ServerCallContext_PeerGetter = (ctx, extraData) => + { + // Getting the peer lazily is fine as the native call is guaranteed + // not to be disposed before user-supplied server side handler returns. + // Most users won't need to read this field anyway. + return ((ServerCallContextExtraData)extraData).callHandle.GetPeer(); + }; + + readonly static Func ServerCallContext_AuthContextGetter = (ctx, extraData) => + { + return ((ServerCallContextExtraData)extraData).cachedAuthContext.Value; + }; + + readonly static Func ServerCallContext_ContextPropagationTokenFactory = (ctx, extraData, options) => + { + var callHandle = ((ServerCallContextExtraData)extraData).callHandle; + return new ContextPropagationToken(callHandle, ctx.Deadline, ctx.CancellationToken, options); + }; + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index ec732e8c7f4..ae586f7d1c4 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -71,7 +71,7 @@ namespace Grpc.Core.Internal var response = await handler(request, context).ConfigureAwait(false); status = context.Status; responseWithFlags = new AsyncCallServer.ResponseWithFlags(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); - } + } catch (Exception e) { if (!(e is RpcException)) @@ -345,14 +345,12 @@ namespace Grpc.Core.Internal return writeOptions != null ? writeOptions.Flags : default(WriteFlags); } - public static ServerCallContext NewContext(ServerRpcNew newRpc, ServerResponseStream serverResponseStream, CancellationToken cancellationToken) - where TRequest : class - where TResponse : class + public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken) { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); - return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); + var contextExtraData = new ServerCallContextExtraData(newRpc.Call, serverResponseStream); + return contextExtraData.NewServerCallContext(newRpc, cancellationToken); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 352b98829c7..079849e4c61 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -23,7 +23,7 @@ namespace Grpc.Core.Internal /// /// Writes responses asynchronously to an underlying AsyncCallServer object. /// - internal class ServerResponseStream : IServerStreamWriter, IHasWriteOptions + internal class ServerResponseStream : IServerStreamWriter, IServerResponseStream where TRequest : class where TResponse : class { diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 74a7deabea0..05c20ca75f8 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -21,6 +21,7 @@ using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -29,45 +30,49 @@ namespace Grpc.Core /// public class ServerCallContext { - private readonly CallSafeHandle callHandle; + private readonly object extraData; private readonly string method; private readonly string host; private readonly DateTime deadline; private readonly Metadata requestHeaders; private readonly CancellationToken cancellationToken; private readonly Metadata responseTrailers = new Metadata(); - private readonly Func writeHeadersFunc; - private readonly IHasWriteOptions writeOptionsHolder; - private readonly Lazy authContext; - private readonly Func testingOnlyPeerGetter; - private readonly Func testingOnlyAuthContextGetter; - private readonly Func testingOnlyContextPropagationTokenFactory; + private readonly Func writeHeadersFunc; + private readonly Func writeOptionsGetter; + private readonly Action writeOptionsSetter; - private Status status = Status.DefaultSuccess; + private readonly Func peerGetter; + private readonly Func authContextGetter; + private readonly Func contextPropagationTokenFactory; - internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, - Func writeHeadersFunc, IHasWriteOptions writeOptionsHolder) - : this(callHandle, method, host, deadline, requestHeaders, cancellationToken, writeHeadersFunc, writeOptionsHolder, null, null, null) - { - } + private Status status = Status.DefaultSuccess; - // Additional constructor params should be used for testing only - internal ServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, - Func writeHeadersFunc, IHasWriteOptions writeOptionsHolder, - Func testingOnlyPeerGetter, Func testingOnlyAuthContextGetter, Func testingOnlyContextPropagationTokenFactory) + /// + /// Creates a new instance of ServerCallContext. + /// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally. + /// To provide state, this ServerCallContext instance and extraData will be passed to the member implementations. + /// + internal ServerCallContext(object extraData, + string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + Func writeHeadersFunc, + Func writeOptionsGetter, + Action writeOptionsSetter, + Func peerGetter, + Func authContextGetter, + Func contextPropagationTokenFactory) { - this.callHandle = callHandle; + this.extraData = extraData; this.method = method; this.host = host; this.deadline = deadline; this.requestHeaders = requestHeaders; this.cancellationToken = cancellationToken; - this.writeHeadersFunc = writeHeadersFunc; - this.writeOptionsHolder = writeOptionsHolder; - this.authContext = new Lazy(GetAuthContextEager); - this.testingOnlyPeerGetter = testingOnlyPeerGetter; - this.testingOnlyAuthContextGetter = testingOnlyAuthContextGetter; - this.testingOnlyContextPropagationTokenFactory = testingOnlyContextPropagationTokenFactory; + this.writeHeadersFunc = GrpcPreconditions.CheckNotNull(writeHeadersFunc); + this.writeOptionsGetter = GrpcPreconditions.CheckNotNull(writeOptionsGetter); + this.writeOptionsSetter = GrpcPreconditions.CheckNotNull(writeOptionsSetter); + this.peerGetter = GrpcPreconditions.CheckNotNull(peerGetter); + this.authContextGetter = GrpcPreconditions.CheckNotNull(authContextGetter); + this.contextPropagationTokenFactory = GrpcPreconditions.CheckNotNull(contextPropagationTokenFactory); } /// @@ -79,7 +84,7 @@ namespace Grpc.Core /// The task that finished once response headers have been written. public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - return writeHeadersFunc(responseHeaders); + return writeHeadersFunc(this, extraData, responseHeaders); } /// @@ -87,13 +92,9 @@ namespace Grpc.Core /// public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) { - if (testingOnlyContextPropagationTokenFactory != null) - { - return testingOnlyContextPropagationTokenFactory(); - } - return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); + return contextPropagationTokenFactory(this, extraData, options); } - + /// Name of method called in this RPC. public string Method { @@ -117,14 +118,7 @@ namespace Grpc.Core { get { - if (testingOnlyPeerGetter != null) - { - return testingOnlyPeerGetter(); - } - // Getting the peer lazily is fine as the native call is guaranteed - // not to be disposed before user-supplied server side handler returns. - // Most users won't need to read this field anyway. - return this.callHandle.GetPeer(); + return peerGetter(this, extraData); } } @@ -187,12 +181,12 @@ namespace Grpc.Core { get { - return writeOptionsHolder.WriteOptions; + return writeOptionsGetter(this, extraData); } set { - writeOptionsHolder.WriteOptions = value; + writeOptionsSetter(this, extraData, value); } } @@ -204,31 +198,8 @@ namespace Grpc.Core { get { - if (testingOnlyAuthContextGetter != null) - { - return testingOnlyAuthContextGetter(); - } - return authContext.Value; + return authContextGetter(this, extraData); } } - - private AuthContext GetAuthContextEager() - { - using (var authContextNative = callHandle.GetAuthContext()) - { - return authContextNative.ToAuthContext(); - } - } - } - - /// - /// Allows sharing write options between ServerCallContext and other objects. - /// - internal interface IHasWriteOptions - { - /// - /// Gets or sets the write options. - /// - WriteOptions WriteOptions { get; set; } } } From 7dd938d5f4b74b080f47a2cfa349486503292985 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Wed, 16 Jan 2019 14:27:10 -0800 Subject: [PATCH 05/16] Reviewer comments --- examples/cpp/keyvaluestore/caching_interceptor.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h index a5d130da8dd..8ecdafaf159 100644 --- a/examples/cpp/keyvaluestore/caching_interceptor.h +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -27,7 +27,7 @@ #endif // This is a naive implementation of a cache. A new cache is for each call. For -// each new key request, the key is first searched in the map and if found. Only +// each new key request, the key is first searched in the map and if found, the interceptor feeds in the value. Only // if the key is not found in the cache do we make a request. class CachingInterceptor : public grpc::experimental::Interceptor { public: @@ -102,8 +102,10 @@ class CachingInterceptor : public grpc::experimental::Interceptor { *status = grpc::Status::OK; } if (hijack) { + // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in the hook points methods->Hijack(); } else { + // Proceed is an indicator that the interceptor is done intercepting the batch. methods->Proceed(); } } From d80731d3e8098fc9068bc4a7f15128db42935d53 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 17 Jan 2019 09:54:26 +0100 Subject: [PATCH 06/16] revert unnecessary using --- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 7154ddae30b..a3ef3e61ee1 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -18,7 +18,6 @@ using System; using System.Diagnostics; using System.Runtime.InteropServices; using System.Text; -using System.Threading; using Grpc.Core; using Grpc.Core.Utils; using Grpc.Core.Profiling; From e358f567b0edc26ead1db3bd6c5e4d033bf69a7d Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 17 Jan 2019 14:16:41 +0100 Subject: [PATCH 07/16] make ServerCallContext an abstract base class --- .../TestServerCallContext.cs | 73 ++++++++- .../Internal/DefaultServerCallContext.cs | 111 ++++++++++++++ .../Internal/ServerCallContextExtraData.cs | 97 ------------ .../Grpc.Core/Internal/ServerCallHandler.cs | 4 +- src/csharp/Grpc.Core/ServerCallContext.cs | 143 +++++------------- 5 files changed, 220 insertions(+), 208 deletions(-) create mode 100644 src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs delete mode 100644 src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs diff --git a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs index d72e98e75a2..7a4fb15b4f9 100644 --- a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs +++ b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs @@ -36,12 +36,73 @@ namespace Grpc.Core.Testing string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken, Func writeHeadersFunc, Func writeOptionsGetter, Action writeOptionsSetter) { - return new ServerCallContext(null, method, host, deadline, requestHeaders, cancellationToken, - (ctx, extraData, headers) => writeHeadersFunc(headers), - (ctx, extraData) => writeOptionsGetter(), - (ctx, extraData, options) => writeOptionsSetter(options), - (ctx, extraData) => peer, (ctx, callHandle) => authContext, - (ctx, callHandle, options) => contextPropagationToken); + return new TestingServerCallContext(method, host, deadline, requestHeaders, cancellationToken, peer, + authContext, contextPropagationToken, writeHeadersFunc, writeOptionsGetter, writeOptionsSetter); + } + + private class TestingServerCallContext : ServerCallContext + { + private readonly string method; + private readonly string host; + private readonly DateTime deadline; + private readonly Metadata requestHeaders; + private readonly CancellationToken cancellationToken; + private readonly Metadata responseTrailers = new Metadata(); + private Status status; + private readonly string peer; + private readonly AuthContext authContext; + private readonly ContextPropagationToken contextPropagationToken; + private readonly Func writeHeadersFunc; + private readonly Func writeOptionsGetter; + private readonly Action writeOptionsSetter; + + public TestingServerCallContext(string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, + string peer, AuthContext authContext, ContextPropagationToken contextPropagationToken, + Func writeHeadersFunc, Func writeOptionsGetter, Action writeOptionsSetter) + { + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestHeaders = requestHeaders; + this.cancellationToken = cancellationToken; + this.responseTrailers = new Metadata(); + this.status = Status.DefaultSuccess; + this.peer = peer; + this.authContext = authContext; + this.contextPropagationToken = contextPropagationToken; + this.writeHeadersFunc = writeHeadersFunc; + this.writeOptionsGetter = writeOptionsGetter; + this.writeOptionsSetter = writeOptionsSetter; + } + + protected override string MethodInternal => method; + + protected override string HostInternal => host; + + protected override string PeerInternal => peer; + + protected override DateTime DeadlineInternal => deadline; + + protected override Metadata RequestHeadersInternal => requestHeaders; + + protected override CancellationToken CancellationTokenInternal => cancellationToken; + + protected override Metadata ResponseTrailersInternal => responseTrailers; + + protected override Status StatusInternal { get => status; set => status = value; } + protected override WriteOptions WriteOptionsInternal { get => writeOptionsGetter(); set => writeOptionsSetter(value); } + + protected override AuthContext AuthContextInternal => authContext; + + protected override ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options) + { + return contextPropagationToken; + } + + protected override Task WriteResponseHeadersInternalAsync(Metadata responseHeaders) + { + return writeHeadersFunc(responseHeaders); + } } } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs new file mode 100644 index 00000000000..1e484bdcf2d --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs @@ -0,0 +1,111 @@ +#region Copyright notice and license + +// Copyright 2019 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core +{ + /// + /// Default implementation of ServerCallContext. + /// + internal class DefaultServerCallContext : ServerCallContext + { + private readonly CallSafeHandle callHandle; + private readonly string method; + private readonly string host; + private readonly DateTime deadline; + private readonly Metadata requestHeaders; + private readonly CancellationToken cancellationToken; + private readonly Metadata responseTrailers; + private Status status; + private readonly IServerResponseStream serverResponseStream; + private readonly Lazy authContext; + + /// + /// Creates a new instance of ServerCallContext. + /// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally. + /// To provide state, this ServerCallContext instance and extraData will be passed to the member implementations. + /// + internal DefaultServerCallContext(CallSafeHandle callHandle, string method, string host, DateTime deadline, + Metadata requestHeaders, CancellationToken cancellationToken, IServerResponseStream serverResponseStream) + { + this.callHandle = callHandle; + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestHeaders = requestHeaders; + this.cancellationToken = cancellationToken; + this.responseTrailers = new Metadata(); + this.status = Status.DefaultSuccess; + this.serverResponseStream = serverResponseStream; + // TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object + this.authContext = new Lazy(GetAuthContextEager); + } + + protected override ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options) + { + return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); + } + + protected override Task WriteResponseHeadersInternalAsync(Metadata responseHeaders) + { + return serverResponseStream.WriteResponseHeadersAsync(responseHeaders); + } + + protected override string MethodInternal => method; + + protected override string HostInternal => host; + + protected override string PeerInternal => callHandle.GetPeer(); + + protected override DateTime DeadlineInternal => deadline; + + protected override Metadata RequestHeadersInternal => requestHeaders; + + protected override CancellationToken CancellationTokenInternal => cancellationToken; + + protected override Metadata ResponseTrailersInternal => responseTrailers; + + protected override Status StatusInternal + { + get => status; + set => status = value; + } + + protected override WriteOptions WriteOptionsInternal + { + get => serverResponseStream.WriteOptions; + set => serverResponseStream.WriteOptions = value; + } + + protected override AuthContext AuthContextInternal => authContext.Value; + + private AuthContext GetAuthContextEager() + { + using (var authContextNative = callHandle.GetAuthContext()) + { + return authContextNative.ToAuthContext(); + } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs b/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs deleted file mode 100644 index 97b95e66df9..00000000000 --- a/src/csharp/Grpc.Core/Internal/ServerCallContextExtraData.cs +++ /dev/null @@ -1,97 +0,0 @@ -#region Copyright notice and license - -// Copyright 2019 The gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Grpc.Core.Internal -{ - /// - /// Additional state for ServerCallContext. - /// Storing the extra state outside of ServerCallContext allows it to be implementation-agnostic. - /// - internal class ServerCallContextExtraData - { - readonly CallSafeHandle callHandle; - readonly IServerResponseStream serverResponseStream; - readonly Lazy cachedAuthContext; - - public ServerCallContextExtraData(CallSafeHandle callHandle, IServerResponseStream serverResponseStream) - { - this.callHandle = callHandle; - this.serverResponseStream = serverResponseStream; - // TODO(jtattermusch): avoid unnecessary allocation of factory function and the lazy object. - this.cachedAuthContext = new Lazy(GetAuthContextEager); - } - - public ServerCallContext NewServerCallContext(ServerRpcNew newRpc, CancellationToken cancellationToken) - { - DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); - - return new ServerCallContext(this, newRpc.Method, newRpc.Host, realtimeDeadline, - newRpc.RequestMetadata, cancellationToken, - ServerCallContext_WriteHeadersFunc, ServerCallContext_WriteOptionsGetter, ServerCallContext_WriteOptionsSetter, - ServerCallContext_PeerGetter, ServerCallContext_AuthContextGetter, ServerCallContext_ContextPropagationTokenFactory); - } - - private AuthContext GetAuthContextEager() - { - using (var authContextNative = callHandle.GetAuthContext()) - { - return authContextNative.ToAuthContext(); - } - } - - // Implementors of ServerCallContext's members are pre-allocated to avoid unneccessary delegate allocations. - readonly static Func ServerCallContext_WriteHeadersFunc = (ctx, extraData, headers) => - { - return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteResponseHeadersAsync(headers); - }; - - readonly static Func ServerCallContext_WriteOptionsGetter = (ctx, extraData) => - { - - return ((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions; - }; - - readonly static Action ServerCallContext_WriteOptionsSetter = (ctx, extraData, options) => - { - ((ServerCallContextExtraData)extraData).serverResponseStream.WriteOptions = options; - }; - - readonly static Func ServerCallContext_PeerGetter = (ctx, extraData) => - { - // Getting the peer lazily is fine as the native call is guaranteed - // not to be disposed before user-supplied server side handler returns. - // Most users won't need to read this field anyway. - return ((ServerCallContextExtraData)extraData).callHandle.GetPeer(); - }; - - readonly static Func ServerCallContext_AuthContextGetter = (ctx, extraData) => - { - return ((ServerCallContextExtraData)extraData).cachedAuthContext.Value; - }; - - readonly static Func ServerCallContext_ContextPropagationTokenFactory = (ctx, extraData, options) => - { - var callHandle = ((ServerCallContextExtraData)extraData).callHandle; - return new ContextPropagationToken(callHandle, ctx.Deadline, ctx.CancellationToken, options); - }; - } -} diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index ae586f7d1c4..c3859f1de27 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -348,9 +348,7 @@ namespace Grpc.Core.Internal public static ServerCallContext NewContext(ServerRpcNew newRpc, IServerResponseStream serverResponseStream, CancellationToken cancellationToken) { DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); - - var contextExtraData = new ServerCallContextExtraData(newRpc.Call, serverResponseStream); - return contextExtraData.NewServerCallContext(newRpc, cancellationToken); + return new DefaultServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream); } } } diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 05c20ca75f8..4a2fdf32c71 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -28,51 +28,13 @@ namespace Grpc.Core /// /// Context for a server-side call. /// - public class ServerCallContext + public abstract class ServerCallContext { - private readonly object extraData; - private readonly string method; - private readonly string host; - private readonly DateTime deadline; - private readonly Metadata requestHeaders; - private readonly CancellationToken cancellationToken; - private readonly Metadata responseTrailers = new Metadata(); - private readonly Func writeHeadersFunc; - private readonly Func writeOptionsGetter; - private readonly Action writeOptionsSetter; - - private readonly Func peerGetter; - private readonly Func authContextGetter; - private readonly Func contextPropagationTokenFactory; - - private Status status = Status.DefaultSuccess; - /// /// Creates a new instance of ServerCallContext. - /// To allow reuse of ServerCallContext API by different gRPC implementations, the implementation of some members is provided externally. - /// To provide state, this ServerCallContext instance and extraData will be passed to the member implementations. /// - internal ServerCallContext(object extraData, - string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, - Func writeHeadersFunc, - Func writeOptionsGetter, - Action writeOptionsSetter, - Func peerGetter, - Func authContextGetter, - Func contextPropagationTokenFactory) + protected ServerCallContext() { - this.extraData = extraData; - this.method = method; - this.host = host; - this.deadline = deadline; - this.requestHeaders = requestHeaders; - this.cancellationToken = cancellationToken; - this.writeHeadersFunc = GrpcPreconditions.CheckNotNull(writeHeadersFunc); - this.writeOptionsGetter = GrpcPreconditions.CheckNotNull(writeOptionsGetter); - this.writeOptionsSetter = GrpcPreconditions.CheckNotNull(writeOptionsSetter); - this.peerGetter = GrpcPreconditions.CheckNotNull(peerGetter); - this.authContextGetter = GrpcPreconditions.CheckNotNull(authContextGetter); - this.contextPropagationTokenFactory = GrpcPreconditions.CheckNotNull(contextPropagationTokenFactory); } /// @@ -84,7 +46,7 @@ namespace Grpc.Core /// The task that finished once response headers have been written. public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - return writeHeadersFunc(this, extraData, responseHeaders); + return WriteResponseHeadersInternalAsync(responseHeaders); } /// @@ -92,83 +54,41 @@ namespace Grpc.Core /// public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) { - return contextPropagationTokenFactory(this, extraData, options); + return CreatePropagationTokenInternal(options); } /// Name of method called in this RPC. - public string Method - { - get - { - return this.method; - } - } + public string Method => MethodInternal; /// Name of host called in this RPC. - public string Host - { - get - { - return this.host; - } - } + public string Host => HostInternal; /// Address of the remote endpoint in URI format. - public string Peer - { - get - { - return peerGetter(this, extraData); - } - } + public string Peer => PeerInternal; /// Deadline for this RPC. - public DateTime Deadline - { - get - { - return this.deadline; - } - } + public DateTime Deadline => DeadlineInternal; /// Initial metadata sent by client. - public Metadata RequestHeaders - { - get - { - return this.requestHeaders; - } - } + public Metadata RequestHeaders => RequestHeadersInternal; /// Cancellation token signals when call is cancelled. - public CancellationToken CancellationToken - { - get - { - return this.cancellationToken; - } - } + public CancellationToken CancellationToken => CancellationTokenInternal; /// Trailers to send back to client after RPC finishes. - public Metadata ResponseTrailers - { - get - { - return this.responseTrailers; - } - } + public Metadata ResponseTrailers => ResponseTrailersInternal; /// Status to send back to client after RPC finishes. public Status Status { get { - return this.status; + return StatusInternal; } set { - status = value; + StatusInternal = value; } } @@ -181,12 +101,12 @@ namespace Grpc.Core { get { - return writeOptionsGetter(this, extraData); + return WriteOptionsInternal; } set { - writeOptionsSetter(this, extraData, value); + WriteOptionsInternal = value; } } @@ -194,12 +114,31 @@ namespace Grpc.Core /// Gets the AuthContext associated with this call. /// Note: Access to AuthContext is an experimental API that can change without any prior notice. /// - public AuthContext AuthContext - { - get - { - return authContextGetter(this, extraData); - } - } + public AuthContext AuthContext => AuthContextInternal; + + /// Provides implementation of a non-virtual public member. + protected abstract Task WriteResponseHeadersInternalAsync(Metadata responseHeaders); + /// Provides implementation of a non-virtual public member. + protected abstract ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options); + /// Provides implementation of a non-virtual public member. + protected abstract string MethodInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract string HostInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract string PeerInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract DateTime DeadlineInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Metadata RequestHeadersInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract CancellationToken CancellationTokenInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Metadata ResponseTrailersInternal { get; } + /// Provides implementation of a non-virtual public member. + protected abstract Status StatusInternal { get; set; } + /// Provides implementation of a non-virtual public member. + protected abstract WriteOptions WriteOptionsInternal { get; set; } + /// Provides implementation of a non-virtual public member. + protected abstract AuthContext AuthContextInternal { get; } } } From 7d6341b627b883d400074bbdd0a70735f5290e84 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 17 Jan 2019 14:51:02 +0100 Subject: [PATCH 08/16] remove unnecessary using --- src/csharp/Grpc.Core.Testing/TestServerCallContext.cs | 1 - src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs | 1 - src/csharp/Grpc.Core/ServerCallContext.cs | 3 --- 3 files changed, 5 deletions(-) diff --git a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs index 7a4fb15b4f9..ff4fb66c6c9 100644 --- a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs +++ b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs @@ -19,7 +19,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using Grpc.Core; namespace Grpc.Core.Testing { diff --git a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs index 1e484bdcf2d..b6a29af2edb 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs @@ -21,7 +21,6 @@ using System.Threading; using System.Threading.Tasks; using Grpc.Core.Internal; -using Grpc.Core.Utils; namespace Grpc.Core { diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 4a2fdf32c71..17aa1fe0661 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -20,9 +20,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; -using Grpc.Core.Utils; - namespace Grpc.Core { /// From dde966f8c62f664b637d195983562b75860e4626 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 17 Jan 2019 12:03:14 -0800 Subject: [PATCH 09/16] Reviewer comments --- examples/cpp/keyvaluestore/caching_interceptor.h | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/examples/cpp/keyvaluestore/caching_interceptor.h b/examples/cpp/keyvaluestore/caching_interceptor.h index 8ecdafaf159..5a31afe0f01 100644 --- a/examples/cpp/keyvaluestore/caching_interceptor.h +++ b/examples/cpp/keyvaluestore/caching_interceptor.h @@ -27,8 +27,9 @@ #endif // This is a naive implementation of a cache. A new cache is for each call. For -// each new key request, the key is first searched in the map and if found, the interceptor feeds in the value. Only -// if the key is not found in the cache do we make a request. +// each new key request, the key is first searched in the map and if found, the +// interceptor fills in the return value without making a request to the server. +// Only if the key is not found in the cache do we make a request. class CachingInterceptor : public grpc::experimental::Interceptor { public: CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} @@ -101,11 +102,14 @@ class CachingInterceptor : public grpc::experimental::Interceptor { auto* status = methods->GetRecvStatus(); *status = grpc::Status::OK; } + // One of Hijack or Proceed always needs to be called to make progress. if (hijack) { - // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in the hook points + // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in + // the hook points methods->Hijack(); } else { - // Proceed is an indicator that the interceptor is done intercepting the batch. + // Proceed is an indicator that the interceptor is done intercepting the + // batch. methods->Proceed(); } } From 189313d1ddf1358fc23e3924a2cf4785916a61b8 Mon Sep 17 00:00:00 2001 From: Alex Polcyn Date: Thu, 17 Jan 2019 01:31:11 +0000 Subject: [PATCH 10/16] Get the ruby interop client buildable for 1.18.0 back compatiblity matrix --- tools/interop_matrix/client_matrix.py | 4 ++++ .../interop_matrix/patches/ruby_v1.18.0/git_repo.patch | 10 ++++++++++ 2 files changed, 14 insertions(+) create mode 100644 tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index cd542b0f4c5..9b533867836 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -201,6 +201,10 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', + ReleaseInfo(patch=[ + 'tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh', + ])), ]), 'php': OrderedDict([ diff --git a/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch b/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch new file mode 100644 index 00000000000..dfa3cfc031a --- /dev/null +++ b/tools/interop_matrix/patches/ruby_v1.18.0/git_repo.patch @@ -0,0 +1,10 @@ +diff --git a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh +index 67f66090ae..e71ad91499 100755 +--- a/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh ++++ b/tools/dockerfile/interoptest/grpc_interop_ruby/build_interop.sh +@@ -30,4 +30,4 @@ cd /var/local/git/grpc + rvm --default use ruby-2.5 + + # build Ruby interop client and server +-(cd src/ruby && gem update bundler && bundle && rake compile) ++(cd src/ruby && gem install bundler -v 1.17.3 && bundle && rake compile) From 3bd12ee2a8ec39cc186bd11abb7f0afa23633072 Mon Sep 17 00:00:00 2001 From: Lei Huang Date: Thu, 17 Jan 2019 16:15:44 -0700 Subject: [PATCH 11/16] grpc: init compression_algorithm_ in ClientContext ctor `compression_algorithm_` could be a random value because not initialized in ctor. --- src/cpp/client/client_context.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index c9ea3e5f83b..efb59c71a8c 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -57,6 +57,7 @@ ClientContext::ClientContext() deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), census_context_(nullptr), propagate_from_call_(nullptr), + compression_algorithm_(GRPC_COMPRESS_NONE), initial_metadata_corked_(false) { g_client_callbacks->DefaultConstructor(this); } From 5fc904a5e52e3e310eab4ae5425039dc4ae9718f Mon Sep 17 00:00:00 2001 From: Alexander Polcyn Date: Thu, 17 Jan 2019 11:56:56 -0800 Subject: [PATCH 12/16] Attempt to fix brew-update/rvm installation issue on mac --- tools/internal_ci/helper_scripts/prepare_build_macos_rc | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc index 5b6b2569393..7b9b02b6318 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc @@ -40,6 +40,7 @@ fi set +ex # rvm script is very verbose and exits with errorcode # Advice from https://github.com/Homebrew/homebrew-cask/issues/8629#issuecomment-68641176 brew update && brew upgrade brew-cask && brew cleanup && brew cask cleanup +rvm --debug requirements ruby-2.5.0 source $HOME/.rvm/scripts/rvm set -e # rvm commands are very verbose time rvm install 2.5.0 From bf48d410a748cbb3a0e385121ebf49583ac52053 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 18 Jan 2019 09:33:35 +0100 Subject: [PATCH 13/16] change suffix for protected ServerCallContext members to *Core --- .../TestServerCallContext.cs | 24 ++++----- .../Internal/DefaultServerCallContext.cs | 24 ++++----- src/csharp/Grpc.Core/ServerCallContext.cs | 52 +++++++++---------- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs index ff4fb66c6c9..e6297e61226 100644 --- a/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs +++ b/src/csharp/Grpc.Core.Testing/TestServerCallContext.cs @@ -74,31 +74,31 @@ namespace Grpc.Core.Testing this.writeOptionsSetter = writeOptionsSetter; } - protected override string MethodInternal => method; + protected override string MethodCore => method; - protected override string HostInternal => host; + protected override string HostCore => host; - protected override string PeerInternal => peer; + protected override string PeerCore => peer; - protected override DateTime DeadlineInternal => deadline; + protected override DateTime DeadlineCore => deadline; - protected override Metadata RequestHeadersInternal => requestHeaders; + protected override Metadata RequestHeadersCore => requestHeaders; - protected override CancellationToken CancellationTokenInternal => cancellationToken; + protected override CancellationToken CancellationTokenCore => cancellationToken; - protected override Metadata ResponseTrailersInternal => responseTrailers; + protected override Metadata ResponseTrailersCore => responseTrailers; - protected override Status StatusInternal { get => status; set => status = value; } - protected override WriteOptions WriteOptionsInternal { get => writeOptionsGetter(); set => writeOptionsSetter(value); } + protected override Status StatusCore { get => status; set => status = value; } + protected override WriteOptions WriteOptionsCore { get => writeOptionsGetter(); set => writeOptionsSetter(value); } - protected override AuthContext AuthContextInternal => authContext; + protected override AuthContext AuthContextCore => authContext; - protected override ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options) + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options) { return contextPropagationToken; } - protected override Task WriteResponseHeadersInternalAsync(Metadata responseHeaders) + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) { return writeHeadersFunc(responseHeaders); } diff --git a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs index b6a29af2edb..8220e599f92 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs @@ -61,43 +61,43 @@ namespace Grpc.Core this.authContext = new Lazy(GetAuthContextEager); } - protected override ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options) + protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options) { return new ContextPropagationToken(callHandle, deadline, cancellationToken, options); } - protected override Task WriteResponseHeadersInternalAsync(Metadata responseHeaders) + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) { return serverResponseStream.WriteResponseHeadersAsync(responseHeaders); } - protected override string MethodInternal => method; + protected override string MethodCore => method; - protected override string HostInternal => host; + protected override string HostCore => host; - protected override string PeerInternal => callHandle.GetPeer(); + protected override string PeerCore => callHandle.GetPeer(); - protected override DateTime DeadlineInternal => deadline; + protected override DateTime DeadlineCore => deadline; - protected override Metadata RequestHeadersInternal => requestHeaders; + protected override Metadata RequestHeadersCore => requestHeaders; - protected override CancellationToken CancellationTokenInternal => cancellationToken; + protected override CancellationToken CancellationTokenCore => cancellationToken; - protected override Metadata ResponseTrailersInternal => responseTrailers; + protected override Metadata ResponseTrailersCore => responseTrailers; - protected override Status StatusInternal + protected override Status StatusCore { get => status; set => status = value; } - protected override WriteOptions WriteOptionsInternal + protected override WriteOptions WriteOptionsCore { get => serverResponseStream.WriteOptions; set => serverResponseStream.WriteOptions = value; } - protected override AuthContext AuthContextInternal => authContext.Value; + protected override AuthContext AuthContextCore => authContext.Value; private AuthContext GetAuthContextEager() { diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs index 17aa1fe0661..90b6e9419f0 100644 --- a/src/csharp/Grpc.Core/ServerCallContext.cs +++ b/src/csharp/Grpc.Core/ServerCallContext.cs @@ -43,7 +43,7 @@ namespace Grpc.Core /// The task that finished once response headers have been written. public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - return WriteResponseHeadersInternalAsync(responseHeaders); + return WriteResponseHeadersAsyncCore(responseHeaders); } /// @@ -51,41 +51,41 @@ namespace Grpc.Core /// public ContextPropagationToken CreatePropagationToken(ContextPropagationOptions options = null) { - return CreatePropagationTokenInternal(options); + return CreatePropagationTokenCore(options); } /// Name of method called in this RPC. - public string Method => MethodInternal; + public string Method => MethodCore; /// Name of host called in this RPC. - public string Host => HostInternal; + public string Host => HostCore; /// Address of the remote endpoint in URI format. - public string Peer => PeerInternal; + public string Peer => PeerCore; /// Deadline for this RPC. - public DateTime Deadline => DeadlineInternal; + public DateTime Deadline => DeadlineCore; /// Initial metadata sent by client. - public Metadata RequestHeaders => RequestHeadersInternal; + public Metadata RequestHeaders => RequestHeadersCore; /// Cancellation token signals when call is cancelled. - public CancellationToken CancellationToken => CancellationTokenInternal; + public CancellationToken CancellationToken => CancellationTokenCore; /// Trailers to send back to client after RPC finishes. - public Metadata ResponseTrailers => ResponseTrailersInternal; + public Metadata ResponseTrailers => ResponseTrailersCore; /// Status to send back to client after RPC finishes. public Status Status { get { - return StatusInternal; + return StatusCore; } set { - StatusInternal = value; + StatusCore = value; } } @@ -98,12 +98,12 @@ namespace Grpc.Core { get { - return WriteOptionsInternal; + return WriteOptionsCore; } set { - WriteOptionsInternal = value; + WriteOptionsCore = value; } } @@ -111,31 +111,31 @@ namespace Grpc.Core /// Gets the AuthContext associated with this call. /// Note: Access to AuthContext is an experimental API that can change without any prior notice. /// - public AuthContext AuthContext => AuthContextInternal; + public AuthContext AuthContext => AuthContextCore; /// Provides implementation of a non-virtual public member. - protected abstract Task WriteResponseHeadersInternalAsync(Metadata responseHeaders); + protected abstract Task WriteResponseHeadersAsyncCore(Metadata responseHeaders); /// Provides implementation of a non-virtual public member. - protected abstract ContextPropagationToken CreatePropagationTokenInternal(ContextPropagationOptions options); + protected abstract ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options); /// Provides implementation of a non-virtual public member. - protected abstract string MethodInternal { get; } + protected abstract string MethodCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract string HostInternal { get; } + protected abstract string HostCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract string PeerInternal { get; } + protected abstract string PeerCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract DateTime DeadlineInternal { get; } + protected abstract DateTime DeadlineCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract Metadata RequestHeadersInternal { get; } + protected abstract Metadata RequestHeadersCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract CancellationToken CancellationTokenInternal { get; } + protected abstract CancellationToken CancellationTokenCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract Metadata ResponseTrailersInternal { get; } + protected abstract Metadata ResponseTrailersCore { get; } /// Provides implementation of a non-virtual public member. - protected abstract Status StatusInternal { get; set; } + protected abstract Status StatusCore { get; set; } /// Provides implementation of a non-virtual public member. - protected abstract WriteOptions WriteOptionsInternal { get; set; } + protected abstract WriteOptions WriteOptionsCore { get; set; } /// Provides implementation of a non-virtual public member. - protected abstract AuthContext AuthContextInternal { get; } + protected abstract AuthContext AuthContextCore { get; } } } From d67009124fd258d7f42aef0ca87b1c0dddf3e018 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 17 Jan 2019 17:10:18 +0100 Subject: [PATCH 14/16] commenting on PRs is no longer used --- .../prepare_build_linux_perf_rc | 7 ---- .../helper_scripts/prepare_build_macos_rc | 6 --- .../pull_request/grpc_ios_binary_size.cfg | 1 - tools/run_tests/python_utils/comment_on_pr.py | 37 ------------------- 4 files changed, 51 deletions(-) delete mode 100644 tools/run_tests/python_utils/comment_on_pr.py diff --git a/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc b/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc index ec1ec1179d3..ff5593e031a 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_linux_perf_rc @@ -21,15 +21,8 @@ ulimit -c unlimited # Performance PR testing needs GH API key and PR metadata to comment results if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then - set +x sudo apt-get install -y jq export ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref) - - gsutil cp gs://grpc-testing-secrets/github_credentials/oauth_token.txt ~/ - # TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation - export JENKINS_OAUTH_TOKEN=$(cat ~/oauth_token.txt) - export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER - set -x fi sudo pip install tabulate diff --git a/tools/internal_ci/helper_scripts/prepare_build_macos_rc b/tools/internal_ci/helper_scripts/prepare_build_macos_rc index 7b9b02b6318..23619ecbb8b 100644 --- a/tools/internal_ci/helper_scripts/prepare_build_macos_rc +++ b/tools/internal_ci/helper_scripts/prepare_build_macos_rc @@ -25,16 +25,10 @@ export GOOGLE_APPLICATION_CREDENTIALS=${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db3 # If this is a PR using RUN_TESTS_FLAGS var, then add flags to filter tests if [ -n "$KOKORO_GITHUB_PULL_REQUEST_NUMBER" ]; then - set +x brew update brew install jq || brew upgrade jq ghprbTargetBranch=$(curl -s https://api.github.com/repos/grpc/grpc/pulls/$KOKORO_GITHUB_PULL_REQUEST_NUMBER | jq -r .base.ref) export RUN_TESTS_FLAGS="$RUN_TESTS_FLAGS --filter_pr_tests --base_branch origin/$ghprbTargetBranch" - - # TODO(matt-kwong): rename this to GITHUB_OAUTH_TOKEN after Jenkins deprecation - export JENKINS_OAUTH_TOKEN=$(cat ${KOKORO_GFILE_DIR}/oauth_token.txt) - export ghprbPullId=$KOKORO_GITHUB_PULL_REQUEST_NUMBER - set -x fi set +ex # rvm script is very verbose and exits with errorcode diff --git a/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg b/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg index 1c4f7b23109..dc35ce81ffd 100644 --- a/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg +++ b/tools/internal_ci/macos/pull_request/grpc_ios_binary_size.cfg @@ -17,7 +17,6 @@ # Location of the continuous shell script in repository. build_file: "grpc/tools/internal_ci/macos/grpc_ios_binary_size.sh" timeout_mins: 60 -gfile_resources: "/bigstore/grpc-testing-secrets/github_credentials/oauth_token.txt" before_action { fetch_keystore { keystore_resource { diff --git a/tools/run_tests/python_utils/comment_on_pr.py b/tools/run_tests/python_utils/comment_on_pr.py deleted file mode 100644 index 399c996d4db..00000000000 --- a/tools/run_tests/python_utils/comment_on_pr.py +++ /dev/null @@ -1,37 +0,0 @@ -# Copyright 2017 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import json -import urllib2 - - -def comment_on_pr(text): - if 'JENKINS_OAUTH_TOKEN' not in os.environ: - print 'Missing JENKINS_OAUTH_TOKEN env var: not commenting' - return - if 'ghprbPullId' not in os.environ: - print 'Missing ghprbPullId env var: not commenting' - return - req = urllib2.Request( - url='https://api.github.com/repos/grpc/grpc/issues/%s/comments' % - os.environ['ghprbPullId'], - data=json.dumps({ - 'body': text - }), - headers={ - 'Authorization': 'token %s' % os.environ['JENKINS_OAUTH_TOKEN'], - 'Content-Type': 'application/json', - }) - print urllib2.urlopen(req).read() From 44402ad0a1b36770f3546393d346f8c0419e46cf Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 17 Jan 2019 23:21:08 -0800 Subject: [PATCH 15/16] Make executor look more like the rest of the codebase (namespace, etc) --- .../chttp2/transport/chttp2_transport.cc | 6 +- src/core/lib/iomgr/combiner.cc | 7 +- src/core/lib/iomgr/executor.cc | 201 ++++++++++-------- src/core/lib/iomgr/executor.h | 101 ++++----- src/core/lib/iomgr/fork_posix.cc | 6 +- src/core/lib/iomgr/iomgr.cc | 4 +- src/core/lib/iomgr/iomgr_custom.cc | 2 +- src/core/lib/iomgr/resolve_address_posix.cc | 5 +- src/core/lib/iomgr/resolve_address_windows.cc | 3 +- src/core/lib/iomgr/tcp_posix.cc | 8 +- src/core/lib/iomgr/udp_server.cc | 10 +- src/core/lib/surface/init.cc | 2 +- src/core/lib/surface/server.cc | 5 +- src/core/lib/transport/transport.cc | 2 +- test/core/end2end/fuzzers/api_fuzzer.cc | 2 +- test/core/end2end/fuzzers/client_fuzzer.cc | 2 +- test/core/end2end/fuzzers/server_fuzzer.cc | 2 +- test/core/iomgr/resolve_address_test.cc | 2 +- 18 files changed, 204 insertions(+), 166 deletions(-) diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7f4627fa773..fe88d4818e4 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -968,19 +968,19 @@ static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t, get better latency overall if we switch writing work elsewhere and continue with application work above */ if (!t->is_first_write_in_batch) { - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } /* equivalently, if it's a partial write, we *know* we're going to be taking a thread jump to write it because of the above, may as well do so immediately */ if (partial_write) { - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } switch (t->opt_target) { case GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT: /* executor gives us the largest probability of being able to batch a * write with others on this transport */ - return grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + return grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); case GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY: return grpc_schedule_on_exec_ctx; } diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 402f8904eae..4fc4a9dccf4 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -83,8 +83,9 @@ grpc_combiner* grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); + GRPC_CLOSURE_INIT( + &lock->offload, offload, lock, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_INFO, "C:%p create", lock)); return lock; } @@ -235,7 +236,7 @@ bool grpc_combiner_continue_exec_ctx() { // 3. the DEFAULT executor is threaded // 4. the current thread is not a worker for any background poller if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish() && - grpc_executor_is_threaded() && + grpc_core::Executor::IsThreadedDefault() && !grpc_iomgr_is_any_background_poller_thread()) { GPR_TIMER_MARK("offload_from_finished_exec_ctx", 0); // this execution context wants to move on: schedule remaining work to be diff --git a/src/core/lib/iomgr/executor.cc b/src/core/lib/iomgr/executor.cc index 45d96b80eb4..2703e1a0b77 100644 --- a/src/core/lib/iomgr/executor.cc +++ b/src/core/lib/iomgr/executor.cc @@ -45,20 +45,70 @@ gpr_log(GPR_INFO, "EXECUTOR " str); \ } -grpc_core::TraceFlag executor_trace(false, "executor"); +namespace grpc_core { +namespace { GPR_TLS_DECL(g_this_thread_state); -GrpcExecutor::GrpcExecutor(const char* name) : name_(name) { +Executor* executors[static_cast(ExecutorType::NUM_EXECUTORS)]; + +void default_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::DEFAULT)]->Enqueue( + closure, error, true /* is_short */); +} + +void default_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::DEFAULT)]->Enqueue( + closure, error, false /* is_short */); +} + +void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::RESOLVER)]->Enqueue( + closure, error, true /* is_short */); +} + +void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { + executors[static_cast(ExecutorType::RESOLVER)]->Enqueue( + closure, error, false /* is_short */); +} + +const grpc_closure_scheduler_vtable + vtables_[static_cast(ExecutorType::NUM_EXECUTORS)] + [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { + {{&default_enqueue_short, &default_enqueue_short, + "def-ex-short"}, + {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, + {{&resolver_enqueue_short, &resolver_enqueue_short, + "res-ex-short"}, + {&resolver_enqueue_long, &resolver_enqueue_long, + "res-ex-long"}}}; + +grpc_closure_scheduler + schedulers_[static_cast(ExecutorType::NUM_EXECUTORS)] + [static_cast(ExecutorJobType::NUM_JOB_TYPES)] = { + {{&vtables_[static_cast(ExecutorType::DEFAULT)] + [static_cast(ExecutorJobType::SHORT)]}, + {&vtables_[static_cast(ExecutorType::DEFAULT)] + [static_cast(ExecutorJobType::LONG)]}}, + {{&vtables_[static_cast(ExecutorType::RESOLVER)] + [static_cast(ExecutorJobType::SHORT)]}, + {&vtables_[static_cast(ExecutorType::RESOLVER)] + [static_cast(ExecutorJobType::LONG)]}}}; + +} // namespace + +TraceFlag executor_trace(false, "executor"); + +Executor::Executor(const char* name) : name_(name) { adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER; gpr_atm_rel_store(&num_threads_, 0); max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores()); } -void GrpcExecutor::Init() { SetThreading(true); } +void Executor::Init() { SetThreading(true); } -size_t GrpcExecutor::RunClosures(const char* executor_name, - grpc_closure_list list) { +size_t Executor::RunClosures(const char* executor_name, + grpc_closure_list list) { size_t n = 0; grpc_closure* c = list.head; @@ -82,11 +132,11 @@ size_t GrpcExecutor::RunClosures(const char* executor_name, return n; } -bool GrpcExecutor::IsThreaded() const { +bool Executor::IsThreaded() const { return gpr_atm_acq_load(&num_threads_) > 0; } -void GrpcExecutor::SetThreading(bool threading) { +void Executor::SetThreading(bool threading) { gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_); EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading); @@ -112,7 +162,7 @@ void GrpcExecutor::SetThreading(bool threading) { } thd_state_[0].thd = - grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]); + grpc_core::Thread(name_, &Executor::ThreadMain, &thd_state_[0]); thd_state_[0].thd.Start(); } else { // !threading if (curr_num_threads == 0) { @@ -153,9 +203,9 @@ void GrpcExecutor::SetThreading(bool threading) { EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading); } -void GrpcExecutor::Shutdown() { SetThreading(false); } +void Executor::Shutdown() { SetThreading(false); } -void GrpcExecutor::ThreadMain(void* arg) { +void Executor::ThreadMain(void* arg) { ThreadState* ts = static_cast(arg); gpr_tls_set(&g_this_thread_state, reinterpret_cast(ts)); @@ -192,8 +242,8 @@ void GrpcExecutor::ThreadMain(void* arg) { } } -void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, - bool is_short) { +void Executor::Enqueue(grpc_closure* closure, grpc_error* error, + bool is_short) { bool retry_push; if (is_short) { GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(); @@ -304,7 +354,7 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, gpr_atm_rel_store(&num_threads_, cur_thread_count + 1); thd_state_[cur_thread_count].thd = grpc_core::Thread( - name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]); + name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]); thd_state_[cur_thread_count].thd.Start(); } gpr_spinlock_unlock(&adding_thread_lock_); @@ -316,85 +366,52 @@ void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error, } while (retry_push); } -static GrpcExecutor* executors[GRPC_NUM_EXECUTORS]; - -void default_enqueue_short(grpc_closure* closure, grpc_error* error) { - executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, - true /* is_short */); -} - -void default_enqueue_long(grpc_closure* closure, grpc_error* error) { - executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error, - false /* is_short */); -} - -void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) { - executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, - true /* is_short */); -} - -void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) { - executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error, - false /* is_short */); -} - -static const grpc_closure_scheduler_vtable - vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { - {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"}, - {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}}, - {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"}, - {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}}; - -static grpc_closure_scheduler - schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = { - {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]}, - {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}}, - {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]}, - {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}}; - -// grpc_executor_init() and grpc_executor_shutdown() functions are called in the +// Executor::InitAll() and Executor::ShutdownAll() functions are called in the // the grpc_init() and grpc_shutdown() code paths which are protected by a // global mutex. So it is okay to assume that these functions are thread-safe -void grpc_executor_init() { - EXECUTOR_TRACE0("grpc_executor_init() enter"); +void Executor::InitAll() { + EXECUTOR_TRACE0("Executor::InitAll() enter"); - // Return if grpc_executor_init() is already called earlier - if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) { - GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr); + // Return if Executor::InitAll() is already called earlier + if (executors[static_cast(ExecutorType::DEFAULT)] != nullptr) { + GPR_ASSERT(executors[static_cast(ExecutorType::RESOLVER)] != + nullptr); return; } - executors[GRPC_DEFAULT_EXECUTOR] = - grpc_core::New("default-executor"); - executors[GRPC_RESOLVER_EXECUTOR] = - grpc_core::New("resolver-executor"); + executors[static_cast(ExecutorType::DEFAULT)] = + grpc_core::New("default-executor"); + executors[static_cast(ExecutorType::RESOLVER)] = + grpc_core::New("resolver-executor"); - executors[GRPC_DEFAULT_EXECUTOR]->Init(); - executors[GRPC_RESOLVER_EXECUTOR]->Init(); + executors[static_cast(ExecutorType::DEFAULT)]->Init(); + executors[static_cast(ExecutorType::RESOLVER)]->Init(); - EXECUTOR_TRACE0("grpc_executor_init() done"); + EXECUTOR_TRACE0("Executor::InitAll() done"); } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, - GrpcExecutorJobType job_type) { - return &schedulers_[executor_type][job_type]; +grpc_closure_scheduler* Executor::Scheduler(ExecutorType executor_type, + ExecutorJobType job_type) { + return &schedulers_[static_cast(executor_type)] + [static_cast(job_type)]; } -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) { - return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type); +grpc_closure_scheduler* Executor::Scheduler(ExecutorJobType job_type) { + return Executor::Scheduler(ExecutorType::DEFAULT, job_type); } -void grpc_executor_shutdown() { - EXECUTOR_TRACE0("grpc_executor_shutdown() enter"); +void Executor::ShutdownAll() { + EXECUTOR_TRACE0("Executor::ShutdownAll() enter"); - // Return if grpc_executor_shutdown() is already called earlier - if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) { - GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr); + // Return if Executor:SshutdownAll() is already called earlier + if (executors[static_cast(ExecutorType::DEFAULT)] == nullptr) { + GPR_ASSERT(executors[static_cast(ExecutorType::RESOLVER)] == + nullptr); return; } - executors[GRPC_DEFAULT_EXECUTOR]->Shutdown(); - executors[GRPC_RESOLVER_EXECUTOR]->Shutdown(); + executors[static_cast(ExecutorType::DEFAULT)]->Shutdown(); + executors[static_cast(ExecutorType::RESOLVER)]->Shutdown(); // Delete the executor objects. // @@ -408,26 +425,36 @@ void grpc_executor_shutdown() { // By ensuring that all executors are shutdown first, we are also ensuring // that no thread is active across all executors. - grpc_core::Delete(executors[GRPC_DEFAULT_EXECUTOR]); - grpc_core::Delete(executors[GRPC_RESOLVER_EXECUTOR]); - executors[GRPC_DEFAULT_EXECUTOR] = nullptr; - executors[GRPC_RESOLVER_EXECUTOR] = nullptr; + grpc_core::Delete( + executors[static_cast(ExecutorType::DEFAULT)]); + grpc_core::Delete( + executors[static_cast(ExecutorType::RESOLVER)]); + executors[static_cast(ExecutorType::DEFAULT)] = nullptr; + executors[static_cast(ExecutorType::RESOLVER)] = nullptr; - EXECUTOR_TRACE0("grpc_executor_shutdown() done"); + EXECUTOR_TRACE0("Executor::ShutdownAll() done"); } -bool grpc_executor_is_threaded(GrpcExecutorType executor_type) { - GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS); - return executors[executor_type]->IsThreaded(); +bool Executor::IsThreaded(ExecutorType executor_type) { + GPR_ASSERT(executor_type < ExecutorType::NUM_EXECUTORS); + return executors[static_cast(executor_type)]->IsThreaded(); } -bool grpc_executor_is_threaded() { - return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR); +bool Executor::IsThreadedDefault() { + return Executor::IsThreaded(ExecutorType::DEFAULT); } -void grpc_executor_set_threading(bool enable) { - EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable); - for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) { +void Executor::SetThreadingAll(bool enable) { + EXECUTOR_TRACE("Executor::SetThreadingAll(%d) called", enable); + for (size_t i = 0; i < static_cast(ExecutorType::NUM_EXECUTORS); + i++) { executors[i]->SetThreading(enable); } } + +void Executor::SetThreadingDefault(bool enable) { + EXECUTOR_TRACE("Executor::SetThreadingDefault(%d) called", enable); + executors[static_cast(ExecutorType::DEFAULT)]->SetThreading(enable); +} + +} // namespace grpc_core diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index 8829138c5fa..9e472279b7b 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -25,7 +25,9 @@ #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/iomgr/closure.h" -typedef struct { +namespace grpc_core { + +struct ThreadState { gpr_mu mu; size_t id; // For debugging purposes const char* name; // Thread state name @@ -35,17 +37,24 @@ typedef struct { bool shutdown; bool queued_long_job; grpc_core::Thread thd; -} ThreadState; +}; -typedef enum { - GRPC_EXECUTOR_SHORT = 0, - GRPC_EXECUTOR_LONG, - GRPC_NUM_EXECUTOR_JOB_TYPES // Add new values above this -} GrpcExecutorJobType; +enum class ExecutorType { + DEFAULT = 0, + RESOLVER, + + NUM_EXECUTORS // Add new values above this +}; -class GrpcExecutor { +enum class ExecutorJobType { + SHORT = 0, + LONG, + NUM_JOB_TYPES // Add new values above this +}; + +class Executor { public: - GrpcExecutor(const char* executor_name); + Executor(const char* executor_name); void Init(); @@ -62,55 +71,51 @@ class GrpcExecutor { * a short job (i.e expected to not block and complete quickly) */ void Enqueue(grpc_closure* closure, grpc_error* error, bool is_short); - private: - static size_t RunClosures(const char* executor_name, grpc_closure_list list); - static void ThreadMain(void* arg); + // TODO(sreek): Currently we have two executors (available globally): The + // default executor and the resolver executor. + // + // Some of the functions below operate on the DEFAULT executor only while some + // operate of ALL the executors. This is a bit confusing and should be cleaned + // up in future (where we make all the following functions take ExecutorType + // and/or JobType) - const char* name_; - ThreadState* thd_state_; - size_t max_threads_; - gpr_atm num_threads_; - gpr_spinlock adding_thread_lock_; -}; - -// == Global executor functions == + // Initialize ALL the executors + static void InitAll(); -typedef enum { - GRPC_DEFAULT_EXECUTOR = 0, - GRPC_RESOLVER_EXECUTOR, + // Shutdown ALL the executors + static void ShutdownAll(); - GRPC_NUM_EXECUTORS // Add new values above this -} GrpcExecutorType; + // Set the threading mode for ALL the executors + static void SetThreadingAll(bool enable); -// TODO(sreek): Currently we have two executors (available globally): The -// default executor and the resolver executor. -// -// Some of the functions below operate on the DEFAULT executor only while some -// operate of ALL the executors. This is a bit confusing and should be cleaned -// up in future (where we make all the following functions take executor_type -// and/or job_type) + // Set the threading mode for ALL the executors + static void SetThreadingDefault(bool enable); -// Initialize ALL the executors -void grpc_executor_init(); + // Get the DEFAULT executor scheduler for the given job_type + static grpc_closure_scheduler* Scheduler(ExecutorJobType job_type); -// Shutdown ALL the executors -void grpc_executor_shutdown(); + // Get the executor scheduler for a given executor_type and a job_type + static grpc_closure_scheduler* Scheduler(ExecutorType executor_type, + ExecutorJobType job_type); -// Set the threading mode for ALL the executors -void grpc_executor_set_threading(bool enable); + // Return if a given executor is running in threaded mode (i.e if + // SetThreading(true) was called previously on that executor) + static bool IsThreaded(ExecutorType executor_type); -// Get the DEFAULT executor scheduler for the given job_type -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type); + // Return if the DEFAULT executor is threaded + static bool IsThreadedDefault(); -// Get the executor scheduler for a given executor_type and a job_type -grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type, - GrpcExecutorJobType job_type); + private: + static size_t RunClosures(const char* executor_name, grpc_closure_list list); + static void ThreadMain(void* arg); -// Return if a given executor is running in threaded mode (i.e if -// grpc_executor_set_threading(true) was called previously on that executor) -bool grpc_executor_is_threaded(GrpcExecutorType executor_type); + const char* name_; + ThreadState* thd_state_; + size_t max_threads_; + gpr_atm num_threads_; + gpr_spinlock adding_thread_lock_; +}; -// Return if the DEFAULT executor is threaded -bool grpc_executor_is_threaded(); +} // namespace grpc_core #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */ diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc index 05ecd2a49b7..2eebe3f26f6 100644 --- a/src/core/lib/iomgr/fork_posix.cc +++ b/src/core/lib/iomgr/fork_posix.cc @@ -71,7 +71,7 @@ void grpc_prefork() { return; } grpc_timer_manager_set_threading(false); - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_core::ExecCtx::Get()->Flush(); grpc_core::Fork::AwaitThreads(); skipped_handler = false; @@ -82,7 +82,7 @@ void grpc_postfork_parent() { grpc_core::Fork::AllowExecCtx(); grpc_core::ExecCtx exec_ctx; grpc_timer_manager_set_threading(true); - grpc_executor_set_threading(true); + grpc_core::Executor::SetThreadingAll(true); } } @@ -96,7 +96,7 @@ void grpc_postfork_child() { reset_polling_engine(); } grpc_timer_manager_set_threading(true); - grpc_executor_set_threading(true); + grpc_core::Executor::SetThreadingAll(true); } } diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc index dcc69332e0b..33153d9cc3b 100644 --- a/src/core/lib/iomgr/iomgr.cc +++ b/src/core/lib/iomgr/iomgr.cc @@ -52,7 +52,7 @@ void grpc_iomgr_init() { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); - grpc_executor_init(); + grpc_core::Executor::InitAll(); grpc_timer_list_init(); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = (char*)"root"; @@ -88,7 +88,7 @@ void grpc_iomgr_shutdown() { { grpc_timer_manager_shutdown(); grpc_iomgr_platform_flush(); - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); gpr_mu_lock(&g_mu); g_shutdown = 1; diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc index e1cd8f73104..3d07f1abe9a 100644 --- a/src/core/lib/iomgr/iomgr_custom.cc +++ b/src/core/lib/iomgr/iomgr_custom.cc @@ -34,7 +34,7 @@ gpr_thd_id g_init_thread; static void iomgr_platform_init(void) { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); g_init_thread = gpr_thd_currentid(); grpc_pollset_global_init(); } diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index 2a03244ff7d..e6dd8f1ceab 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -150,7 +150,7 @@ typedef struct { void* arg; } request; -/* Callback to be passed to grpc_executor to asynch-ify +/* Callback to be passed to grpc Executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(void* rp, grpc_error* error) { request* r = static_cast(rp); @@ -168,7 +168,8 @@ static void posix_resolve_address(const char* name, const char* default_port, request* r = static_cast(gpr_malloc(sizeof(request))); GRPC_CLOSURE_INIT( &r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); + grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, + grpc_core::ExecutorJobType::SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc index 3e977dca2da..64351c38a8f 100644 --- a/src/core/lib/iomgr/resolve_address_windows.cc +++ b/src/core/lib/iomgr/resolve_address_windows.cc @@ -153,7 +153,8 @@ static void windows_resolve_address(const char* name, const char* default_port, request* r = (request*)gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT( &r->request_closure, do_request_thread, r, - grpc_executor_scheduler(GRPC_RESOLVER_EXECUTOR, GRPC_EXECUTOR_SHORT)); + grpc_core::Executor::Scheduler(grpc_core::ExecutorType::RESOLVER, + grpc_core::ExecutorJobType::SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index d0642c015ff..92f163b58e9 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -227,10 +227,10 @@ static void cover_self(grpc_tcp* tcp) { } grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); - GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_core::Executor::Scheduler( + grpc_core::ExecutorJobType::LONG)), + GRPC_ERROR_NONE); } else { while ((p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller)) == nullptr) { diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 3dd7cab855c..5f8865ca57f 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -481,8 +481,9 @@ void GrpcUdpListener::OnRead(grpc_error* error, void* do_read_arg) { if (udp_handler_->Read()) { /* There maybe more packets to read. Schedule read_more_cb_ closure to run * after finishing this event loop. */ - GRPC_CLOSURE_INIT(&do_read_closure_, do_read, do_read_arg, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_INIT( + &do_read_closure_, do_read, do_read_arg, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); GRPC_CLOSURE_SCHED(&do_read_closure_, GRPC_ERROR_NONE); } else { /* Finish reading all the packets, re-arm the notification event so we can @@ -542,8 +543,9 @@ void GrpcUdpListener::OnCanWrite(grpc_error* error, void* do_write_arg) { } /* Schedule actual write in another thread. */ - GRPC_CLOSURE_INIT(&do_write_closure_, do_write, do_write_arg, - grpc_executor_scheduler(GRPC_EXECUTOR_LONG)); + GRPC_CLOSURE_INIT( + &do_write_closure_, do_write, do_write_arg, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::LONG)); GRPC_CLOSURE_SCHED(&do_write_closure_, GRPC_ERROR_NONE); } diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 67cf5d89bff..60f506ef5e2 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -165,7 +165,7 @@ void grpc_shutdown(void) { { grpc_timer_manager_set_threading( false); // shutdown timer_manager thread - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); for (i = g_number_of_plugins; i >= 0; i--) { if (g_all_of_the_plugins[i].destroy != nullptr) { g_all_of_the_plugins[i].destroy(); diff --git a/src/core/lib/surface/server.cc b/src/core/lib/surface/server.cc index 7ae6e51a5fb..cdfd3336437 100644 --- a/src/core/lib/surface/server.cc +++ b/src/core/lib/surface/server.cc @@ -1134,8 +1134,9 @@ void grpc_server_start(grpc_server* server) { server_ref(server); server->starting = true; GRPC_CLOSURE_SCHED( - GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_CLOSURE_CREATE( + start_listeners, server, + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)), GRPC_ERROR_NONE); } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index b32f9c6ec1a..43add28ce03 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -73,7 +73,7 @@ void grpc_stream_unref(grpc_stream_refcount* refcount) { Throw this over to the executor (on a core-owned thread) and process it there. */ refcount->destroy.scheduler = - grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); + grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT); } GRPC_CLOSURE_SCHED(&refcount->destroy, GRPC_ERROR_NONE); } diff --git a/test/core/end2end/fuzzers/api_fuzzer.cc b/test/core/end2end/fuzzers/api_fuzzer.cc index a0b82904753..57bc8ad768c 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.cc +++ b/test/core/end2end/fuzzers/api_fuzzer.cc @@ -706,7 +706,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_timer_manager_set_threading(false); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); } grpc_set_resolver_impl(&fuzzer_resolver); grpc_dns_lookup_ares_locked = my_dns_lookup_ares_locked; diff --git a/test/core/end2end/fuzzers/client_fuzzer.cc b/test/core/end2end/fuzzers/client_fuzzer.cc index e21006bb673..8520fb53755 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.cc +++ b/test/core/end2end/fuzzers/client_fuzzer.cc @@ -46,7 +46,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_init(); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_resource_quota* resource_quota = grpc_resource_quota_create("client_fuzzer"); diff --git a/test/core/end2end/fuzzers/server_fuzzer.cc b/test/core/end2end/fuzzers/server_fuzzer.cc index d370dc7de85..644f98e37ac 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.cc +++ b/test/core/end2end/fuzzers/server_fuzzer.cc @@ -43,7 +43,7 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) { grpc_init(); { grpc_core::ExecCtx exec_ctx; - grpc_executor_set_threading(false); + grpc_core::Executor::SetThreadingAll(false); grpc_resource_quota* resource_quota = grpc_resource_quota_create("server_fuzzer"); diff --git a/test/core/iomgr/resolve_address_test.cc b/test/core/iomgr/resolve_address_test.cc index 1d9e1ee27e2..0ae0ec888b6 100644 --- a/test/core/iomgr/resolve_address_test.cc +++ b/test/core/iomgr/resolve_address_test.cc @@ -290,7 +290,7 @@ int main(int argc, char** argv) { test_invalid_ip_addresses(); test_unparseable_hostports(); } - grpc_executor_shutdown(); + grpc_core::Executor::ShutdownAll(); } gpr_cmdline_destroy(cl); From dba6fdce914152bd9d5d25b32d8417bcc149da89 Mon Sep 17 00:00:00 2001 From: Sanjay Pujare Date: Fri, 18 Jan 2019 15:35:40 -0800 Subject: [PATCH 16/16] update interop client matrix to add 1.18 for core langs --- tools/interop_matrix/client_matrix.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py index 9b533867836..07f144d8250 100644 --- a/tools/interop_matrix/client_matrix.py +++ b/tools/interop_matrix/client_matrix.py @@ -98,6 +98,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'go': OrderedDict([ @@ -161,6 +162,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'node': OrderedDict([ @@ -225,6 +227,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), 'csharp': OrderedDict([ @@ -249,6 +252,7 @@ LANG_RELEASE_MATRIX = { ('v1.15.0', ReleaseInfo()), ('v1.16.0', ReleaseInfo()), ('v1.17.1', ReleaseInfo()), + ('v1.18.0', ReleaseInfo()), ]), }