diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h index 56f71ef2349..c7e5dbc6470 100644 --- a/include/grpc/support/slice_buffer.h +++ b/include/grpc/support/slice_buffer.h @@ -74,6 +74,8 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *slices, size_t n); /* add a very small (less than 8 bytes) amount of data to the end of a slice buffer: returns a pointer into which to add the data */ gpr_uint8 *gpr_slice_buffer_tiny_add(gpr_slice_buffer *sb, unsigned len); +/* pop the last buffer, but don't unref it */ +void gpr_slice_buffer_pop(gpr_slice_buffer *sb); /* clear a slice buffer, unref all elements */ void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb); diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 6cd51f925c3..b280e4bd020 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -143,6 +143,13 @@ void gpr_slice_buffer_addn(gpr_slice_buffer *sb, gpr_slice *s, size_t n) { } } +void gpr_slice_buffer_pop(gpr_slice_buffer *sb) { + if (sb->count != 0) { + size_t count = --sb->count; + sb->length -= GPR_SLICE_LENGTH(sb->slices[count]); + } +} + void gpr_slice_buffer_reset_and_unref(gpr_slice_buffer *sb) { size_t i; diff --git a/src/cpp/proto/proto_utils.cc b/src/cpp/proto/proto_utils.cc index 72f1bf74417..e4e51bfebf5 100644 --- a/src/cpp/proto/proto_utils.cc +++ b/src/cpp/proto/proto_utils.cc @@ -35,38 +35,135 @@ #include #include +#include #include +#include +#include +#include -namespace grpc { +const int kMaxBufferLength = 8192; -bool SerializeProto(const grpc::protobuf::Message &msg, - grpc_byte_buffer **bp) { - grpc::string msg_str; - bool success = msg.SerializeToString(&msg_str); - if (success) { - gpr_slice slice = - gpr_slice_from_copied_buffer(msg_str.data(), msg_str.length()); - *bp = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); +class GrpcBufferWriter GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyOutputStream { + public: + explicit GrpcBufferWriter(grpc_byte_buffer **bp, + int block_size = kMaxBufferLength) + : block_size_(block_size), byte_count_(0), have_backup_(false) { + *bp = grpc_byte_buffer_create(NULL, 0); + slice_buffer_ = &(*bp)->data.slice_buffer; + } + + ~GrpcBufferWriter() GRPC_OVERRIDE { + if (have_backup_) { + gpr_slice_unref(backup_slice_); + } + } + + bool Next(void **data, int *size) GRPC_OVERRIDE { + if (have_backup_) { + slice_ = backup_slice_; + have_backup_ = false; + } else { + slice_ = gpr_slice_malloc(block_size_); + } + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + gpr_slice_buffer_add(slice_buffer_, slice_); + return true; + } + + void BackUp(int count) GRPC_OVERRIDE { + gpr_slice_buffer_pop(slice_buffer_); + if (count == block_size_) { + backup_slice_ = slice_; + } else { + backup_slice_ = + gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); + gpr_slice_buffer_add(slice_buffer_, slice_); + } + have_backup_ = true; + byte_count_ -= count; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } + + private: + const int block_size_; + gpr_int64 byte_count_; + gpr_slice_buffer *slice_buffer_; + bool have_backup_; + gpr_slice backup_slice_; + gpr_slice slice_; +}; + +class GrpcBufferReader GRPC_FINAL + : public ::google::protobuf::io::ZeroCopyInputStream { + public: + explicit GrpcBufferReader(grpc_byte_buffer *buffer) + : byte_count_(0), backup_count_(0) { + reader_ = grpc_byte_buffer_reader_create(buffer); + } + ~GrpcBufferReader() GRPC_OVERRIDE { + grpc_byte_buffer_reader_destroy(reader_); } - return success; -} -bool DeserializeProto(grpc_byte_buffer *buffer, - grpc::protobuf::Message *msg) { - grpc::string msg_string; - grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); - gpr_slice slice; - while (grpc_byte_buffer_reader_next(reader, &slice)) { - const char *data = reinterpret_cast( - slice.refcount ? slice.data.refcounted.bytes - : slice.data.inlined.bytes); - msg_string.append(data, slice.refcount ? slice.data.refcounted.length - : slice.data.inlined.length); - gpr_slice_unref(slice); + bool Next(const void **data, int *size) GRPC_OVERRIDE { + if (backup_count_ > 0) { + *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - + backup_count_; + *size = backup_count_; + backup_count_ = 0; + return true; + } + if (!grpc_byte_buffer_reader_next(reader_, &slice_)) { + return false; + } + gpr_slice_unref(slice_); + *data = GPR_SLICE_START_PTR(slice_); + byte_count_ += *size = GPR_SLICE_LENGTH(slice_); + return true; } - grpc_byte_buffer_reader_destroy(reader); - return msg->ParseFromString(msg_string); + + void BackUp(int count) GRPC_OVERRIDE { + backup_count_ = count; + } + + bool Skip(int count) GRPC_OVERRIDE { + const void *data; + int size; + while (Next(&data, &size)) { + if (size >= count) { + BackUp(size - count); + return true; + } + // size < count; + count -= size; + } + // error or we have too large count; + return false; + } + + gpr_int64 ByteCount() const GRPC_OVERRIDE { + return byte_count_ - backup_count_; + } + + private: + gpr_int64 byte_count_; + gpr_int64 backup_count_; + grpc_byte_buffer_reader *reader_; + gpr_slice slice_; +}; + +namespace grpc { + +bool SerializeProto(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp) { + GrpcBufferWriter writer(bp); + return msg.SerializeToZeroCopyStream(&writer); +} + +bool DeserializeProto(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg) { + GrpcBufferReader reader(buffer); + return msg->ParseFromZeroCopyStream(&reader); } } // namespace grpc diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 6ef9e600062..339c0ef216c 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -357,90 +357,3 @@ class ForeLink(ticket_interfaces.ForeLink, activated.Activated): self._complete(ticket.operation_id, ticket.payload) else: self._cancel(ticket.operation_id) - - -class _ActivatedForeLink(ticket_interfaces.ForeLink, activated.Activated): - - def __init__( - self, port, request_deserializers, response_serializers, - root_certificates, key_chain_pairs): - self._port = port - self._request_deserializers = request_deserializers - self._response_serializers = response_serializers - self._root_certificates = root_certificates - self._key_chain_pairs = key_chain_pairs - - self._lock = threading.Lock() - self._pool = None - self._fore_link = None - self._rear_link = null.NULL_REAR_LINK - - def join_rear_link(self, rear_link): - with self._lock: - self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - if self._fore_link is not None: - self._fore_link.join_rear_link(rear_link) - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._fore_link = ForeLink( - self._pool, self._request_deserializers, self._response_serializers, - self._root_certificates, self._key_chain_pairs, port=self._port) - self._fore_link.join_rear_link(self._rear_link) - self._fore_link.start() - return self - - def _stop(self): - with self._lock: - self._fore_link.stop() - self._fore_link = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - return self._start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def port(self): - with self._lock: - return None if self._fore_link is None else self._fore_link.port() - - def accept_back_to_front_ticket(self, ticket): - with self._lock: - if self._fore_link is not None: - self._fore_link.accept_back_to_front_ticket(ticket) - - -def activated_fore_link( - port, request_deserializers, response_serializers, root_certificates, - key_chain_pairs): - """Creates a ForeLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - port: The port on which to serve RPCs, or None for a port to be - automatically selected. - request_deserializers: A dictionary from RPC method names to request object - deserializer behaviors. - response_serializers: A dictionary from RPC method names to response object - serializer behaviors. - root_certificates: The PEM-encoded client root certificates as a bytestring - or None. - key_chain_pairs: A sequence of PEM-encoded private key-certificate chain - pairs. - """ - return _ActivatedForeLink( - port, request_deserializers, response_serializers, root_certificates, - key_chain_pairs) diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index fc71bf0a6c6..62703fab30e 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -387,127 +387,3 @@ class RearLink(ticket_interfaces.RearLink, activated.Activated): else: # NOTE(nathaniel): All other categories are treated as cancellation. self._cancel(ticket.operation_id) - - -class _ActivatedRearLink(ticket_interfaces.RearLink, activated.Activated): - - def __init__( - self, host, port, request_serializers, response_deserializers, secure, - root_certificates, private_key, certificate_chain, - server_host_override=None): - self._host = host - self._port = port - self._request_serializers = request_serializers - self._response_deserializers = response_deserializers - self._secure = secure - self._root_certificates = root_certificates - self._private_key = private_key - self._certificate_chain = certificate_chain - self._server_host_override = server_host_override - - self._lock = threading.Lock() - self._pool = None - self._rear_link = None - self._fore_link = null.NULL_FORE_LINK - - def join_fore_link(self, fore_link): - with self._lock: - self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link - if self._rear_link is not None: - self._rear_link.join_fore_link(self._fore_link) - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._rear_link = RearLink( - self._host, self._port, self._pool, self._request_serializers, - self._response_deserializers, self._secure, self._root_certificates, - self._private_key, self._certificate_chain, - server_host_override=self._server_host_override) - self._rear_link.join_fore_link(self._fore_link) - self._rear_link.start() - return self - - def _stop(self): - with self._lock: - self._rear_link.stop() - self._rear_link = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - return self._start() - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def accept_front_to_back_ticket(self, ticket): - with self._lock: - if self._rear_link is not None: - self._rear_link.accept_front_to_back_ticket(ticket) - - -# TODO(issue 726): reconcile these two creation functions. -def activated_rear_link( - host, port, request_serializers, response_deserializers): - """Creates a RearLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - host: The host to which to connect for RPC service. - port: The port to which to connect for RPC service. - request_serializers: A dictionary from RPC method name to request object - serializer behavior. - response_deserializers: A dictionary from RPC method name to response - object deserializer behavior. - secure: A boolean indicating whether or not to use a secure connection. - root_certificates: The PEM-encoded root certificates or None to ask for - them to be retrieved from a default location. - private_key: The PEM-encoded private key to use or None if no private key - should be used. - certificate_chain: The PEM-encoded certificate chain to use or None if no - certificate chain should be used. - """ - return _ActivatedRearLink( - host, port, request_serializers, response_deserializers, False, None, - None, None) - - - -def secure_activated_rear_link( - host, port, request_serializers, response_deserializers, root_certificates, - private_key, certificate_chain, server_host_override=None): - """Creates a RearLink that is also an activated.Activated. - - The returned object is only valid for use between calls to its start and stop - methods (or in context when used as a context manager). - - Args: - host: The host to which to connect for RPC service. - port: The port to which to connect for RPC service. - request_serializers: A dictionary from RPC method name to request object - serializer behavior. - response_deserializers: A dictionary from RPC method name to response - object deserializer behavior. - root_certificates: The PEM-encoded root certificates or None to ask for - them to be retrieved from a default location. - private_key: The PEM-encoded private key to use or None if no private key - should be used. - certificate_chain: The PEM-encoded certificate chain to use or None if no - certificate chain should be used. - server_host_override: (For testing only) the target name used for SSL - host name checking. - """ - return _ActivatedRearLink( - host, port, request_serializers, response_deserializers, True, - root_certificates, private_key, certificate_chain, - server_host_override=server_host_override) diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/early_adopter/_reexport.py index 3fed8099f6f..f3416028e8c 100644 --- a/src/python/src/grpc/early_adopter/_reexport.py +++ b/src/python/src/grpc/early_adopter/_reexport.py @@ -174,45 +174,6 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync): return _ReexportedFuture(self._underlying.future(request_iterator, timeout)) -class _Stub(interfaces.Stub): - - def __init__(self, assembly_stub, cardinalities): - self._assembly_stub = assembly_stub - self._cardinalities = cardinalities - - def __enter__(self): - self._assembly_stub.__enter__() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._assembly_stub.__exit__(exc_type, exc_val, exc_tb) - return False - - def __getattr__(self, attr): - underlying_attr = self._assembly_stub.__getattr__(attr) - method_cardinality = self._cardinalities.get(attr) - # TODO(nathaniel): unify this trick with its other occurrence in the code. - if method_cardinality is None: - for name, method_cardinality in self._cardinalities.iteritems(): - last_slash_index = name.rfind('/') - if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: - break - else: - raise AttributeError(attr) - if method_cardinality is interfaces.Cardinality.UNARY_UNARY: - return _UnaryUnarySyncAsync(underlying_attr) - elif method_cardinality is interfaces.Cardinality.UNARY_STREAM: - return lambda request, timeout: _CancellableIterator( - underlying_attr(request, timeout)) - elif method_cardinality is interfaces.Cardinality.STREAM_UNARY: - return _StreamUnarySyncAsync(underlying_attr) - elif method_cardinality is interfaces.Cardinality.STREAM_STREAM: - return lambda request_iterator, timeout: _CancellableIterator( - underlying_attr(request_iterator, timeout)) - else: - raise AttributeError(attr) - - def common_cardinalities(early_adopter_cardinalities): common_cardinalities = {} for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems(): @@ -225,5 +186,13 @@ def rpc_context(face_rpc_context): return _RpcContext(face_rpc_context) -def stub(face_stub, cardinalities): - return _Stub(face_stub, cardinalities) +def cancellable_iterator(face_cancellable_iterator): + return _CancellableIterator(face_cancellable_iterator) + + +def unary_unary_sync_async(face_unary_unary_multi_callable): + return _UnaryUnarySyncAsync(face_unary_unary_multi_callable) + + +def stream_unary_sync_async(face_stream_unary_multi_callable): + return _StreamUnarySyncAsync(face_stream_unary_multi_callable) diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index b46f94e9fbf..1c02f9e4d63 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -36,7 +36,13 @@ from grpc._adapter import rear as _rear from grpc.early_adopter import _face_utilities from grpc.early_adopter import _reexport from grpc.early_adopter import interfaces -from grpc.framework.assembly import implementations as _assembly_implementations +from grpc.framework.base import util as _base_utilities +from grpc.framework.base.packets import implementations as _tickets_implementations +from grpc.framework.face import implementations as _face_implementations +from grpc.framework.foundation import logging_pool + +_THREAD_POOL_SIZE = 80 +_ONE_DAY_IN_SECONDS = 24 * 60 * 60 class _Server(interfaces.Server): @@ -50,30 +56,39 @@ class _Server(interfaces.Server): else: self._key_chain_pairs = ((private_key, certificate_chain),) + self._pool = None + self._back = None self._fore_link = None - self._server = None def _start(self): with self._lock: - if self._server is None: - self._fore_link = _fore.activated_fore_link( - self._port, self._breakdown.request_deserializers, + if self._pool is None: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + servicer = _face_implementations.servicer( + self._pool, self._breakdown.implementations, None) + self._back = _tickets_implementations.back( + servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, + _ONE_DAY_IN_SECONDS) + self._fore_link = _fore.ForeLink( + self._pool, self._breakdown.request_deserializers, self._breakdown.response_serializers, None, self._key_chain_pairs) - - self._server = _assembly_implementations.assemble_service( - self._breakdown.implementations, self._fore_link) - self._server.start() + self._back.join_fore_link(self._fore_link) + self._fore_link.join_rear_link(self._back) + self._fore_link.start() else: raise ValueError('Server currently running!') def _stop(self): with self._lock: - if self._server is None: + if self._pool is None: raise ValueError('Server not running!') else: - self._server.stop() - self._server = None + self._fore_link.stop() + _base_utilities.wait_for_idle(self._back) + self._pool.shutdown(wait=True) self._fore_link = None + self._back = None + self._pool = None def __enter__(self): self._start() @@ -93,11 +108,101 @@ class _Server(interfaces.Server): with self._lock: return self._fore_link.port() -def _build_stub(breakdown, activated_rear_link): - assembly_stub = _assembly_implementations.assemble_dynamic_inline_stub( - _reexport.common_cardinalities(breakdown.cardinalities), - activated_rear_link) - return _reexport.stub(assembly_stub, breakdown.cardinalities) + +class _Stub(interfaces.Stub): + + def __init__( + self, breakdown, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=None): + self._lock = threading.Lock() + self._breakdown = breakdown + self._host = host + self._port = port + self._secure = secure + self._root_certificates = root_certificates + self._private_key = private_key + self._certificate_chain = certificate_chain + self._server_host_override = server_host_override + + self._pool = None + self._front = None + self._rear_link = None + self._understub = None + + def __enter__(self): + with self._lock: + if self._pool is None: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + self._front = _tickets_implementations.front( + self._pool, self._pool, self._pool) + self._rear_link = _rear.RearLink( + self._host, self._port, self._pool, + self._breakdown.request_serializers, + self._breakdown.response_deserializers, self._secure, + self._root_certificates, self._private_key, self._certificate_chain, + server_host_override=self._server_host_override) + self._front.join_rear_link(self._rear_link) + self._rear_link.join_fore_link(self._front) + self._rear_link.start() + self._understub = _face_implementations.dynamic_stub( + _reexport.common_cardinalities(self._breakdown.cardinalities), + self._front, self._pool, '') + else: + raise ValueError('Tried to __enter__ already-__enter__ed Stub!') + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + with self._lock: + if self._pool is None: + raise ValueError('Tried to __exit__ non-__enter__ed Stub!') + else: + self._rear_link.stop() + _base_utilities.wait_for_idle(self._front) + self._pool.shutdown(wait=True) + self._rear_link = None + self._front = None + self._pool = None + self._understub = None + return False + + def __getattr__(self, attr): + with self._lock: + if self._pool is None: + raise ValueError('Tried to __getattr__ non-__enter__ed Stub!') + else: + underlying_attr = getattr(self._understub, attr, None) + method_cardinality = self._breakdown.cardinalities.get(attr) + # TODO(nathaniel): Eliminate this trick. + if underlying_attr is None: + for method_name, method_cardinality in self._breakdown.cardinalities.iteritems(): + last_slash_index = method_name.rfind('/') + if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr: + underlying_attr = getattr(self._understub, method_name) + break + else: + raise AttributeError(attr) + if method_cardinality is interfaces.Cardinality.UNARY_UNARY: + return _reexport.unary_unary_sync_async(underlying_attr) + elif method_cardinality is interfaces.Cardinality.UNARY_STREAM: + return lambda request, timeout: _reexport.cancellable_iterator( + underlying_attr(request, timeout)) + elif method_cardinality is interfaces.Cardinality.STREAM_UNARY: + return _reexport.stream_unary_sync_async(underlying_attr) + elif method_cardinality is interfaces.Cardinality.STREAM_STREAM: + return lambda request_iterator, timeout: ( + _reexport.cancellable_iterator(underlying_attr( + request_iterator, timeout))) + else: + raise AttributeError(attr) + + +def _build_stub( + methods, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=None): + breakdown = _face_utilities.break_down_invocation(methods) + return _Stub( + breakdown, host, port, secure, root_certificates, private_key, + certificate_chain, server_host_override=server_host_override) def _build_server(methods, port, private_key, certificate_chain): @@ -118,11 +223,7 @@ def insecure_stub(methods, host, port): Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _face_utilities.break_down_invocation(methods) - activated_rear_link = _rear.activated_rear_link( - host, port, breakdown.request_serializers, - breakdown.response_deserializers) - return _build_stub(breakdown, activated_rear_link) + return _build_stub(methods, host, port, False, None, None, None) def secure_stub( @@ -148,12 +249,9 @@ def secure_stub( Returns: An interfaces.Stub affording RPC invocation. """ - breakdown = _face_utilities.break_down_invocation(methods) - activated_rear_link = _rear.secure_activated_rear_link( - host, port, breakdown.request_serializers, - breakdown.response_deserializers, root_certificates, private_key, + return _build_stub( + methods, host, port, True, root_certificates, private_key, certificate_chain, server_host_override=server_host_override) - return _build_stub(breakdown, activated_rear_link) def insecure_server(methods, port): diff --git a/src/python/src/grpc/framework/assembly/__init__.py b/src/python/src/grpc/framework/assembly/__init__.py deleted file mode 100644 index 70865191060..00000000000 --- a/src/python/src/grpc/framework/assembly/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/src/grpc/framework/assembly/implementations.py b/src/python/src/grpc/framework/assembly/implementations.py deleted file mode 100644 index 24afcbeb6d6..00000000000 --- a/src/python/src/grpc/framework/assembly/implementations.py +++ /dev/null @@ -1,264 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""Implementations for assembling RPC framework values.""" - -import threading - -# tickets_interfaces, face_interfaces, and activated are referenced from -# specification in this module. -from grpc.framework.assembly import interfaces -from grpc.framework.base import util as base_utilities -from grpc.framework.base.packets import implementations as tickets_implementations -from grpc.framework.base.packets import interfaces as tickets_interfaces # pylint: disable=unused-import -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.face import implementations as face_implementations -from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import -from grpc.framework.face import utilities as face_utilities -from grpc.framework.foundation import activated # pylint: disable=unused-import -from grpc.framework.foundation import logging_pool - -_ONE_DAY_IN_SECONDS = 60 * 60 * 24 -_THREAD_POOL_SIZE = 100 - - -class _FaceStub(object): - - def __init__(self, rear_link): - self._rear_link = rear_link - self._lock = threading.Lock() - self._pool = None - self._front = None - self._under_stub = None - - def __enter__(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._front = tickets_implementations.front( - self._pool, self._pool, self._pool) - self._rear_link.start() - self._rear_link.join_fore_link(self._front) - self._front.join_rear_link(self._rear_link) - self._under_stub = face_implementations.generic_stub(self._front, self._pool) - - def __exit__(self, exc_type, exc_val, exc_tb): - with self._lock: - self._under_stub = None - self._rear_link.stop() - base_utilities.wait_for_idle(self._front) - self._front = None - self._pool.shutdown(wait=True) - self._pool = None - return False - - def __getattr__(self, attr): - with self._lock: - if self._under_stub is None: - raise ValueError('Called out of context!') - else: - return getattr(self._under_stub, attr) - - -def _behaviors(method_cardinalities, front, pool): - behaviors = {} - stub = face_implementations.generic_stub(front, pool) - for name, method_cardinality in method_cardinalities.iteritems(): - if method_cardinality is cardinality.Cardinality.UNARY_UNARY: - behaviors[name] = stub.unary_unary_multi_callable(name) - elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: - behaviors[name] = lambda request, context, bound_name=name: ( - stub.inline_value_in_stream_out(bound_name, request, context)) - elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: - behaviors[name] = stub.stream_unary_multi_callable(name) - elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: - behaviors[name] = lambda request_iterator, context, bound_name=name: ( - stub.inline_stream_in_stream_out( - bound_name, request_iterator, context)) - return behaviors - - -class _DynamicInlineStub(object): - - def __init__(self, cardinalities, rear_link): - self._cardinalities = cardinalities - self._rear_link = rear_link - self._lock = threading.Lock() - self._pool = None - self._front = None - self._behaviors = None - - def __enter__(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - self._front = tickets_implementations.front( - self._pool, self._pool, self._pool) - self._rear_link.start() - self._rear_link.join_fore_link(self._front) - self._front.join_rear_link(self._rear_link) - self._behaviors = _behaviors( - self._cardinalities, self._front, self._pool) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - with self._lock: - self._behaviors = None - self._rear_link.stop() - base_utilities.wait_for_idle(self._front) - self._front = None - self._pool.shutdown(wait=True) - self._pool = None - return False - - def __getattr__(self, attr): - with self._lock: - behavior = self._behaviors.get(attr) - if behavior is None: - for name, behavior in self._behaviors.iteritems(): - last_slash_index = name.rfind('/') - if 0 <= last_slash_index and name[last_slash_index + 1:] == attr: - return behavior - else: - raise AttributeError( - '_DynamicInlineStub instance has no attribute "%s"!' % attr) - else: - return behavior - - -class _ServiceAssembly(interfaces.Server): - - def __init__(self, implementations, fore_link): - self._implementations = implementations - self._fore_link = fore_link - self._lock = threading.Lock() - self._pool = None - self._back = None - - def _start(self): - with self._lock: - self._pool = logging_pool.pool(_THREAD_POOL_SIZE) - servicer = face_implementations.servicer( - self._pool, self._implementations, None) - self._back = tickets_implementations.back( - servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS, - _ONE_DAY_IN_SECONDS) - self._fore_link.start() - self._fore_link.join_rear_link(self._back) - self._back.join_fore_link(self._fore_link) - - def _stop(self): - with self._lock: - self._fore_link.stop() - base_utilities.wait_for_idle(self._back) - self._back = None - self._pool.shutdown(wait=True) - self._pool = None - - def __enter__(self): - self._start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._stop() - return False - - def start(self): - return self._start() - - def stop(self): - self._stop() - - def port(self): - with self._lock: - return self._fore_link.port() - - -def assemble_face_stub(activated_rear_link): - """Assembles a face_interfaces.GenericStub. - - The returned object is a context manager and may only be used in context to - invoke RPCs. - - Args: - activated_rear_link: An object that is both a tickets_interfaces.RearLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - A face_interfaces.GenericStub on which, in context, RPCs can be invoked. - """ - return _FaceStub(activated_rear_link) - - -def assemble_dynamic_inline_stub(cardinalities, activated_rear_link): - """Assembles a stub with method names for attributes. - - The returned object is a context manager and may only be used in context to - invoke RPCs. - - The returned object, when used in context, will respond to attribute access - as follows: if the requested attribute is the name of a unary-unary RPC - method, the value of the attribute will be a - face_interfaces.UnaryUnaryMultiCallable with which to invoke the RPC method. - If the requested attribute is the name of a unary-stream RPC method, the - value of the attribute will be a face_interfaces.UnaryStreamMultiCallable - with which to invoke the RPC method. If the requested attribute is the name - of a stream-unary RPC method, the value of the attribute will be a - face_interfaces.StreamUnaryMultiCallable with which to invoke the RPC method. - If the requested attribute is the name of a stream-stream RPC method, the - value of the attribute will be a face_interfaces.StreamStreamMultiCallable - with which to invoke the RPC method. - - Args: - cardinalities: A dictionary from RPC method name to cardinality.Cardinality - value identifying the cardinality of the named RPC method. - activated_rear_link: An object that is both a tickets_interfaces.RearLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - A face_interfaces.DynamicStub on which, in context, RPCs can be invoked. - """ - return _DynamicInlineStub(cardinalities, activated_rear_link) - - -def assemble_service(implementations, activated_fore_link): - """Assembles the service-side of the RPC Framework stack. - - Args: - implementations: A dictionary from RPC method name to - face_interfaces.MethodImplementation. - activated_fore_link: An object that is both a tickets_interfaces.ForeLink - and an activated.Activated. The object should be in the inactive state - when passed to this method. - - Returns: - An interfaces.Server encapsulating RPC service. - """ - return _ServiceAssembly(implementations, activated_fore_link) diff --git a/src/python/src/grpc/framework/assembly/implementations_test.py b/src/python/src/grpc/framework/assembly/implementations_test.py deleted file mode 100644 index 5540edff7ad..00000000000 --- a/src/python/src/grpc/framework/assembly/implementations_test.py +++ /dev/null @@ -1,288 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# TODO(nathaniel): Expand this test coverage. - -"""Test of the GRPC-backed ForeLink and RearLink.""" - -import threading -import unittest - -from grpc.framework.assembly import implementations -from grpc.framework.base import interfaces -from grpc.framework.base.packets import packets as tickets -from grpc.framework.base.packets import interfaces as tickets_interfaces -from grpc.framework.base.packets import null -from grpc.framework.face import utilities as face_utilities -from grpc.framework.foundation import logging_pool -from grpc._junkdrawer import math_pb2 - -DIV = 'Div' -DIV_MANY = 'DivMany' -FIB = 'Fib' -SUM = 'Sum' - -def _fibbonacci(limit): - left, right = 0, 1 - for _ in xrange(limit): - yield left - left, right = right, left + right - - -def _div(request, unused_context): - return math_pb2.DivReply( - quotient=request.dividend / request.divisor, - remainder=request.dividend % request.divisor) - - -def _div_many(request_iterator, unused_context): - for request in request_iterator: - yield math_pb2.DivReply( - quotient=request.dividend / request.divisor, - remainder=request.dividend % request.divisor) - - -def _fib(request, unused_context): - for number in _fibbonacci(request.limit): - yield math_pb2.Num(num=number) - - -def _sum(request_iterator, unused_context): - accumulation = 0 - for request in request_iterator: - accumulation += request.num - return math_pb2.Num(num=accumulation) - - -_IMPLEMENTATIONS = { - DIV: face_utilities.unary_unary_inline(_div), - DIV_MANY: face_utilities.stream_stream_inline(_div_many), - FIB: face_utilities.unary_stream_inline(_fib), - SUM: face_utilities.stream_unary_inline(_sum), -} - -_CARDINALITIES = { - name: implementation.cardinality - for name, implementation in _IMPLEMENTATIONS.iteritems()} - -_TIMEOUT = 10 - - -class PipeLink(tickets_interfaces.ForeLink, tickets_interfaces.RearLink): - - def __init__(self): - self._fore_lock = threading.Lock() - self._fore_link = null.NULL_FORE_LINK - self._rear_lock = threading.Lock() - self._rear_link = null.NULL_REAR_LINK - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - return False - - def start(self): - pass - - def stop(self): - pass - - def accept_back_to_front_ticket(self, ticket): - with self._fore_lock: - self._fore_link.accept_back_to_front_ticket(ticket) - - def join_rear_link(self, rear_link): - with self._rear_lock: - self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - - def accept_front_to_back_ticket(self, ticket): - with self._rear_lock: - self._rear_link.accept_front_to_back_ticket(ticket) - - def join_fore_link(self, fore_link): - with self._fore_lock: - self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link - - -class FaceStubTest(unittest.TestCase): - - def testUnaryUnary(self): - divisor = 7 - dividend = 13 - expected_quotient = 1 - expected_remainder = 6 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - service.start() - try: - with face_stub: - response = face_stub.blocking_value_in_value_out( - DIV, math_pb2.DivArgs(divisor=divisor, dividend=dividend), - _TIMEOUT) - self.assertEqual(expected_quotient, response.quotient) - self.assertEqual(expected_remainder, response.remainder) - finally: - service.stop() - - def testUnaryStream(self): - stream_length = 29 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - responses = list( - face_stub.inline_value_in_stream_out( - FIB, math_pb2.FibArgs(limit=stream_length), _TIMEOUT)) - numbers = [response.num for response in responses] - for early, middle, later in zip(numbers, numbers[1:], numbers[2:]): - self.assertEqual(early + middle, later) - - def testStreamUnary(self): - stream_length = 13 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - multi_callable = face_stub.stream_unary_multi_callable(SUM) - response_future = multi_callable.future( - (math_pb2.Num(num=index) for index in range(stream_length)), - _TIMEOUT) - self.assertEqual( - (stream_length * (stream_length - 1)) / 2, - response_future.result().num) - - def testStreamStream(self): - stream_length = 17 - divisor_offset = 7 - dividend_offset = 17 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - face_stub = implementations.assemble_face_stub(pipe) - - with service, face_stub: - response_iterator = face_stub.inline_stream_in_stream_out( - DIV_MANY, - (math_pb2.DivArgs( - divisor=divisor_offset + index, - dividend=dividend_offset + index) - for index in range(stream_length)), - _TIMEOUT) - for index, response in enumerate(response_iterator): - self.assertEqual( - (dividend_offset + index) / (divisor_offset + index), - response.quotient) - self.assertEqual( - (dividend_offset + index) % (divisor_offset + index), - response.remainder) - self.assertEqual(stream_length, index + 1) - - -class DynamicInlineStubTest(unittest.TestCase): - - def testUnaryUnary(self): - divisor = 59 - dividend = 973 - expected_quotient = dividend / divisor - expected_remainder = dividend % divisor - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - service.start() - with dynamic_stub: - response = dynamic_stub.Div( - math_pb2.DivArgs(divisor=divisor, dividend=dividend), _TIMEOUT) - self.assertEqual(expected_quotient, response.quotient) - self.assertEqual(expected_remainder, response.remainder) - service.stop() - - def testUnaryStream(self): - stream_length = 43 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_iterator = dynamic_stub.Fib( - math_pb2.FibArgs(limit=stream_length), _TIMEOUT) - numbers = tuple(response.num for response in response_iterator) - for early, middle, later in zip(numbers, numbers[:1], numbers[:2]): - self.assertEqual(early + middle, later) - self.assertEqual(stream_length, len(numbers)) - - def testStreamUnary(self): - stream_length = 127 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_future = dynamic_stub.Sum.future( - (math_pb2.Num(num=index) for index in range(stream_length)), - _TIMEOUT) - self.assertEqual( - (stream_length * (stream_length - 1)) / 2, - response_future.result().num) - - def testStreamStream(self): - stream_length = 179 - divisor_offset = 71 - dividend_offset = 1763 - pipe = PipeLink() - service = implementations.assemble_service(_IMPLEMENTATIONS, pipe) - dynamic_stub = implementations.assemble_dynamic_inline_stub( - _CARDINALITIES, pipe) - - with service, dynamic_stub: - response_iterator = dynamic_stub.DivMany( - (math_pb2.DivArgs( - divisor=divisor_offset + index, - dividend=dividend_offset + index) - for index in range(stream_length)), - _TIMEOUT) - for index, response in enumerate(response_iterator): - self.assertEqual( - (dividend_offset + index) / (divisor_offset + index), - response.quotient) - self.assertEqual( - (dividend_offset + index) % (divisor_offset + index), - response.remainder) - self.assertEqual(stream_length, index + 1) - - -if __name__ == '__main__': - unittest.main() diff --git a/src/python/src/grpc/framework/assembly/interfaces.py b/src/python/src/grpc/framework/assembly/interfaces.py deleted file mode 100644 index c469dc4fd2b..00000000000 --- a/src/python/src/grpc/framework/assembly/interfaces.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -# TODO(nathaniel): The assembly layer only exists to smooth out wrinkles in -# the face layer. The two should be squashed together as soon as manageable. -"""Interfaces for assembling RPC Framework values.""" - -import abc - -from grpc.framework.foundation import activated - - -class Server(activated.Activated): - """The server interface. - - Aside from being able to be activated and deactivated, objects of this type - are able to report the port on which they are servicing RPCs. - """ - __metaclass__ = abc.ABCMeta - - # TODO(issue 726): This is an abstraction violation; not every Server is - # necessarily serving over a network at all. - @abc.abstractmethod - def port(self): - """Identifies the port on which this Server is servicing RPCs. - - This method may only be called while the server is active. - - Returns: - The number of the port on which this Server is servicing RPCs. - """ - raise NotImplementedError() diff --git a/src/python/src/grpc/framework/face/implementations.py b/src/python/src/grpc/framework/face/implementations.py index e8d91a3c91c..4a6de52974e 100644 --- a/src/python/src/grpc/framework/face/implementations.py +++ b/src/python/src/grpc/framework/face/implementations.py @@ -213,14 +213,14 @@ class _DynamicStub(interfaces.DynamicStub): self._pool = pool def __getattr__(self, attr): - cardinality = self._cardinalities.get(attr) - if cardinality is cardinality.Cardinality.UNARY_UNARY: + method_cardinality = self._cardinalities.get(attr) + if method_cardinality is cardinality.Cardinality.UNARY_UNARY: return _UnaryUnaryMultiCallable(self._front, attr) - elif cardinality is cardinality.Cardinality.UNARY_STREAM: + elif method_cardinality is cardinality.Cardinality.UNARY_STREAM: return _UnaryStreamMultiCallable(self._front, attr) - elif cardinality is cardinality.Cardinality.STREAM_UNARY: + elif method_cardinality is cardinality.Cardinality.STREAM_UNARY: return _StreamUnaryMultiCallable(self._front, attr, self._pool) - elif cardinality is cardinality.Cardinality.STREAM_STREAM: + elif method_cardinality is cardinality.Cardinality.STREAM_STREAM: return _StreamStreamMultiCallable(self._front, attr, self._pool) else: raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr) @@ -315,4 +315,4 @@ def dynamic_stub(cardinalities, front, pool, prefix): An interfaces.DynamicStub that performs RPCs via the given base_interfaces.Front. """ - return _DynamicStub(cardinalities, front, pool, prefix) + return _DynamicStub(cardinalities, front, pool) diff --git a/src/python/src/setup.py b/src/python/src/setup.py index cdb82a9dc35..a513a2811bd 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -64,7 +64,6 @@ _PACKAGES = ( 'grpc._junkdrawer', 'grpc.early_adopter', 'grpc.framework', - 'grpc.framework.assembly', 'grpc.framework.base', 'grpc.framework.base.packets', 'grpc.framework.common', diff --git a/test/core/support/slice_buffer_test.c b/test/core/support/slice_buffer_test.c index 8301795dbfd..a48278434f1 100644 --- a/test/core/support/slice_buffer_test.c +++ b/test/core/support/slice_buffer_test.c @@ -62,8 +62,13 @@ int main(int argc, char **argv) { } GPR_ASSERT(buf.count > 0); GPR_ASSERT(buf.length == 50); - gpr_slice_unref(aaa); - gpr_slice_unref(bb); + for (i = 0; i < 10; i++) { + gpr_slice_buffer_pop(&buf); + gpr_slice_unref(aaa); + gpr_slice_unref(bb); + } + GPR_ASSERT(buf.count == 0); + GPR_ASSERT(buf.length == 0); gpr_slice_buffer_destroy(&buf); return 0; diff --git a/tools/gce_setup/grpc_docker.sh b/tools/gce_setup/grpc_docker.sh index 318bfc8868f..3deef05ef35 100755 --- a/tools/gce_setup/grpc_docker.sh +++ b/tools/gce_setup/grpc_docker.sh @@ -921,6 +921,17 @@ grpc_interop_gen_ruby_cmd() { echo $the_cmd } +# constructs the full dockerized python interop test cmd. +# +# call-seq: +# flags= .... # generic flags to include the command +# cmd=$($grpc_gen_test_cmd $flags) +grpc_interop_gen_python_cmd() { + local cmd_prefix="sudo docker run grpc/python bin/bash -l -c" + local the_cmd="$cmd_prefix 'python -B -m interop.client --use_test_ca --use_tls $@'" + echo $the_cmd +} + # constructs the full dockerized java interop test cmd. # # call-seq: diff --git a/tools/gce_setup/interop_test_runner.sh b/tools/gce_setup/interop_test_runner.sh index 90a78b1264d..430ad09b8c6 100755 --- a/tools/gce_setup/interop_test_runner.sh +++ b/tools/gce_setup/interop_test_runner.sh @@ -36,7 +36,7 @@ echo $result_file_name main() { source grpc_docker.sh test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response) - clients=(cxx java go ruby node csharp_mono) + clients=(cxx java go ruby node python csharp_mono) servers=(cxx java go ruby node python) for test_case in "${test_cases[@]}" do diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json index 69022af12e1..ef483d9799c 100755 --- a/tools/run_tests/python_tests.json +++ b/tools/run_tests/python_tests.json @@ -26,9 +26,6 @@ { "module": "grpc.early_adopter.implementations_test" }, - { - "module": "grpc.framework.assembly.implementations_test" - }, { "module": "grpc.framework.base.packets.implementations_test" },