resolve merge confict

pull/1066/head
Yang Gao 10 years ago
commit d672d8fbe2
  1. 20
      include/grpc++/completion_queue.h
  2. 27
      src/compiler/python_generator.cc
  3. 15
      src/cpp/common/completion_queue.cc
  4. 5
      src/python/interop/interop/_insecure_interop_test.py
  5. 6
      src/python/interop/interop/_secure_interop_test.py
  6. 7
      src/python/interop/interop/client.py
  7. 38
      src/python/interop/interop/methods.py
  8. 5
      src/python/interop/interop/server.py
  9. 82
      src/python/src/grpc/early_adopter/_face_utilities.py
  10. 5
      src/python/src/grpc/early_adopter/_reexport.py
  11. 59
      src/python/src/grpc/early_adopter/implementations.py
  12. 8
      src/python/src/grpc/early_adopter/implementations_test.py
  13. 58
      test/cpp/end2end/async_end2end_test.cc
  14. 2
      tools/run_tests/build_python.sh
  15. 2
      tools/run_tests/run_python.sh

@ -34,6 +34,7 @@
#ifndef GRPCXX_COMPLETION_QUEUE_H
#define GRPCXX_COMPLETION_QUEUE_H
#include <chrono>
#include <grpc++/impl/client_unary_call.h>
struct grpc_completion_queue;
@ -75,10 +76,21 @@ class CompletionQueue {
explicit CompletionQueue(grpc_completion_queue* take);
~CompletionQueue();
// Blocking read from queue.
// Returns true if an event was received, false if the queue is ready
// for destruction.
bool Next(void** tag, bool* ok);
// Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
enum NextStatus {SHUTDOWN, GOT_EVENT, TIMEOUT};
// Nonblocking (until deadline) read from queue.
// Cannot rely on result of tag or ok if return is TIMEOUT
NextStatus AsyncNext(void **tag, bool *ok,
std::chrono::system_clock::time_point deadline);
// Blocking (until deadline) read from queue.
// Returns false if the queue is ready for destruction, true if event
bool Next(void **tag, bool *ok) {
return (AsyncNext(tag,ok,
std::chrono::system_clock::time_point::max()) !=
SHUTDOWN);
}
// Shutdown has to be called, and the CompletionQueue can only be
// destructed when false is returned from Next().

@ -229,7 +229,8 @@ bool GetModuleAndMessagePath(const Descriptor* type,
return true;
}
bool PrintServerFactory(const ServiceDescriptor* service, Printer* out) {
bool PrintServerFactory(const std::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
out->Print("def early_adopter_create_$Service$_server(servicer, port, "
"root_certificates, key_chain_pairs):\n",
"Service", service->name());
@ -293,17 +294,18 @@ bool PrintServerFactory(const ServiceDescriptor* service, Printer* out) {
out->Print("),\n");
}
out->Print("}\n");
// out->Print("return implementations.insecure_server("
// "method_service_descriptions, port)\n");
out->Print(
"return implementations.secure_server("
"method_service_descriptions, port, root_certificates,"
" key_chain_pairs)\n");
"\"$PackageQualifiedServiceName$\","
" method_service_descriptions, port, root_certificates,"
" key_chain_pairs)\n",
"PackageQualifiedServiceName", package_qualified_service_name);
}
return true;
}
bool PrintStubFactory(const ServiceDescriptor* service, Printer* out) {
bool PrintStubFactory(const std::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
map<std::string, std::string> dict = ListToDict({
"Service", service->name(),
});
@ -369,7 +371,9 @@ bool PrintStubFactory(const ServiceDescriptor* service, Printer* out) {
out->Print("}\n");
out->Print(
"return implementations.insecure_stub("
"method_invocation_descriptions, host, port)\n");
"\"$PackageQualifiedServiceName$\","
" method_invocation_descriptions, host, port)\n",
"PackageQualifiedServiceName", package_qualified_service_name);
}
return true;
}
@ -392,13 +396,18 @@ pair<bool, std::string> GetServices(const FileDescriptor* file) {
if (!PrintPreamble(file, &out)) {
return make_pair(false, "");
}
auto package = file->package();
if (!package.empty()) {
package = package.append(".");
}
for (int i = 0; i < file->service_count(); ++i) {
auto service = file->service(i);
auto package_qualified_service_name = package + service->name();
if (!(PrintServicer(service, &out) &&
PrintServer(service, &out) &&
PrintStub(service, &out) &&
PrintServerFactory(service, &out) &&
PrintStubFactory(service, &out))) {
PrintServerFactory(package_qualified_service_name, service, &out) &&
PrintStubFactory(package_qualified_service_name, service, &out))) {
return make_pair(false, "");
}
}

@ -57,19 +57,26 @@ class EventDeleter {
}
};
bool CompletionQueue::Next(void** tag, bool* ok) {
CompletionQueue::NextStatus
CompletionQueue::AsyncNext(void** tag, bool* ok,
std::chrono::system_clock::time_point deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
gpr_timespec gpr_deadline;
Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
if (!ev) { /* got a NULL back because deadline passed */
return TIMEOUT;
}
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
return false;
return SHUTDOWN;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
return GOT_EVENT;
}
}
}

@ -42,11 +42,12 @@ class InsecureInteropTest(
unittest.TestCase):
def setUp(self):
self.server = implementations.insecure_server(methods.SERVER_METHODS, 0)
self.server = implementations.insecure_server(
methods.SERVICE_NAME, methods.SERVER_METHODS, 0)
self.server.start()
port = self.server.port()
self.stub = implementations.insecure_stub(
methods.CLIENT_METHODS, 'localhost', port)
methods.SERVICE_NAME, methods.CLIENT_METHODS, 'localhost', port)
def tearDown(self):
self.server.stop()

@ -46,12 +46,12 @@ class SecureInteropTest(
def setUp(self):
self.server = implementations.secure_server(
methods.SERVER_METHODS, 0, resources.private_key(),
resources.certificate_chain())
methods.SERVICE_NAME, methods.SERVER_METHODS, 0,
resources.private_key(), resources.certificate_chain())
self.server.start()
port = self.server.port()
self.stub = implementations.secure_stub(
methods.CLIENT_METHODS, 'localhost', port,
methods.SERVICE_NAME, methods.CLIENT_METHODS, 'localhost', port,
resources.test_root_certificates(), None, None,
server_host_override=_SERVER_HOST_OVERRIDE)

@ -67,12 +67,13 @@ def _stub(args):
root_certificates = resources.prod_root_certificates()
stub = implementations.secure_stub(
methods.CLIENT_METHODS, args.server_host, args.server_port,
root_certificates, None, None,
methods.SERVICE_NAME, methods.CLIENT_METHODS, args.server_host,
args.server_port, root_certificates, None, None,
server_host_override=args.server_host_override)
else:
stub = implementations.insecure_stub(
methods.CLIENT_METHODS, args.server_host, args.server_port)
methods.SERVICE_NAME, methods.CLIENT_METHODS, args.server_host,
args.server_port)
return stub

@ -122,31 +122,31 @@ _SERVER_HALF_DUPLEX_CALL = utilities.stream_stream_service_description(
messages_pb2.StreamingOutputCallResponse.SerializeToString)
_SERVICE_NAME = '/grpc.testing.TestService'
SERVICE_NAME = 'grpc.testing.TestService'
EMPTY_CALL_METHOD_NAME = _SERVICE_NAME + '/EmptyCall'
UNARY_CALL_METHOD_NAME = _SERVICE_NAME + '/UnaryCall'
STREAMING_OUTPUT_CALL_METHOD_NAME = _SERVICE_NAME + '/StreamingOutputCall'
STREAMING_INPUT_CALL_METHOD_NAME = _SERVICE_NAME + '/StreamingInputCall'
FULL_DUPLEX_CALL_METHOD_NAME = _SERVICE_NAME + '/FullDuplexCall'
HALF_DUPLEX_CALL_METHOD_NAME = _SERVICE_NAME + '/HalfDuplexCall'
_EMPTY_CALL_METHOD_NAME = 'EmptyCall'
_UNARY_CALL_METHOD_NAME = 'UnaryCall'
_STREAMING_OUTPUT_CALL_METHOD_NAME = 'StreamingOutputCall'
_STREAMING_INPUT_CALL_METHOD_NAME = 'StreamingInputCall'
_FULL_DUPLEX_CALL_METHOD_NAME = 'FullDuplexCall'
_HALF_DUPLEX_CALL_METHOD_NAME = 'HalfDuplexCall'
CLIENT_METHODS = {
EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
_EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
_UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
_STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
_STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
_FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
_HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
}
SERVER_METHODS = {
EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
_EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
_UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
_STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
_STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
_FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
_HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
}

@ -54,10 +54,11 @@ def serve():
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
server = implementations.secure_server(
methods.SERVER_METHODS, args.port, private_key, certificate_chain)
methods.SERVICE_NAME, methods.SERVER_METHODS, args.port, private_key,
certificate_chain)
else:
server = implementations.insecure_server(
methods.SERVER_METHODS, args.port)
methods.SERVICE_NAME, methods.SERVER_METHODS, args.port)
server.start()
logging.info('Server serving.')

@ -38,16 +38,28 @@ from grpc.early_adopter import _reexport
from grpc.early_adopter import interfaces
def _qualified_name(service_name, method_name):
return '/%s/%s' % (service_name, method_name)
# TODO(nathaniel): This structure is getting bloated; it could be shrunk if
# implementations._Stub used a generic rather than a dynamic underlying
# face-layer stub.
class InvocationBreakdown(object):
"""An intermediate representation of invocation-side views of RPC methods.
Attributes:
cardinalities: A dictionary from RPC method name to interfaces.Cardinality
value.
request_serializers: A dictionary from RPC method name to callable
behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing response values for the RPC.
qualified_names: A dictionary from unqualified RPC method name to
service-qualified RPC method name.
face_cardinalities: A dictionary from service-qualified RPC method name to
to cardinality.Cardinality value.
request_serializers: A dictionary from service-qualified RPC method name to
callable behavior to be used serializing request values for the RPC.
response_deserializers: A dictionary from service-qualified RPC method name
to callable behavior to be used deserializing response values for the
RPC.
"""
__metaclass__ = abc.ABCMeta
@ -56,7 +68,8 @@ class _EasyInvocationBreakdown(
InvocationBreakdown,
collections.namedtuple(
'_EasyInvocationBreakdown',
('cardinalities', 'request_serializers', 'response_deserializers'))):
('cardinalities', 'qualified_names', 'face_cardinalities',
'request_serializers', 'response_deserializers'))):
pass
@ -64,12 +77,12 @@ class ServiceBreakdown(object):
"""An intermediate representation of service-side views of RPC methods.
Attributes:
implementations: A dictionary from RPC method name to
implementations: A dictionary from service-qualified RPC method name to
face_interfaces.MethodImplementation implementing the RPC method.
request_deserializers: A dictionary from RPC method name to callable
behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from RPC method name to callable
behavior to be used serializing response values for the RPC.
request_deserializers: A dictionary from service-qualified RPC method name
to callable behavior to be used deserializing request values for the RPC.
response_serializers: A dictionary from service-qualified RPC method name
to callable behavior to be used serializing response values for the RPC.
"""
__metaclass__ = abc.ABCMeta
@ -82,10 +95,11 @@ class _EasyServiceBreakdown(
pass
def break_down_invocation(method_descriptions):
def break_down_invocation(service_name, method_descriptions):
"""Derives an InvocationBreakdown from several RPC method descriptions.
Args:
service_name: The package-qualified full name of the service.
method_descriptions: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs.
@ -93,17 +107,26 @@ def break_down_invocation(method_descriptions):
An InvocationBreakdown corresponding to the given method descriptions.
"""
cardinalities = {}
qualified_names = {}
face_cardinalities = {}
request_serializers = {}
response_deserializers = {}
for name, method_description in method_descriptions.iteritems():
qualified_name = _qualified_name(service_name, name)
method_cardinality = method_description.cardinality()
cardinalities[name] = method_description.cardinality()
request_serializers[name] = method_description.serialize_request
response_deserializers[name] = method_description.deserialize_response
qualified_names[name] = qualified_name
face_cardinalities[qualified_name] = _reexport.common_cardinality(
method_cardinality)
request_serializers[qualified_name] = method_description.serialize_request
response_deserializers[qualified_name] = (
method_description.deserialize_response)
return _EasyInvocationBreakdown(
cardinalities, request_serializers, response_deserializers)
cardinalities, qualified_names, face_cardinalities, request_serializers,
response_deserializers)
def break_down_service(method_descriptions):
def break_down_service(service_name, method_descriptions):
"""Derives a ServiceBreakdown from several RPC method descriptions.
Args:
@ -117,37 +140,44 @@ def break_down_service(method_descriptions):
request_deserializers = {}
response_serializers = {}
for name, method_description in method_descriptions.iteritems():
cardinality = method_description.cardinality()
if cardinality is interfaces.Cardinality.UNARY_UNARY:
qualified_name = _qualified_name(service_name, name)
method_cardinality = method_description.cardinality()
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_unary):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = face_utilities.unary_unary_inline(service)
elif cardinality is interfaces.Cardinality.UNARY_STREAM:
implementations[qualified_name] = face_utilities.unary_unary_inline(
service)
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
def service(
request, face_rpc_context,
service_behavior=method_description.service_unary_stream):
return service_behavior(
request, _reexport.rpc_context(face_rpc_context))
implementations[name] = face_utilities.unary_stream_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_UNARY:
implementations[qualified_name] = face_utilities.unary_stream_inline(
service)
elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_unary):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = face_utilities.stream_unary_inline(service)
elif cardinality is interfaces.Cardinality.STREAM_STREAM:
implementations[qualified_name] = face_utilities.stream_unary_inline(
service)
elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
def service(
request_iterator, face_rpc_context,
service_behavior=method_description.service_stream_stream):
return service_behavior(
request_iterator, _reexport.rpc_context(face_rpc_context))
implementations[name] = face_utilities.stream_stream_inline(service)
request_deserializers[name] = method_description.deserialize_request
response_serializers[name] = method_description.serialize_response
implementations[qualified_name] = face_utilities.stream_stream_inline(
service)
request_deserializers[qualified_name] = (
method_description.deserialize_request)
response_serializers[qualified_name] = (
method_description.serialize_response)
return _EasyServiceBreakdown(
implementations, request_deserializers, response_serializers)

@ -174,6 +174,11 @@ class _StreamUnarySyncAsync(interfaces.StreamUnarySyncAsync):
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))
def common_cardinality(early_adopter_cardinality):
return _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[
early_adopter_cardinality]
def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():

@ -146,8 +146,7 @@ class _Stub(interfaces.Stub):
self._rear_link.join_fore_link(self._front)
self._rear_link.start()
self._understub = _face_implementations.dynamic_stub(
_reexport.common_cardinalities(self._breakdown.cardinalities),
self._front, self._pool, '')
self._breakdown.face_cardinalities, self._front, self._pool, '')
else:
raise ValueError('Tried to __enter__ already-__enter__ed Stub!')
return self
@ -171,17 +170,9 @@ class _Stub(interfaces.Stub):
if self._pool is None:
raise ValueError('Tried to __getattr__ non-__enter__ed Stub!')
else:
underlying_attr = getattr(self._understub, attr, None)
method_cardinality = self._breakdown.cardinalities.get(attr)
# TODO(nathaniel): Eliminate this trick.
if underlying_attr is None:
for method_name, method_cardinality in self._breakdown.cardinalities.iteritems():
last_slash_index = method_name.rfind('/')
if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr:
underlying_attr = getattr(self._understub, method_name)
break
else:
raise AttributeError(attr)
underlying_attr = getattr(
self._understub, self._breakdown.qualified_names.get(attr), None)
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _reexport.unary_unary_sync_async(underlying_attr)
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
@ -198,44 +189,49 @@ class _Stub(interfaces.Stub):
def _build_stub(
methods, host, port, secure, root_certificates, private_key,
service_name, methods, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=None):
breakdown = _face_utilities.break_down_invocation(methods)
breakdown = _face_utilities.break_down_invocation(service_name, methods)
return _Stub(
breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
def _build_server(methods, port, private_key, certificate_chain):
breakdown = _face_utilities.break_down_service(methods)
def _build_server(service_name, methods, port, private_key, certificate_chain):
breakdown = _face_utilities.break_down_service(service_name, methods)
return _Server(breakdown, port, private_key, certificate_chain)
def insecure_stub(methods, host, port):
def insecure_stub(service_name, methods, host, port):
"""Constructs an insecure interfaces.Stub.
Args:
service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
supported by the created stub.
supported by the created stub. The RPC method names in the dictionary are
not qualified by the service name or decorated in any other way.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
Returns:
An interfaces.Stub affording RPC invocation.
"""
return _build_stub(methods, host, port, False, None, None, None)
return _build_stub(
service_name, methods, host, port, False, None, None, None)
def secure_stub(
methods, host, port, root_certificates, private_key, certificate_chain,
server_host_override=None):
service_name, methods, host, port, root_certificates, private_key,
certificate_chain, server_host_override=None):
"""Constructs an insecure interfaces.Stub.
Args:
service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
supported by the created stub.
supported by the created stub. The RPC method names in the dictionary are
not qualified by the service name or decorated in any other way.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
root_certificates: The PEM-encoded root certificates or None to ask for
@ -251,17 +247,19 @@ def secure_stub(
An interfaces.Stub affording RPC invocation.
"""
return _build_stub(
methods, host, port, True, root_certificates, private_key,
service_name, methods, host, port, True, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
def insecure_server(methods, port):
def insecure_server(service_name, methods, port):
"""Constructs an insecure interfaces.Server.
Args:
service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
be serviced by the created server. The RPC method names in the dictionary
are not qualified by the service name or decorated in any other way.
port: The desired port on which to serve or zero to ask for a port to
be automatically selected.
@ -269,16 +267,18 @@ def insecure_server(methods, port):
An interfaces.Server that will run with no security and
service unsecured raw requests.
"""
return _build_server(methods, port, None, None)
return _build_server(service_name, methods, port, None, None)
def secure_server(methods, port, private_key, certificate_chain):
def secure_server(service_name, methods, port, private_key, certificate_chain):
"""Constructs a secure interfaces.Server.
Args:
service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodServiceDescription describing the RPCs to
be serviced by the created server.
be serviced by the created server. The RPC method names in the dictionary
are not qualified by the service name or decorated in any other way.
port: The port on which to serve or zero to ask for a port to be
automatically selected.
private_key: A pem-encoded private key.
@ -287,4 +287,5 @@ def secure_server(methods, port, private_key, certificate_chain):
Returns:
An interfaces.Server that will serve secure traffic.
"""
return _build_server(methods, port, private_key, certificate_chain)
return _build_server(
service_name, methods, port, private_key, certificate_chain)

@ -37,6 +37,8 @@ from grpc.early_adopter import implementations
from grpc.early_adopter import utilities
from grpc._junkdrawer import math_pb2
SERVICE_NAME = 'math.Math'
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
@ -104,10 +106,12 @@ _TIMEOUT = 3
class EarlyAdopterImplementationsTest(unittest.TestCase):
def setUp(self):
self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0)
self.server = implementations.insecure_server(
SERVICE_NAME, _SERVICE_DESCRIPTIONS, 0)
self.server.start()
port = self.server.port()
self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port)
self.stub = implementations.insecure_stub(
SERVICE_NAME, _INVOCATION_DESCRIPTIONS, 'localhost', port)
def tearDown(self):
self.server.stop()

@ -76,6 +76,20 @@ void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
EXPECT_EQ(tag(i), got_tag);
}
void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok,
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::time_point::max(),
CompletionQueue::NextStatus expected_outcome =
CompletionQueue::GOT_EVENT) {
bool ok;
void* got_tag;
EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
if (expected_outcome == CompletionQueue::GOT_EVENT) {
EXPECT_EQ(expect_ok, ok);
EXPECT_EQ(tag(i), got_tag);
}
}
class AsyncEnd2endTest : public ::testing::Test {
protected:
AsyncEnd2endTest() : service_(&srv_cq_) {}
@ -166,6 +180,50 @@ TEST_F(AsyncEnd2endTest, SequentialRpcs) {
SendRpc(10);
}
// Test a simple RPC using the async version of Next
TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
ResetStub();
EchoRequest send_request;
EchoRequest recv_request;
EchoResponse send_response;
EchoResponse recv_response;
Status recv_status;
ClientContext cli_ctx;
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
response_reader(stub_->AsyncEcho(&cli_ctx, send_request,
&cli_cq_, tag(1)));
std::chrono::system_clock::time_point
time_now(std::chrono::system_clock::now()),
time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5));
verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
tag(2));
verify_timed_ok(&srv_cq_, 2, true, time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
verify_timed_ok(&cli_cq_, 1, true, time_limit);
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
verify_timed_ok(&srv_cq_, 3, true);
response_reader->Finish(&recv_response, &recv_status, tag(4));
verify_timed_ok(&cli_cq_, 4, true);
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.IsOk());
}
// Two pings and a final pong.
TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();

@ -38,5 +38,5 @@ rm -rf python2.7_virtual_environment
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install enum34==1.0.4 futures==2.2.0 protobuf==3.0.0-alpha-1
CFLAGS=-I$root/include LDFLAGS=-L$root/libs/opt pip install src/python/src
CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
pip install src/python/interop

@ -34,6 +34,6 @@ set -ex
cd $(dirname $0)/../..
root=`pwd`
export LD_LIBRARY_PATH=$root/libs/opt
export LD_LIBRARY_PATH=$root/libs/$CONFIG
source python2.7_virtual_environment/bin/activate
python2.7 -B $*

Loading…
Cancel
Save