Merge github.com:google/grpc into async-api

pull/357/head
Craig Tiller 10 years ago
commit e863e1b855
  1. 6
      include/grpc/support/port_platform.h
  2. 1
      src/core/iomgr/pollset_kick.c
  3. 9
      src/core/iomgr/wakeup_fd_nospecial.c
  4. 8
      src/core/iomgr/wakeup_fd_pipe.c
  5. 2
      src/core/iomgr/wakeup_fd_pipe.h
  6. 14
      src/core/iomgr/wakeup_fd_posix.c
  7. 31
      src/core/iomgr/wakeup_fd_posix.h
  8. 2
      src/core/support/log_posix.c
  9. 11
      src/python/src/_adapter/_links_test.py
  10. 2
      src/python/src/_adapter/_lonely_rear_link_test.py
  11. 3
      src/python/src/_adapter/fore.py
  12. 49
      src/python/src/_framework/base/interfaces.py
  13. 44
      src/python/src/_framework/base/interfaces_test.py
  14. 25
      src/python/src/_framework/base/packets/_ends.py
  15. 2
      src/python/src/_framework/base/packets/_ingestion.py
  16. 5
      src/python/src/_framework/base/packets/_interfaces.py
  17. 82
      src/python/src/_framework/base/packets/_termination.py
  18. 34
      src/python/src/_framework/base/packets/_transmission.py
  19. 7
      src/python/src/_framework/base/packets/packets.py
  20. 15
      src/python/src/_framework/base/util.py
  21. 11
      src/python/src/_framework/face/_calls.py
  22. 28
      src/python/src/_framework/face/_control.py
  23. 43
      src/python/src/_framework/face/interfaces.py
  24. 24
      src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
  25. 1
      test/core/iomgr/poll_kick_posix_test.c
  26. 3
      tools/run_tests/build_python.sh

@ -56,6 +56,8 @@
#define GPR_CPU_LINUX 1
#define GPR_GCC_SYNC 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_WAKEUP_FD 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1
@ -68,7 +70,7 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LINUX 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_HAS_SPECIAL_WAKEUP_FD 1
#define GPR_POSIX_WAKEUP_FD 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
@ -86,6 +88,8 @@
#define GPR_GCC_ATOMIC 1
#define GPR_POSIX_LOG 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
#define GPR_POSIX_WAKEUP_FD 1
#define GPR_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1

@ -138,6 +138,7 @@ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
}
void grpc_pollset_kick_global_init_fallback_fd(void) {
gpr_mu_init(&fd_freelist_mu);
grpc_wakeup_fd_global_init_force_fallback();
}

@ -38,16 +38,17 @@
#include <grpc/support/port_platform.h>
#ifndef GPR_POSIX_HAS_SPECIAL_WAKEUP_FD
#ifdef GPR_POSIX_NO_SPECIAL_WAKEUP_FD
#include "src/core/iomgr/wakeup_fd.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <stddef.h>
static int check_availability_invalid(void) {
return 0;
}
const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = {
const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
NULL, NULL, NULL, NULL, check_availability_invalid
};
#endif /* GPR_POSIX_HAS_SPECIAL_WAKEUP */
#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */

@ -31,7 +31,10 @@
*
*/
/* TODO(klempner): Allow this code to be disabled. */
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_WAKEUP_FD
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <errno.h>
@ -87,7 +90,8 @@ static int pipe_check_availability(void) {
return 1;
}
const grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable = {
const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
};
#endif /* GPR_POSIX_WAKUP_FD */

@ -36,6 +36,6 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
extern grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable;
extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ */

@ -31,6 +31,10 @@
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_WAKEUP_FD
#include "src/core/iomgr/wakeup_fd_posix.h"
#include "src/core/iomgr/wakeup_fd_pipe.h"
#include <stddef.h>
@ -38,15 +42,15 @@
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
void grpc_wakeup_fd_global_init(void) {
if (specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &specialized_wakeup_fd_vtable;
if (grpc_specialized_wakeup_fd_vtable.check_availability()) {
wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
} else {
wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
}
}
void grpc_wakeup_fd_global_init_force_fallback(void) {
wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
}
void grpc_wakeup_fd_global_destroy(void) {
@ -68,3 +72,5 @@ void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
#endif /* GPR_POSIX_WAKEUP_FD */

@ -62,29 +62,14 @@
#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
/* Force using the fallback implementation. This is intended for testing
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
/* Private structures; don't access their fields directly outside of wakeup fd
* code. */
struct grpc_wakeup_fd_info {
int read_fd;
int write_fd;
};
typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
typedef struct grpc_wakeup_fd_vtable {
void (*create)(grpc_wakeup_fd_info *fd_info);
@ -95,8 +80,20 @@ typedef struct grpc_wakeup_fd_vtable {
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
struct grpc_wakeup_fd_info {
int read_fd;
int write_fd;
};
#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */
extern const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable;
extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ */

@ -64,7 +64,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
va_end(args);
if (ret < 0) {
message = NULL;
} else if (ret <= sizeof(buf) - 1) {
} else if ((size_t)ret <= sizeof(buf) - 1) {
message = buf;
} else {
message = allocated = gpr_malloc(ret + 1);

@ -80,8 +80,8 @@ class RoundTripTest(unittest.TestCase):
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
None, None, _TIMEOUT)
test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
@ -133,8 +133,9 @@ class RoundTripTest(unittest.TestCase):
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
None, test_front_to_back_datum, _TIMEOUT)
test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
@ -196,7 +197,7 @@ class RoundTripTest(unittest.TestCase):
commencement_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method,
interfaces.FULL, None, None, _TIMEOUT)
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():

@ -69,7 +69,7 @@ class LonelyRearLinkTest(unittest.TestCase):
front_to_back_ticket = packets.FrontToBackPacket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.FULL, None, None, _TIMEOUT)
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with fore_link.condition:

@ -116,7 +116,8 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers[method])
ticket = tickets.FrontToBackPacket(
call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None,
call, 0, tickets.Kind.COMMENCEMENT, method,
interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)

@ -29,27 +29,24 @@
"""Interfaces defined and used by the base layer of RPC Framework."""
# TODO(nathaniel): Use Python's new enum library for enumerated types rather
# than constants merely placed close together.
import abc
import enum
# stream is referenced from specification in this module.
from _framework.foundation import stream # pylint: disable=unused-import
# Operation outcomes.
COMPLETED = 'completed'
CANCELLED = 'cancelled'
EXPIRED = 'expired'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
# Subscription categories.
FULL = 'full'
TERMINATION_ONLY = 'termination only'
NONE = 'none'
@enum.unique
class Outcome(enum.Enum):
"""Operation outcomes."""
COMPLETED = 'completed'
CANCELLED = 'cancelled'
EXPIRED = 'expired'
RECEPTION_FAILURE = 'reception failure'
TRANSMISSION_FAILURE = 'transmission failure'
SERVICER_FAILURE = 'servicer failure'
SERVICED_FAILURE = 'serviced failure'
class OperationContext(object):
@ -70,9 +67,7 @@ class OperationContext(object):
"""Adds a function to be called upon operation termination.
Args:
callback: A callable that will be passed one of COMPLETED, CANCELLED,
EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or
SERVICED_FAILURE.
callback: A callable that will be passed an Outcome value.
"""
raise NotImplementedError()
@ -167,11 +162,20 @@ class ServicedSubscription(object):
"""A sum type representing a serviced's interest in an operation.
Attributes:
category: One of FULL, TERMINATION_ONLY, or NONE.
ingestor: A ServicedIngestor. Must be present if category is FULL.
kind: A Kind value.
ingestor: A ServicedIngestor. Must be present if kind is Kind.FULL. Must
be None if kind is Kind.TERMINATION_ONLY or Kind.NONE.
"""
__metaclass__ = abc.ABCMeta
@enum.unique
class Kind(enum.Enum):
"""Kinds of subscription."""
FULL = 'full'
TERMINATION_ONLY = 'termination only'
NONE = 'none'
class End(object):
"""Common type for entry-point objects on both sides of an operation."""
@ -182,9 +186,8 @@ class End(object):
"""Reports the number of terminated operations broken down by outcome.
Returns:
A dictionary from operation outcome constant (COMPLETED, CANCELLED,
EXPIRED, and so on) to an integer representing the number of operations
that terminated with that outcome.
A dictionary from Outcome value to an integer identifying the number
of operations that terminated with that outcome.
"""
raise NotImplementedError()

@ -49,13 +49,13 @@ TRIGGERED_FAILURE = 'triggered failure'
WAIT_ON_CONDITION = 'wait on condition'
EMPTY_OUTCOME_DICT = {
interfaces.COMPLETED: 0,
interfaces.CANCELLED: 0,
interfaces.EXPIRED: 0,
interfaces.RECEPTION_FAILURE: 0,
interfaces.TRANSMISSION_FAILURE: 0,
interfaces.SERVICER_FAILURE: 0,
interfaces.SERVICED_FAILURE: 0,
interfaces.Outcome.COMPLETED: 0,
interfaces.Outcome.CANCELLED: 0,
interfaces.Outcome.EXPIRED: 0,
interfaces.Outcome.RECEPTION_FAILURE: 0,
interfaces.Outcome.TRANSMISSION_FAILURE: 0,
interfaces.Outcome.SERVICER_FAILURE: 0,
interfaces.Outcome.SERVICED_FAILURE: 0,
}
@ -169,7 +169,8 @@ class FrontAndBackTest(object):
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
@ -183,7 +184,7 @@ class FrontAndBackTest(object):
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The packet arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.COMPLETED] = 1
second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the packet had arrived at the back and the back had
@ -204,8 +205,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
@ -226,8 +229,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertEqual(
1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual(test_payloads, test_consumer.values())
def testCancellation(self):
@ -242,7 +247,8 @@ class FrontAndBackTest(object):
operation.cancel()
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED])
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.CANCELLED])
util.wait_for_idle(self.back)
self.assertListEqual([], test_consumer.calls)
@ -260,7 +266,7 @@ class FrontAndBackTest(object):
# The back started processing based on the first packet and then stopped
# upon receiving the cancellation packet.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.CANCELLED] = 1
second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
@ -292,8 +298,10 @@ class FrontAndBackTest(object):
duration = termination_time_cell[0] - start_time
self.assertLessEqual(timeout, duration)
self.assertLess(duration, timeout + allowance)
self.assertEqual(interfaces.EXPIRED, outcome_cell[0])
self.assertEqual(interfaces.Outcome.EXPIRED, outcome_cell[0])
util.wait_for_idle(self.front)
self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED])
self.assertEqual(
1, self.front.operation_stats()[interfaces.Outcome.EXPIRED])
util.wait_for_idle(self.back)
self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED])
self.assertLessEqual(
1, self.back.operation_stats()[interfaces.Outcome.EXPIRED])

@ -51,13 +51,13 @@ from _framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
_OPERATION_OUTCOMES = (
base_interfaces.COMPLETED,
base_interfaces.CANCELLED,
base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE,
base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE,
base_interfaces.SERVICED_FAILURE,
base_interfaces.Outcome.COMPLETED,
base_interfaces.Outcome.CANCELLED,
base_interfaces.Outcome.EXPIRED,
base_interfaces.Outcome.RECEPTION_FAILURE,
base_interfaces.Outcome.TRANSMISSION_FAILURE,
base_interfaces.Outcome.SERVICER_FAILURE,
base_interfaces.Outcome.SERVICED_FAILURE,
)
@ -193,10 +193,10 @@ def _front_operate(
lock = threading.Lock()
with lock:
termination_manager = _termination.front_termination_manager(
work_pool, utility_pool, termination_action, subscription.category)
work_pool, utility_pool, termination_action, subscription.kind)
transmission_manager = _transmission.front_transmission_manager(
lock, transmission_pool, callback, operation_id, name,
subscription.category, trace_id, timeout, termination_manager)
subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE,
termination_manager, transmission_manager)
@ -225,9 +225,10 @@ def _front_operate(
transmission_manager.inmit(payload, complete)
returned_reception_manager = (
None if subscription.category == base_interfaces.NONE
else reception_manager)
if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE:
returned_reception_manager = None
else:
returned_reception_manager = reception_manager
return _FrontManagement(
returned_reception_manager, emission_manager, operation_context,

@ -111,7 +111,7 @@ class _FrontConsumerCreator(_ConsumerCreator):
def create_consumer(self, requirement):
"""See _ConsumerCreator.create_consumer for specification."""
if self._subscription.category == interfaces.FULL:
if self._subscription.kind is interfaces.ServicedSubscription.Kind.FULL:
try:
return _ConsumerCreation(
self._subscription.ingestor.consumer(self._operation_context),

@ -58,10 +58,7 @@ class TerminationManager(object):
immediately.
Args:
callback: A callable that will be passed one of base_interfaces.COMPLETED,
base_interfaces.CANCELLED, base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE.
callback: A callable that will be passed a base_interfaces.Outcome value.
"""
raise NotImplementedError()

@ -29,6 +29,8 @@
"""State and behavior for operation termination."""
import enum
from _framework.base import interfaces
from _framework.base.packets import _constants
from _framework.base.packets import _interfaces
@ -37,26 +39,32 @@ from _framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
# TODO(nathaniel): enum module.
_EMISSION = 'emission'
_TRANSMISSION = 'transmission'
_INGESTION = 'ingestion'
_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,)
_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,)
_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,)
_KINDS_TO_OUTCOMES = {
packets.Kind.COMPLETION: interfaces.COMPLETED,
packets.Kind.CANCELLATION: interfaces.CANCELLED,
packets.Kind.EXPIRATION: interfaces.EXPIRED,
packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE,
packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED,
packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED,
packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED,
packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE,
packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE,
packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE,
packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE,
}
@enum.unique
class _Requirement(enum.Enum):
"""Symbols indicating events required for termination."""
EMISSION = 'emission'
TRANSMISSION = 'transmission'
INGESTION = 'ingestion'
_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,)
_BACK_NOT_LISTENING_REQUIREMENTS = (
_Requirement.EMISSION, _Requirement.INGESTION,)
_LISTENING_REQUIREMENTS = (
_Requirement.TRANSMISSION, _Requirement.INGESTION,)
class _TerminationManager(_interfaces.TerminationManager):
"""An implementation of _interfaces.TerminationManager."""
@ -68,9 +76,8 @@ class _TerminationManager(_interfaces.TerminationManager):
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION
identifying what must finish for the operation to be considered
completed.
requirements: A combination of _Requirement values identifying what
must finish for the operation to be considered completed.
local_failure: A packets.Kind specifying what constitutes local failure of
customer work.
"""
@ -137,21 +144,21 @@ class _TerminationManager(_interfaces.TerminationManager):
def emission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_EMISSION)
self._outstanding_requirements.discard(_Requirement.EMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def transmission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_TRANSMISSION)
self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def ingestion_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
self._outstanding_requirements.discard(_INGESTION)
self._outstanding_requirements.discard(_Requirement.INGESTION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
@ -163,39 +170,46 @@ class _TerminationManager(_interfaces.TerminationManager):
self._terminate(kind)
def front_termination_manager(work_pool, utility_pool, action, subscription):
def front_termination_manager(
work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for front-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or
interfaces.NONE.
subscription_kind: An interfaces.ServicedSubscription.Kind value.
Returns:
A TerminationManager appropriate for front-side use.
"""
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
requirements = _FRONT_NOT_LISTENING_REQUIREMENTS
else:
requirements = _LISTENING_REQUIREMENTS
return _TerminationManager(
work_pool, utility_pool, action,
_FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
_LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE)
work_pool, utility_pool, action, requirements,
packets.Kind.SERVICED_FAILURE)
def back_termination_manager(work_pool, utility_pool, action, subscription):
def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for back-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
subscription: One of interfaces.FULL, interfaces.termination_only, or
interfaces.NONE.
subscription_kind: An interfaces.ServicedSubscription.Kind value.
Returns:
A TerminationManager appropriate for back-side use.
"""
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
requirements = _BACK_NOT_LISTENING_REQUIREMENTS
else:
requirements = _LISTENING_REQUIREMENTS
return _TerminationManager(
work_pool, utility_pool, action,
_BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
_LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE)
work_pool, utility_pool, action, requirements,
packets.Kind.SERVICER_FAILURE)

@ -91,20 +91,19 @@ class _Packetizer(object):
class _FrontPacketizer(_Packetizer):
"""Front-side packet-creating behavior."""
def __init__(self, name, subscription, trace_id, timeout):
def __init__(self, name, subscription_kind, trace_id, timeout):
"""Constructor.
Args:
name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent
from the back.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
"""
self._name = name
self._subscription = subscription
self._subscription_kind = subscription_kind
self._trace_id = trace_id
self._timeout = timeout
@ -114,13 +113,13 @@ class _FrontPacketizer(_Packetizer):
return packets.FrontToBackPacket(
operation_id, sequence_number,
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
self._name, self._subscription, self._trace_id, payload,
self._name, self._subscription_kind, self._trace_id, payload,
self._timeout)
else:
return packets.FrontToBackPacket(
operation_id, 0,
packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
self._name, self._subscription, self._trace_id, payload,
self._name, self._subscription_kind, self._trace_id, payload,
self._timeout)
def packetize_abortion(self, operation_id, sequence_number, kind):
@ -335,8 +334,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
def front_transmission_manager(
lock, pool, callback, operation_id, name, subscription, trace_id, timeout,
termination_manager):
lock, pool, callback, operation_id, name, subscription_kind, trace_id,
timeout, termination_manager):
"""Creates a TransmissionManager appropriate for front-side use.
Args:
@ -347,9 +346,8 @@ def front_transmission_manager(
of the operation.
operation_id: The operation's ID.
name: The name of the operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent
from the back.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@ -361,12 +359,13 @@ def front_transmission_manager(
"""
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _FrontPacketizer(
name, subscription, trace_id, timeout),
name, subscription_kind, trace_id, timeout),
termination_manager)
def back_transmission_manager(
lock, pool, callback, operation_id, termination_manager, subscription):
lock, pool, callback, operation_id, termination_manager,
subscription_kind):
"""Creates a TransmissionManager appropriate for back-side use.
Args:
@ -378,14 +377,13 @@ def back_transmission_manager(
operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent from
the back.
subscription_kind: An interfaces.ServicedSubscription.Kind value
describing the interest the front has in packets sent from the back.
Returns:
A TransmissionManager appropriate for back-side use.
"""
if subscription == interfaces.NONE:
if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
return _EmptyTransmissionManager()
else:
return _TransmittingTransmissionManager(

@ -71,10 +71,9 @@ class FrontToBackPacket(
Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE.
name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
or Kind.ENTIRE. Must be None for any other kind.
subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
interfaces.NONE describing the interest the front has in packets sent from
the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE.
Must be None for any other kind.
subscription: An interfaces.ServicedSubscription.Kind value describing the
interest the front has in packets sent from the back. Must be present if
kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
payload: A customer payload object. Must be present if kind is

@ -36,13 +36,14 @@ from _framework.base import interfaces
class _ServicedSubscription(
collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']),
collections.namedtuple('_ServicedSubscription', ['kind', 'ingestor']),
interfaces.ServicedSubscription):
"""See interfaces.ServicedSubscription for specification."""
_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None)
_NONE_SUBSCRIPTION = _ServicedSubscription(
interfaces.ServicedSubscription.Kind.NONE, None)
_TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription(
interfaces.TERMINATION_ONLY, None)
interfaces.ServicedSubscription.Kind.TERMINATION_ONLY, None)
def none_serviced_subscription():
@ -72,12 +73,14 @@ def full_serviced_subscription(ingestor):
"""Creates a "full" interfaces.ServicedSubscription object.
Args:
ingestor: A ServicedIngestor.
ingestor: An interfaces.ServicedIngestor.
Returns:
A ServicedSubscription object indicating a full subscription.
An interfaces.ServicedSubscription object indicating a full
subscription.
"""
return _ServicedSubscription(interfaces.FULL, ingestor)
return _ServicedSubscription(
interfaces.ServicedSubscription.Kind.FULL, ingestor)
def wait_for_idle(end):

@ -94,7 +94,7 @@ class _OperationCancellableIterator(interfaces.CancellableIterator):
def cancel(self):
self._operation.cancel()
self._rendezvous.set_outcome(base_interfaces.CANCELLED)
self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED)
class _OperationFuture(future.Future):
@ -150,15 +150,12 @@ class _OperationFuture(future.Future):
"""Indicates to this object that the operation has terminated.
Args:
operation_outcome: One of base_interfaces.COMPLETED,
base_interfaces.CANCELLED, base_interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE
indicating the categorical outcome of the operation.
operation_outcome: A base_interfaces.Outcome value indicating the
outcome of the operation.
"""
with self._condition:
if (self._outcome is None and
operation_outcome != base_interfaces.COMPLETED):
operation_outcome is not base_interfaces.Outcome.COMPLETED):
self._outcome = future.raised(
_control.abortion_outcome_to_exception(operation_outcome))
self._condition.notify_all()

@ -40,13 +40,17 @@ from _framework.foundation import stream
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
_OPERATION_OUTCOME_TO_RPC_ABORTION = {
base_interfaces.CANCELLED: interfaces.CANCELLED,
base_interfaces.EXPIRED: interfaces.EXPIRED,
base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE,
base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE,
base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
}
base_interfaces.Outcome.CANCELLED: interfaces.Abortion.CANCELLED,
base_interfaces.Outcome.EXPIRED: interfaces.Abortion.EXPIRED,
base_interfaces.Outcome.RECEPTION_FAILURE:
interfaces.Abortion.NETWORK_FAILURE,
base_interfaces.Outcome.TRANSMISSION_FAILURE:
interfaces.Abortion.NETWORK_FAILURE,
base_interfaces.Outcome.SERVICED_FAILURE:
interfaces.Abortion.SERVICED_FAILURE,
base_interfaces.Outcome.SERVICER_FAILURE:
interfaces.Abortion.SERVICER_FAILURE,
}
def _as_operation_termination_callback(rpc_abortion_callback):
@ -59,13 +63,13 @@ def _as_operation_termination_callback(rpc_abortion_callback):
def _abortion_outcome_to_exception(abortion_outcome):
if abortion_outcome == base_interfaces.CANCELLED:
if abortion_outcome == base_interfaces.Outcome.CANCELLED:
return exceptions.CancellationError()
elif abortion_outcome == base_interfaces.EXPIRED:
elif abortion_outcome == base_interfaces.Outcome.EXPIRED:
return exceptions.ExpirationError()
elif abortion_outcome == base_interfaces.SERVICER_FAILURE:
elif abortion_outcome == base_interfaces.Outcome.SERVICER_FAILURE:
return exceptions.ServicerError()
elif abortion_outcome == base_interfaces.SERVICED_FAILURE:
elif abortion_outcome == base_interfaces.Outcome.SERVICED_FAILURE:
return exceptions.ServicedError()
else:
return exceptions.NetworkError()
@ -133,7 +137,7 @@ class Rendezvous(stream.Consumer):
def set_outcome(self, outcome):
with self._condition:
if outcome != base_interfaces.COMPLETED:
if outcome is not base_interfaces.Outcome.COMPLETED:
self._abortion = outcome
self._condition.notify()

@ -30,6 +30,7 @@
"""Interfaces for the face layer of RPC Framework."""
import abc
import enum
# exceptions, abandonment, and future are referenced from specification in this
# module.
@ -58,14 +59,15 @@ class CancellableIterator(object):
raise NotImplementedError()
# Constants that categorize RPC abortion.
# TODO(nathaniel): Learn and use Python's enum library for this de facto
# enumerated type
CANCELLED = 'abortion: cancelled'
EXPIRED = 'abortion: expired'
NETWORK_FAILURE = 'abortion: network failure'
SERVICED_FAILURE = 'abortion: serviced failure'
SERVICER_FAILURE = 'abortion: servicer failure'
@enum.unique
class Abortion(enum.Enum):
"""Categories of RPC abortion."""
CANCELLED = 'cancelled'
EXPIRED = 'expired'
NETWORK_FAILURE = 'network failure'
SERVICED_FAILURE = 'serviced failure'
SERVICER_FAILURE = 'servicer failure'
class RpcContext(object):
@ -93,9 +95,8 @@ class RpcContext(object):
"""Registers a callback to be called if the RPC is aborted.
Args:
abortion_callback: A callable to be called and passed one of CANCELLED,
EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the
event of RPC abortion.
abortion_callback: A callable to be called and passed an Abortion value
in the event of RPC abortion.
"""
raise NotImplementedError()
@ -474,9 +475,8 @@ class Stub(object):
request: The request value for the RPC.
response_callback: A callback to be called to accept the response value
of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@ -494,9 +494,8 @@ class Stub(object):
request: The request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@ -513,9 +512,8 @@ class Stub(object):
name: The RPC method name.
response_callback: A callback to be called to accept the response value
of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@ -533,9 +531,8 @@ class Stub(object):
name: The RPC method name.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
abortion_callback: A callback to be called to accept one of CANCELLED,
EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
abortion.
abortion_callback: A callback to be called and passed an Abortion value
in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:

@ -176,7 +176,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -190,7 +190,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@ -202,7 +202,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -217,7 +217,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.consume(request)
callback.block_until_terminated()
self.assertEqual(interfaces.EXPIRED, callback.abortion())
self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
@ -231,7 +231,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -245,7 +245,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@ -262,7 +262,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate()
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -279,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate()
callback.block_until_terminated()
self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testParallelInvocations(self):
for name, test_messages_sequence in (
@ -321,7 +321,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -335,7 +335,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@ -351,7 +351,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@ -364,4 +364,4 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
self.assertEqual(interfaces.CANCELLED, callback.abortion())
self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())

@ -105,6 +105,7 @@ static void test_over_free(void) {
grpc_pollset_kick_post_poll(&kick_state[i]);
grpc_pollset_kick_destroy(&kick_state[i]);
}
gpr_free(kick_state);
}
static void run_tests(void) {

@ -7,6 +7,5 @@ cd $(dirname $0)/../..
root=`pwd`
virtualenv python2.7_virtual_environment
python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0
python2.7_virtual_environment/bin/pip install third_party/protobuf/python
python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0 protobuf==2.6.1
python2.7_virtual_environment/bin/pip install src/python

Loading…
Cancel
Save