mirror of https://github.com/grpc/grpc.git
commit
fe1f4aeb23
69 changed files with 2311 additions and 1345 deletions
@ -0,0 +1,46 @@ |
||||
/*
|
||||
* |
||||
* Copyright 2016, Google Inc. |
||||
* All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are |
||||
* met: |
||||
* |
||||
* * Redistributions of source code must retain the above copyright |
||||
* notice, this list of conditions and the following disclaimer. |
||||
* * Redistributions in binary form must reproduce the above |
||||
* copyright notice, this list of conditions and the following disclaimer |
||||
* in the documentation and/or other materials provided with the |
||||
* distribution. |
||||
* * Neither the name of Google Inc. nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* |
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
* |
||||
*/ |
||||
|
||||
#include <grpc/grpc.h> |
||||
|
||||
extern void grpc_chttp2_plugin_init(void); |
||||
extern void grpc_chttp2_plugin_shutdown(void); |
||||
extern void grpc_client_config_init(void); |
||||
extern void grpc_client_config_shutdown(void); |
||||
|
||||
void grpc_register_built_in_plugins(void) { |
||||
grpc_register_plugin(grpc_chttp2_plugin_init, |
||||
grpc_chttp2_plugin_shutdown); |
||||
grpc_register_plugin(grpc_client_config_init, |
||||
grpc_client_config_shutdown); |
||||
} |
@ -0,0 +1,90 @@ |
||||
#region Copyright notice and license |
||||
|
||||
// Copyright 2015, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
#endregion |
||||
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Reflection; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using Grpc.Core; |
||||
using Grpc.Core.Internal; |
||||
using Grpc.Core.Utils; |
||||
using NUnit.Framework; |
||||
|
||||
namespace Grpc.Core.Tests |
||||
{ |
||||
public class AppDomainUnloadTest |
||||
{ |
||||
[Test] |
||||
public void AppDomainUnloadHookCanCleanupAbandonedCall() |
||||
{ |
||||
var setup = new AppDomainSetup |
||||
{ |
||||
ApplicationBase = AppDomain.CurrentDomain.BaseDirectory |
||||
}; |
||||
var childDomain = AppDomain.CreateDomain("test", null, setup); |
||||
var remoteObj = childDomain.CreateInstance(typeof(AppDomainTestClass).Assembly.GetName().Name, typeof(AppDomainTestClass).FullName); |
||||
|
||||
// Try to unload the appdomain once we've created a server and a channel inside the appdomain. |
||||
AppDomain.Unload(childDomain); |
||||
} |
||||
|
||||
public class AppDomainTestClass |
||||
{ |
||||
const string Host = "127.0.0.1"; |
||||
|
||||
/// <summary> |
||||
/// Creates a server and a channel and initiates a call. The code is invoked from inside of an AppDomain |
||||
/// to test if AppDomain.Unload() work if Grpc is being used. |
||||
/// </summary> |
||||
public AppDomainTestClass() |
||||
{ |
||||
var helper = new MockServiceHelper(Host); |
||||
var server = helper.GetServer(); |
||||
server.Start(); |
||||
var channel = helper.GetChannel(); |
||||
|
||||
var readyToShutdown = new TaskCompletionSource<object>(); |
||||
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => |
||||
{ |
||||
readyToShutdown.SetResult(null); |
||||
await requestStream.ToListAsync(); |
||||
}); |
||||
|
||||
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); |
||||
readyToShutdown.Task.Wait(); // make sure handler is running |
||||
} |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,57 @@ |
||||
#region Copyright notice and license |
||||
|
||||
// Copyright 2015, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
#endregion |
||||
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using Grpc.Core; |
||||
using Grpc.Core.Internal; |
||||
using Grpc.Core.Utils; |
||||
using NUnit.Framework; |
||||
|
||||
namespace Grpc.Core.Tests |
||||
{ |
||||
public class ShutdownHookClientTest |
||||
{ |
||||
const string Host = "127.0.0.1"; |
||||
|
||||
[Test] |
||||
public void ProcessExitHookCanCleanupAbandonedChannels() |
||||
{ |
||||
var channel = new Channel(Host, 1000, ChannelCredentials.Insecure); |
||||
var channel2 = new Channel(Host, 1001, ChannelCredentials.Insecure); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,69 @@ |
||||
#region Copyright notice and license |
||||
|
||||
// Copyright 2015, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
#endregion |
||||
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using Grpc.Core; |
||||
using Grpc.Core.Internal; |
||||
using Grpc.Core.Utils; |
||||
using NUnit.Framework; |
||||
|
||||
namespace Grpc.Core.Tests |
||||
{ |
||||
public class ShutdownHookPendingCallTest |
||||
{ |
||||
const string Host = "127.0.0.1"; |
||||
|
||||
[Test] |
||||
public void ProcessExitHookCanCleanupAbandonedCall() |
||||
{ |
||||
var helper = new MockServiceHelper(Host); |
||||
var server = helper.GetServer(); |
||||
server.Start(); |
||||
var channel = helper.GetChannel(); |
||||
|
||||
var readyToShutdown = new TaskCompletionSource<object>(); |
||||
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => |
||||
{ |
||||
readyToShutdown.SetResult(null); |
||||
await requestStream.ToListAsync(); |
||||
}); |
||||
|
||||
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); |
||||
readyToShutdown.Task.Wait(); // make sure handler is running |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,58 @@ |
||||
#region Copyright notice and license |
||||
|
||||
// Copyright 2015, Google Inc. |
||||
// All rights reserved. |
||||
// |
||||
// Redistribution and use in source and binary forms, with or without |
||||
// modification, are permitted provided that the following conditions are |
||||
// met: |
||||
// |
||||
// * Redistributions of source code must retain the above copyright |
||||
// notice, this list of conditions and the following disclaimer. |
||||
// * Redistributions in binary form must reproduce the above |
||||
// copyright notice, this list of conditions and the following disclaimer |
||||
// in the documentation and/or other materials provided with the |
||||
// distribution. |
||||
// * Neither the name of Google Inc. nor the names of its |
||||
// contributors may be used to endorse or promote products derived from |
||||
// this software without specific prior written permission. |
||||
// |
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
#endregion |
||||
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using Grpc.Core; |
||||
using Grpc.Core.Internal; |
||||
using Grpc.Core.Utils; |
||||
using NUnit.Framework; |
||||
|
||||
namespace Grpc.Core.Tests |
||||
{ |
||||
public class ShutdownHookServerTest |
||||
{ |
||||
const string Host = "127.0.0.1"; |
||||
|
||||
[Test] |
||||
public void ProcessExitHookCanCleanupAbandonedServers() |
||||
{ |
||||
var helper = new MockServiceHelper(Host); |
||||
var server = helper.GetServer(); |
||||
server.Start(); |
||||
} |
||||
} |
||||
} |
@ -1,48 +0,0 @@ |
||||
# Copyright 2015, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
import collections |
||||
|
||||
from grpc.beta import interfaces |
||||
|
||||
class AuthMetadataContext(collections.namedtuple( |
||||
'AuthMetadataContext', [ |
||||
'service_url', |
||||
'method_name' |
||||
]), interfaces.GRPCAuthMetadataContext): |
||||
pass |
||||
|
||||
|
||||
class AuthMetadataPluginCallback(interfaces.GRPCAuthMetadataContext): |
||||
|
||||
def __init__(self, callback): |
||||
self._callback = callback |
||||
|
||||
def __call__(self, metadata, error): |
||||
self._callback(metadata, error) |
@ -0,0 +1,566 @@ |
||||
# Copyright 2016, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
"""Translates gRPC's client-side API into gRPC's client-side Beta API.""" |
||||
|
||||
import grpc |
||||
from grpc._cython import cygrpc |
||||
from grpc.beta import interfaces |
||||
from grpc.framework.common import cardinality |
||||
from grpc.framework.foundation import future |
||||
from grpc.framework.interfaces.face import face |
||||
|
||||
_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = { |
||||
grpc.StatusCode.CANCELLED: ( |
||||
face.Abortion.Kind.CANCELLED, face.CancellationError), |
||||
grpc.StatusCode.UNKNOWN: ( |
||||
face.Abortion.Kind.REMOTE_FAILURE, face.RemoteError), |
||||
grpc.StatusCode.DEADLINE_EXCEEDED: ( |
||||
face.Abortion.Kind.EXPIRED, face.ExpirationError), |
||||
grpc.StatusCode.UNIMPLEMENTED: ( |
||||
face.Abortion.Kind.LOCAL_FAILURE, face.LocalError), |
||||
} |
||||
|
||||
|
||||
def _fully_qualified_method(group, method): |
||||
return b'/{}/{}'.format(group, method) |
||||
|
||||
|
||||
def _effective_metadata(metadata, metadata_transformer): |
||||
non_none_metadata = () if metadata is None else metadata |
||||
if metadata_transformer is None: |
||||
return non_none_metadata |
||||
else: |
||||
return metadata_transformer(non_none_metadata) |
||||
|
||||
|
||||
def _credentials(grpc_call_options): |
||||
return None if grpc_call_options is None else grpc_call_options.credentials |
||||
|
||||
|
||||
def _abortion(rpc_error_call): |
||||
code = rpc_error_call.code() |
||||
pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code) |
||||
error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0] |
||||
return face.Abortion( |
||||
error_kind, rpc_error_call.initial_metadata(), |
||||
rpc_error_call.trailing_metadata(), code, rpc_error_code.details()) |
||||
|
||||
|
||||
def _abortion_error(rpc_error_call): |
||||
code = rpc_error_call.code() |
||||
pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code) |
||||
exception_class = face.AbortionError if pair is None else pair[1] |
||||
return exception_class( |
||||
rpc_error_call.initial_metadata(), rpc_error_call.trailing_metadata(), |
||||
code, rpc_error_call.details()) |
||||
|
||||
|
||||
class _InvocationProtocolContext(interfaces.GRPCInvocationContext): |
||||
|
||||
def disable_next_request_compression(self): |
||||
pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement. |
||||
|
||||
|
||||
class _Rendezvous(future.Future, face.Call): |
||||
|
||||
def __init__(self, response_future, response_iterator, call): |
||||
self._future = response_future |
||||
self._iterator = response_iterator |
||||
self._call = call |
||||
|
||||
def cancel(self): |
||||
return self._call.cancel() |
||||
|
||||
def cancelled(self): |
||||
return self._future.cancelled() |
||||
|
||||
def running(self): |
||||
return self._future.running() |
||||
|
||||
def done(self): |
||||
return self._future.done() |
||||
|
||||
def result(self, timeout=None): |
||||
try: |
||||
return self._future.result(timeout=timeout) |
||||
except grpc.RpcError as rpc_error_call: |
||||
raise _abortion_error(rpc_error_call) |
||||
except grpc.FutureTimeoutError: |
||||
raise future.TimeoutError() |
||||
except grpc.FutureCancelledError: |
||||
raise future.CancelledError() |
||||
|
||||
def exception(self, timeout=None): |
||||
try: |
||||
rpc_error_call = self._future.exception(timeout=timeout) |
||||
return _abortion_error(rpc_error_call) |
||||
except grpc.FutureTimeoutError: |
||||
raise future.TimeoutError() |
||||
except grpc.FutureCancelledError: |
||||
raise future.CancelledError() |
||||
|
||||
def traceback(self, timeout=None): |
||||
try: |
||||
return self._future.traceback(timeout=timeout) |
||||
except grpc.FutureTimeoutError: |
||||
raise future.TimeoutError() |
||||
except grpc.FutureCancelledError: |
||||
raise future.CancelledError() |
||||
|
||||
def add_done_callback(self, fn): |
||||
self._future.add_done_callback(lambda ignored_callback: fn(self)) |
||||
|
||||
def __iter__(self): |
||||
return self |
||||
|
||||
def _next(self): |
||||
try: |
||||
return next(self._iterator) |
||||
except grpc.RpcError as rpc_error_call: |
||||
raise _abortion_error(rpc_error_call) |
||||
|
||||
def __next__(self): |
||||
return self._next() |
||||
|
||||
def next(self): |
||||
return self._next() |
||||
|
||||
def is_active(self): |
||||
return self._call.is_active() |
||||
|
||||
def time_remaining(self): |
||||
return self._call.time_remaining() |
||||
|
||||
def add_abortion_callback(self, abortion_callback): |
||||
registered = self._call.add_callback( |
||||
lambda: abortion_callback(_abortion(self._call))) |
||||
return None if registered else _abortion(self._call) |
||||
|
||||
def protocol_context(self): |
||||
return _InvocationProtocolContext() |
||||
|
||||
def initial_metadata(self): |
||||
return self._call.initial_metadata() |
||||
|
||||
def terminal_metadata(self): |
||||
return self._call.terminal_metadata() |
||||
|
||||
def code(self): |
||||
return self._call.code() |
||||
|
||||
def details(self): |
||||
return self._call.details() |
||||
|
||||
|
||||
def _blocking_unary_unary( |
||||
channel, group, method, timeout, with_call, protocol_options, metadata, |
||||
metadata_transformer, request, request_serializer, response_deserializer): |
||||
try: |
||||
multi_callable = channel.unary_unary( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
if with_call: |
||||
response, call = multi_callable( |
||||
request, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options), with_call=True) |
||||
return response, _Rendezvous(None, None, call) |
||||
else: |
||||
return multi_callable( |
||||
request, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
except grpc.RpcError as rpc_error_call: |
||||
raise _abortion_error(rpc_error_call) |
||||
|
||||
|
||||
def _future_unary_unary( |
||||
channel, group, method, timeout, protocol_options, metadata, |
||||
metadata_transformer, request, request_serializer, response_deserializer): |
||||
multi_callable = channel.unary_unary( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
response_future = multi_callable.future( |
||||
request, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
return _Rendezvous(response_future, None, response_future) |
||||
|
||||
|
||||
def _unary_stream( |
||||
channel, group, method, timeout, protocol_options, metadata, |
||||
metadata_transformer, request, request_serializer, response_deserializer): |
||||
multi_callable = channel.unary_stream( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
response_iterator = multi_callable( |
||||
request, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
return _Rendezvous(None, response_iterator, response_iterator) |
||||
|
||||
|
||||
def _blocking_stream_unary( |
||||
channel, group, method, timeout, with_call, protocol_options, metadata, |
||||
metadata_transformer, request_iterator, request_serializer, |
||||
response_deserializer): |
||||
try: |
||||
multi_callable = channel.stream_unary( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
if with_call: |
||||
response, call = multi_callable( |
||||
request_iterator, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options), with_call=True) |
||||
return response, _Rendezvous(None, None, call) |
||||
else: |
||||
return multi_callable( |
||||
request_iterator, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
except grpc.RpcError as rpc_error_call: |
||||
raise _abortion_error(rpc_error_call) |
||||
|
||||
|
||||
def _future_stream_unary( |
||||
channel, group, method, timeout, protocol_options, metadata, |
||||
metadata_transformer, request_iterator, request_serializer, |
||||
response_deserializer): |
||||
multi_callable = channel.stream_unary( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
response_future = multi_callable.future( |
||||
request_iterator, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
return _Rendezvous(response_future, None, response_future) |
||||
|
||||
|
||||
def _stream_stream( |
||||
channel, group, method, timeout, protocol_options, metadata, |
||||
metadata_transformer, request_iterator, request_serializer, |
||||
response_deserializer): |
||||
multi_callable = channel.stream_stream( |
||||
_fully_qualified_method(group, method), |
||||
request_serializer=request_serializer, |
||||
response_deserializer=response_deserializer) |
||||
effective_metadata = _effective_metadata(metadata, metadata_transformer) |
||||
response_iterator = multi_callable( |
||||
request_iterator, timeout=timeout, metadata=effective_metadata, |
||||
credentials=_credentials(protocol_options)) |
||||
return _Rendezvous(None, response_iterator, response_iterator) |
||||
|
||||
|
||||
class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable): |
||||
|
||||
def __init__( |
||||
self, channel, group, method, metadata_transformer, request_serializer, |
||||
response_deserializer): |
||||
self._channel = channel |
||||
self._group = group |
||||
self._method = method |
||||
self._metadata_transformer = metadata_transformer |
||||
self._request_serializer = request_serializer |
||||
self._response_deserializer = response_deserializer |
||||
|
||||
def __call__( |
||||
self, request, timeout, metadata=None, with_call=False, |
||||
protocol_options=None): |
||||
return _blocking_unary_unary( |
||||
self._channel, self._group, self._method, timeout, with_call, |
||||
protocol_options, metadata, self._metadata_transformer, request, |
||||
self._request_serializer, self._response_deserializer) |
||||
|
||||
def future(self, request, timeout, metadata=None, protocol_options=None): |
||||
return _future_unary_unary( |
||||
self._channel, self._group, self._method, timeout, protocol_options, |
||||
metadata, self._metadata_transformer, request, self._request_serializer, |
||||
self._response_deserializer) |
||||
|
||||
def event( |
||||
self, request, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
|
||||
class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable): |
||||
|
||||
def __init__( |
||||
self, channel, group, method, metadata_transformer, request_serializer, |
||||
response_deserializer): |
||||
self._channel = channel |
||||
self._group = group |
||||
self._method = method |
||||
self._metadata_transformer = metadata_transformer |
||||
self._request_serializer = request_serializer |
||||
self._response_deserializer = response_deserializer |
||||
|
||||
def __call__(self, request, timeout, metadata=None, protocol_options=None): |
||||
return _unary_stream( |
||||
self._channel, self._group, self._method, timeout, protocol_options, |
||||
metadata, self._metadata_transformer, request, self._request_serializer, |
||||
self._response_deserializer) |
||||
|
||||
def event( |
||||
self, request, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
|
||||
class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable): |
||||
|
||||
def __init__( |
||||
self, channel, group, method, metadata_transformer, request_serializer, |
||||
response_deserializer): |
||||
self._channel = channel |
||||
self._group = group |
||||
self._method = method |
||||
self._metadata_transformer = metadata_transformer |
||||
self._request_serializer = request_serializer |
||||
self._response_deserializer = response_deserializer |
||||
|
||||
def __call__( |
||||
self, request_iterator, timeout, metadata=None, with_call=False, |
||||
protocol_options=None): |
||||
return _blocking_stream_unary( |
||||
self._channel, self._group, self._method, timeout, with_call, |
||||
protocol_options, metadata, self._metadata_transformer, |
||||
request_iterator, self._request_serializer, self._response_deserializer) |
||||
|
||||
def future( |
||||
self, request_iterator, timeout, metadata=None, protocol_options=None): |
||||
return _future_stream_unary( |
||||
self._channel, self._group, self._method, timeout, protocol_options, |
||||
metadata, self._metadata_transformer, request_iterator, |
||||
self._request_serializer, self._response_deserializer) |
||||
|
||||
def event( |
||||
self, receiver, abortion_callback, timeout, metadata=None, |
||||
protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
|
||||
class _StreamStreamMultiCallable(face.StreamStreamMultiCallable): |
||||
|
||||
def __init__( |
||||
self, channel, group, method, metadata_transformer, request_serializer, |
||||
response_deserializer): |
||||
self._channel = channel |
||||
self._group = group |
||||
self._method = method |
||||
self._metadata_transformer = metadata_transformer |
||||
self._request_serializer = request_serializer |
||||
self._response_deserializer = response_deserializer |
||||
|
||||
def __call__( |
||||
self, request_iterator, timeout, metadata=None, protocol_options=None): |
||||
return _stream_stream( |
||||
self._channel, self._group, self._method, timeout, protocol_options, |
||||
metadata, self._metadata_transformer, request_iterator, |
||||
self._request_serializer, self._response_deserializer) |
||||
|
||||
def event( |
||||
self, receiver, abortion_callback, timeout, metadata=None, |
||||
protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
|
||||
class _GenericStub(face.GenericStub): |
||||
|
||||
def __init__( |
||||
self, channel, metadata_transformer, request_serializers, |
||||
response_deserializers): |
||||
self._channel = channel |
||||
self._metadata_transformer = metadata_transformer |
||||
self._request_serializers = request_serializers or {} |
||||
self._response_deserializers = response_deserializers or {} |
||||
|
||||
def blocking_unary_unary( |
||||
self, group, method, request, timeout, metadata=None, |
||||
with_call=None, protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _blocking_unary_unary( |
||||
self._channel, group, method, timeout, with_call, protocol_options, |
||||
metadata, self._metadata_transformer, request, request_serializer, |
||||
response_deserializer) |
||||
|
||||
def future_unary_unary( |
||||
self, group, method, request, timeout, metadata=None, |
||||
protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _future_unary_unary( |
||||
self._channel, group, method, timeout, protocol_options, metadata, |
||||
self._metadata_transformer, request, request_serializer, |
||||
response_deserializer) |
||||
|
||||
def inline_unary_stream( |
||||
self, group, method, request, timeout, metadata=None, |
||||
protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _unary_stream( |
||||
self._channel, group, method, timeout, protocol_options, metadata, |
||||
self._metadata_transformer, request, request_serializer, |
||||
response_deserializer) |
||||
|
||||
def blocking_stream_unary( |
||||
self, group, method, request_iterator, timeout, metadata=None, |
||||
with_call=None, protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _blocking_stream_unary( |
||||
self._channel, group, method, timeout, with_call, protocol_options, |
||||
metadata, self._metadata_transformer, request_iterator, |
||||
request_serializer, response_deserializer) |
||||
|
||||
def future_stream_unary( |
||||
self, group, method, request_iterator, timeout, metadata=None, |
||||
protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _future_stream_unary( |
||||
self._channel, group, method, timeout, protocol_options, metadata, |
||||
self._metadata_transformer, request_iterator, request_serializer, |
||||
response_deserializer) |
||||
|
||||
def inline_stream_stream( |
||||
self, group, method, request_iterator, timeout, metadata=None, |
||||
protocol_options=None): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _stream_stream( |
||||
self._channel, group, method, timeout, protocol_options, metadata, |
||||
self._metadata_transformer, request_iterator, request_serializer, |
||||
response_deserializer) |
||||
|
||||
def event_unary_unary( |
||||
self, group, method, request, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
def event_unary_stream( |
||||
self, group, method, request, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
def event_stream_unary( |
||||
self, group, method, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
def event_stream_stream( |
||||
self, group, method, receiver, abortion_callback, timeout, |
||||
metadata=None, protocol_options=None): |
||||
raise NotImplementedError() |
||||
|
||||
def unary_unary(self, group, method): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _UnaryUnaryMultiCallable( |
||||
self._channel, group, method, self._metadata_transformer, |
||||
request_serializer, response_deserializer) |
||||
|
||||
def unary_stream(self, group, method): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _UnaryStreamMultiCallable( |
||||
self._channel, group, method, self._metadata_transformer, |
||||
request_serializer, response_deserializer) |
||||
|
||||
def stream_unary(self, group, method): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _StreamUnaryMultiCallable( |
||||
self._channel, group, method, self._metadata_transformer, |
||||
request_serializer, response_deserializer) |
||||
|
||||
def stream_stream(self, group, method): |
||||
request_serializer = self._request_serializers.get((group, method,)) |
||||
response_deserializer = self._response_deserializers.get((group, method,)) |
||||
return _StreamStreamMultiCallable( |
||||
self._channel, group, method, self._metadata_transformer, |
||||
request_serializer, response_deserializer) |
||||
|
||||
def __enter__(self): |
||||
return self |
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
return False |
||||
|
||||
|
||||
class _DynamicStub(face.DynamicStub): |
||||
|
||||
def __init__(self, generic_stub, group, cardinalities): |
||||
self._generic_stub = generic_stub |
||||
self._group = group |
||||
self._cardinalities = cardinalities |
||||
|
||||
def __getattr__(self, attr): |
||||
method_cardinality = self._cardinalities.get(attr) |
||||
if method_cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||
return self._generic_stub.unary_unary(self._group, attr) |
||||
elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||
return self._generic_stub.unary_stream(self._group, attr) |
||||
elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||
return self._generic_stub.stream_unary(self._group, attr) |
||||
elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||
return self._generic_stub.stream_stream(self._group, attr) |
||||
else: |
||||
raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr) |
||||
|
||||
def __enter__(self): |
||||
return self |
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
return False |
||||
|
||||
|
||||
def generic_stub( |
||||
channel, host, metadata_transformer, request_serializers, |
||||
response_deserializers): |
||||
return _GenericStub( |
||||
channel, metadata_transformer, request_serializers, |
||||
response_deserializers) |
||||
|
||||
|
||||
def dynamic_stub( |
||||
channel, service, cardinalities, host, metadata_transformer, |
||||
request_serializers, response_deserializers): |
||||
return _DynamicStub( |
||||
_GenericStub( |
||||
channel, metadata_transformer, request_serializers, |
||||
response_deserializers), |
||||
service, cardinalities) |
@ -0,0 +1,359 @@ |
||||
# Copyright 2016, Google Inc. |
||||
# All rights reserved. |
||||
# |
||||
# Redistribution and use in source and binary forms, with or without |
||||
# modification, are permitted provided that the following conditions are |
||||
# met: |
||||
# |
||||
# * Redistributions of source code must retain the above copyright |
||||
# notice, this list of conditions and the following disclaimer. |
||||
# * Redistributions in binary form must reproduce the above |
||||
# copyright notice, this list of conditions and the following disclaimer |
||||
# in the documentation and/or other materials provided with the |
||||
# distribution. |
||||
# * Neither the name of Google Inc. nor the names of its |
||||
# contributors may be used to endorse or promote products derived from |
||||
# this software without specific prior written permission. |
||||
# |
||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
||||
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
||||
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
||||
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
||||
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
||||
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
||||
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
||||
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
||||
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
||||
|
||||
"""Translates gRPC's server-side API into gRPC's server-side Beta API.""" |
||||
|
||||
import collections |
||||
import threading |
||||
|
||||
import grpc |
||||
from grpc.beta import interfaces |
||||
from grpc.framework.common import cardinality |
||||
from grpc.framework.common import style |
||||
from grpc.framework.foundation import abandonment |
||||
from grpc.framework.foundation import logging_pool |
||||
from grpc.framework.foundation import stream |
||||
from grpc.framework.interfaces.face import face |
||||
|
||||
_DEFAULT_POOL_SIZE = 8 |
||||
|
||||
|
||||
class _ServerProtocolContext(interfaces.GRPCServicerContext): |
||||
|
||||
def __init__(self, servicer_context): |
||||
self._servicer_context = servicer_context |
||||
|
||||
def peer(self): |
||||
return self._servicer_context.peer() |
||||
|
||||
def disable_next_response_compression(self): |
||||
pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement. |
||||
|
||||
|
||||
class _FaceServicerContext(face.ServicerContext): |
||||
|
||||
def __init__(self, servicer_context): |
||||
self._servicer_context = servicer_context |
||||
|
||||
def is_active(self): |
||||
return self._servicer_context.is_active() |
||||
|
||||
def time_remaining(self): |
||||
return self._servicer_context.time_remaining() |
||||
|
||||
def add_abortion_callback(self, abortion_callback): |
||||
raise NotImplementedError( |
||||
'add_abortion_callback no longer supported server-side!') |
||||
|
||||
def cancel(self): |
||||
self._servicer_context.cancel() |
||||
|
||||
def protocol_context(self): |
||||
return _ServerProtocolContext(self._servicer_context) |
||||
|
||||
def invocation_metadata(self): |
||||
return self._servicer_context.invocation_metadata() |
||||
|
||||
def initial_metadata(self, initial_metadata): |
||||
self._servicer_context.send_initial_metadata(initial_metadata) |
||||
|
||||
def terminal_metadata(self, terminal_metadata): |
||||
self._servicer_context.set_terminal_metadata(terminal_metadata) |
||||
|
||||
def code(self, code): |
||||
self._servicer_context.set_code(code) |
||||
|
||||
def details(self, details): |
||||
self._servicer_context.set_details(details) |
||||
|
||||
|
||||
def _adapt_unary_request_inline(unary_request_inline): |
||||
def adaptation(request, servicer_context): |
||||
return unary_request_inline(request, _FaceServicerContext(servicer_context)) |
||||
return adaptation |
||||
|
||||
|
||||
def _adapt_stream_request_inline(stream_request_inline): |
||||
def adaptation(request_iterator, servicer_context): |
||||
return stream_request_inline( |
||||
request_iterator, _FaceServicerContext(servicer_context)) |
||||
return adaptation |
||||
|
||||
|
||||
class _Callback(stream.Consumer): |
||||
|
||||
def __init__(self): |
||||
self._condition = threading.Condition() |
||||
self._values = [] |
||||
self._terminated = False |
||||
self._cancelled = False |
||||
|
||||
def consume(self, value): |
||||
with self._condition: |
||||
self._values.append(value) |
||||
self._condition.notify_all() |
||||
|
||||
def terminate(self): |
||||
with self._condition: |
||||
self._terminated = True |
||||
self._condition.notify_all() |
||||
|
||||
def consume_and_terminate(self, value): |
||||
with self._condition: |
||||
self._values.append(value) |
||||
self._terminated = True |
||||
self._condition.notify_all() |
||||
|
||||
def cancel(self): |
||||
with self._condition: |
||||
self._cancelled = True |
||||
self._condition.notify_all() |
||||
|
||||
def draw_one_value(self): |
||||
with self._condition: |
||||
while True: |
||||
if self._cancelled: |
||||
raise abandonment.Abandoned() |
||||
elif self._values: |
||||
return self._values.pop(0) |
||||
elif self._terminated: |
||||
return None |
||||
else: |
||||
self._condition.wait() |
||||
|
||||
def draw_all_values(self): |
||||
with self._condition: |
||||
while True: |
||||
if self._cancelled: |
||||
raise abandonment.Abandoned() |
||||
elif self._terminated: |
||||
all_values = tuple(self._values) |
||||
self._values = None |
||||
return all_values |
||||
else: |
||||
self._condition.wait() |
||||
|
||||
|
||||
def _pipe_requests(request_iterator, request_consumer, servicer_context): |
||||
for request in request_iterator: |
||||
if not servicer_context.is_active(): |
||||
return |
||||
request_consumer.consume(request) |
||||
if not servicer_context.is_active(): |
||||
return |
||||
request_consumer.terminate() |
||||
|
||||
|
||||
def _adapt_unary_unary_event(unary_unary_event): |
||||
def adaptation(request, servicer_context): |
||||
callback = _Callback() |
||||
if not servicer_context.add_callback(callback.cancel): |
||||
raise abandonment.Abandoned() |
||||
unary_unary_event( |
||||
request, callback.consume_and_terminate, |
||||
_FaceServicerContext(servicer_context)) |
||||
return callback.draw_all_values()[0] |
||||
return adaptation |
||||
|
||||
|
||||
def _adapt_unary_stream_event(unary_stream_event): |
||||
def adaptation(request, servicer_context): |
||||
callback = _Callback() |
||||
if not servicer_context.add_callback(callback.cancel): |
||||
raise abandonment.Abandoned() |
||||
unary_stream_event( |
||||
request, callback, _FaceServicerContext(servicer_context)) |
||||
while True: |
||||
response = callback.draw_one_value() |
||||
if response is None: |
||||
return |
||||
else: |
||||
yield response |
||||
return adaptation |
||||
|
||||
|
||||
def _adapt_stream_unary_event(stream_unary_event): |
||||
def adaptation(request_iterator, servicer_context): |
||||
callback = _Callback() |
||||
if not servicer_context.add_callback(callback.cancel): |
||||
raise abandonment.Abandoned() |
||||
request_consumer = stream_unary_event( |
||||
callback.consume_and_terminate, _FaceServicerContext(servicer_context)) |
||||
request_pipe_thread = threading.Thread( |
||||
target=_pipe_requests, |
||||
args=(request_iterator, request_consumer, servicer_context,)) |
||||
request_pipe_thread.start() |
||||
return callback.draw_all_values()[0] |
||||
return adaptation |
||||
|
||||
|
||||
def _adapt_stream_stream_event(stream_stream_event): |
||||
def adaptation(request_iterator, servicer_context): |
||||
callback = _Callback() |
||||
if not servicer_context.add_callback(callback.cancel): |
||||
raise abandonment.Abandoned() |
||||
request_consumer = stream_stream_event( |
||||
callback, _FaceServicerContext(servicer_context)) |
||||
request_pipe_thread = threading.Thread( |
||||
target=_pipe_requests, |
||||
args=(request_iterator, request_consumer, servicer_context,)) |
||||
request_pipe_thread.start() |
||||
while True: |
||||
response = callback.draw_one_value() |
||||
if response is None: |
||||
return |
||||
else: |
||||
yield response |
||||
return adaptation |
||||
|
||||
|
||||
class _SimpleMethodHandler( |
||||
collections.namedtuple( |
||||
'_MethodHandler', |
||||
('request_streaming', 'response_streaming', 'request_deserializer', |
||||
'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary', |
||||
'stream_stream',)), |
||||
grpc.RpcMethodHandler): |
||||
pass |
||||
|
||||
|
||||
def _simple_method_handler( |
||||
implementation, request_deserializer, response_serializer): |
||||
if implementation.style is style.Service.INLINE: |
||||
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||
return _SimpleMethodHandler( |
||||
False, False, request_deserializer, response_serializer, |
||||
_adapt_unary_request_inline(implementation.unary_unary_inline), None, |
||||
None, None) |
||||
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||
return _SimpleMethodHandler( |
||||
False, True, request_deserializer, response_serializer, None, |
||||
_adapt_unary_request_inline(implementation.unary_stream_inline), None, |
||||
None) |
||||
elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||
return _SimpleMethodHandler( |
||||
True, False, request_deserializer, response_serializer, None, None, |
||||
_adapt_stream_request_inline(implementation.stream_unary_inline), |
||||
None) |
||||
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||
return _SimpleMethodHandler( |
||||
True, True, request_deserializer, response_serializer, None, None, |
||||
None, |
||||
_adapt_stream_request_inline(implementation.stream_stream_inline)) |
||||
elif implementation.style is style.Service.EVENT: |
||||
if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY: |
||||
return _SimpleMethodHandler( |
||||
False, False, request_deserializer, response_serializer, |
||||
_adapt_unary_unary_event(implementation.unary_unary_event), None, |
||||
None, None) |
||||
elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM: |
||||
return _SimpleMethodHandler( |
||||
False, True, request_deserializer, response_serializer, None, |
||||
_adapt_unary_stream_event(implementation.unary_stream_event), None, |
||||
None) |
||||
elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY: |
||||
return _SimpleMethodHandler( |
||||
True, False, request_deserializer, response_serializer, None, None, |
||||
_adapt_stream_unary_event(implementation.stream_unary_event), None) |
||||
elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM: |
||||
return _SimpleMethodHandler( |
||||
True, True, request_deserializer, response_serializer, None, None, |
||||
None, _adapt_stream_stream_event(implementation.stream_stream_event)) |
||||
|
||||
|
||||
class _GenericRpcHandler(grpc.GenericRpcHandler): |
||||
|
||||
def __init__( |
||||
self, method_implementations, multi_method_implementation, |
||||
request_deserializers, response_serializers): |
||||
self._method_implementations = method_implementations |
||||
self._multi_method_implementation = multi_method_implementation |
||||
self._request_deserializers = request_deserializers or {} |
||||
self._response_serializers = response_serializers or {} |
||||
|
||||
def service(self, handler_call_details): |
||||
try: |
||||
group_name, method_name = handler_call_details.method.split(b'/')[1:3] |
||||
except ValueError: |
||||
return None |
||||
else: |
||||
method_implementation = self._method_implementations.get( |
||||
(group_name, method_name,)) |
||||
if method_implementation is not None: |
||||
return _simple_method_handler( |
||||
method_implementation, |
||||
self._request_deserializers.get((group_name, method_name,)), |
||||
self._response_serializers.get((group_name, method_name,))) |
||||
elif self._multi_method_implementation is None: |
||||
return None |
||||
else: |
||||
try: |
||||
return None #TODO(nathaniel): call the multimethod. |
||||
except face.NoSuchMethodError: |
||||
return None |
||||
|
||||
|
||||
class _Server(interfaces.Server): |
||||
|
||||
def __init__(self, server): |
||||
self._server = server |
||||
|
||||
def add_insecure_port(self, address): |
||||
return self._server.add_insecure_port(address) |
||||
|
||||
def add_secure_port(self, address, server_credentials): |
||||
return self._server.add_secure_port(address, server_credentials) |
||||
|
||||
def start(self): |
||||
self._server.start() |
||||
|
||||
def stop(self, grace): |
||||
return self._server.stop(grace) |
||||
|
||||
def __enter__(self): |
||||
self._server.start() |
||||
return self |
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb): |
||||
self._server.stop(None) |
||||
return False |
||||
|
||||
|
||||
def server( |
||||
service_implementations, multi_method_implementation, request_deserializers, |
||||
response_serializers, thread_pool, thread_pool_size): |
||||
generic_rpc_handler = _GenericRpcHandler( |
||||
service_implementations, multi_method_implementation, |
||||
request_deserializers, response_serializers) |
||||
if thread_pool is None: |
||||
effective_thread_pool = logging_pool.pool( |
||||
_DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) |
||||
else: |
||||
effective_thread_pool = thread_pool |
||||
return _Server(grpc.server((generic_rpc_handler,), effective_thread_pool)) |
Loading…
Reference in new issue