Merge github.com:grpc/grpc into error

pull/6897/head
Craig Tiller 9 years ago
commit 34b11dfb55
  1. 4
      examples/cpp/helloworld/greeter_async_client.cc
  2. 4
      examples/cpp/helloworld/greeter_async_server.cc
  3. 3
      setup.py
  4. 42
      src/compiler/objective_c_generator.cc
  5. 78
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  6. 3
      src/core/ext/transport/chttp2/transport/internal.h
  7. 2
      src/core/lib/iomgr/error.c
  8. 2
      src/core/lib/iomgr/error.h
  9. 15
      src/csharp/Grpc.Core.Tests/ServerTest.cs
  10. 3
      src/csharp/Grpc.Core/Server.cs
  11. 284
      src/python/grpcio/tests/unit/_cython/cygrpc_test.py
  12. 34
      src/python/grpcio/tests/unit/_cython/test_utilities.py
  13. 2
      tools/distrib/python/check_grpcio_tools.py
  14. 10
      tools/distrib/python/grpcio_tools/README.rst
  15. 5
      tools/distrib/python/grpcio_tools/grpc/tools/protoc.py
  16. 4
      tools/distrib/python/grpcio_tools/protoc_lib_deps.py
  17. 36
      tools/distrib/python/grpcio_tools/setup.py
  18. 35
      tools/distrib/python/make_grpcio_tools.py
  19. 7
      tools/dockerfile/grpc_artifact_python_manylinux_x64/Dockerfile
  20. 7
      tools/dockerfile/grpc_artifact_python_manylinux_x86/Dockerfile
  21. 4
      tools/run_tests/build_artifact_python.sh

@ -87,7 +87,9 @@ class GreeterClient {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
cq.Next(&got_tag, &ok);
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or the cq_ is shutting down.
GPR_ASSERT(cq.Next(&got_tag, &ok));
// Verify that the result from "cq" corresponds, by its tag, our previous
// request.

@ -160,7 +160,9 @@ class ServerImpl final {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq_->Next(&tag, &ok);
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}

@ -234,8 +234,7 @@ setuptools.setup(
ext_modules=CYTHON_EXTENSION_MODULES,
packages=list(PACKAGES),
package_dir=PACKAGE_DIRECTORIES,
# TODO(atash): Figure out why auditwheel doesn't like namespace packages.
#namespace_packages=['grpc'],
namespace_packages=['grpc'],
package_data=PACKAGE_DATA,
install_requires=INSTALL_REQUIRES,
setup_requires=SETUP_REQUIRES,

@ -60,9 +60,34 @@ void PrintProtoRpcDeclarationAsPragma(Printer *printer,
" returns ($server_stream$$response_type$)\n\n");
}
template <typename DescriptorType>
static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
std::vector<grpc::string> comments;
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED,
&comments);
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING,
&comments);
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING,
&comments);
if (comments.empty()) {
return;
}
printer->Print("/**\n");
for (auto it = comments.begin(); it != comments.end(); ++it) {
printer->Print(" * ");
size_t start_pos = it->find_first_not_of(' ');
if (start_pos != grpc::string::npos) {
printer->Print(it->c_str() + start_pos);
}
printer->Print("\n");
}
printer->Print(" */\n");
}
void PrintMethodSignature(Printer *printer, const MethodDescriptor *method,
const map< ::grpc::string, ::grpc::string> &vars) {
// TODO(jcanizales): Print method comments.
// Print comment
PrintAllComments(method, printer);
printer->Print(vars, "- ($return_type$)$method_name$With");
if (method->client_streaming()) {
@ -195,8 +220,10 @@ void PrintMethodImplementations(Printer *printer,
printer.Print("@end\n\n");
printer.Print(
"// Basic service implementation, over gRPC, that only does"
" marshalling and parsing.\n");
"/**\n"
" * Basic service implementation, over gRPC, that only does\n"
" * marshalling and parsing.\n"
" */\n");
printer.Print(vars,
"@interface $service_class$ :"
" GRPCProtoService<$service_class$>\n");
@ -220,18 +247,13 @@ void PrintMethodImplementations(Printer *printer,
{"service_class", ServiceClassName(service)},
{"package", service->file()->package()}};
printer.Print(vars,
"static NSString *const kPackageName = @\"$package$\";\n");
printer.Print(
vars, "static NSString *const kServiceName = @\"$service_name$\";\n\n");
printer.Print(vars, "@implementation $service_class$\n\n");
printer.Print("// Designated initializer\n");
printer.Print("- (instancetype)initWithHost:(NSString *)host {\n");
printer.Print(
printer.Print(vars,
" return (self = [super initWithHost:host"
" packageName:kPackageName serviceName:kServiceName]);\n");
" packageName:@\"$package$\" serviceName:@\"$service_name$\"]);\n");
printer.Print("}\n\n");
printer.Print(
"// Override superclass initializer to disallow different"

@ -47,6 +47,7 @@
#include "src/core/ext/transport/chttp2/transport/internal.h"
#include "src/core/ext/transport/chttp2/transport/status_conversion.h"
#include "src/core/ext/transport/chttp2/transport/timeout_encoding.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
@ -105,7 +106,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status);
grpc_status_code status,
gpr_slice *optional_message);
static void close_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
@ -161,6 +163,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(t->ep == NULL);
gpr_slice_unref(t->optional_drop_message);
gpr_slice_buffer_destroy(&t->global.qbuf);
gpr_slice_buffer_destroy(&t->writing.outbuf);
@ -261,6 +265,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->writing.is_client = is_client;
t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
&t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
is_client ? "client_transport" : "server_transport");
@ -869,7 +874,7 @@ static void maybe_start_some_streams(
grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
&stream_global)) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE);
GRPC_STATUS_UNAVAILABLE, NULL);
}
}
@ -953,7 +958,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_from_api(exec_ctx, transport_global, stream_global,
op->cancel_with_status);
op->cancel_with_status, op->optional_close_message);
}
if (op->close_with_status != GRPC_STATUS_OK) {
@ -977,7 +982,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
stream_global->seen_error = true;
@ -1036,7 +1041,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
"(%lu vs. %lu)",
metadata_size, metadata_peer_limit);
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
} else {
if (contains_non_ok_status(transport_global,
op->send_trailing_metadata)) {
@ -1228,7 +1233,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
grpc_chttp2_incoming_metadata_buffer_publish(
@ -1267,7 +1272,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->exceeded_metadata_size) {
cancel_from_api(exec_ctx, transport_global, stream_global,
GRPC_STATUS_RESOURCE_EXHAUSTED);
GRPC_STATUS_RESOURCE_EXHAUSTED, NULL);
}
}
if (stream_global->all_incoming_byte_streams_finished) {
@ -1334,7 +1339,8 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
static void cancel_from_api(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status) {
grpc_status_code status,
gpr_slice *optional_message) {
if (!stream_global->read_closed || !stream_global->write_closed) {
if (stream_global->id != 0) {
gpr_slice_buffer_add(
@ -1344,8 +1350,12 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
(uint32_t)grpc_chttp2_grpc_status_to_http2_error(status),
&stream_global->stats.outgoing));
}
if (optional_message) {
gpr_slice_ref(*optional_message);
}
grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
NULL);
optional_message);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
stream_global->seen_error = true;
@ -1571,8 +1581,12 @@ static void close_from_api(grpc_exec_ctx *exec_ctx,
static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
void *user_data,
grpc_chttp2_stream_global *stream_global) {
grpc_chttp2_transport *transport = TRANSPORT_FROM_GLOBAL(transport_global);
cancel_from_api(user_data, transport_global, stream_global,
GRPC_STATUS_UNAVAILABLE);
GRPC_STATUS_UNAVAILABLE,
GPR_SLICE_IS_EMPTY(transport->optional_drop_message)
? NULL
: &transport->optional_drop_message);
}
static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
@ -1651,18 +1665,58 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx,
}
}
static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t) {
grpc_http_parser parser;
size_t i = 0;
grpc_error *error = GRPC_ERROR_NONE;
grpc_http_response response;
memset(&response, 0, sizeof(response));
grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
grpc_error *parse_error = GRPC_ERROR_NONE;
for (; i < t->read_buffer.count && parse_error == GRPC_ERROR_NONE; i++) {
parse_error = grpc_http_parser_parse(&parser, t->read_buffer.slices[i]);
}
if (parse_error == GRPC_ERROR_NONE &&
(parse_error = grpc_http_parser_eof(&parser)) == GRPC_ERROR_NONE) {
error = grpc_error_set_int(
GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
GRPC_ERROR_INT_HTTP_STATUS, response.status);
}
GRPC_ERROR_UNREF(parse_error);
grpc_http_parser_destroy(&parser);
grpc_http_response_destroy(&response);
return error;
}
static void parsing_action(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_chttp2_transport *t = arg;
GPR_TIMER_BEGIN("reading_action.parse", 0);
size_t i = 0;
grpc_error *errors[2] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE};
grpc_error *errors[3] = {GRPC_ERROR_REF(error), GRPC_ERROR_NONE,
GRPC_ERROR_NONE};
for (; i < t->read_buffer.count && errors[1] == GRPC_ERROR_NONE; i++) {
errors[1] = grpc_chttp2_perform_read(exec_ctx, &t->parsing,
t->read_buffer.slices[i]);
};
if (i != t->read_buffer.count) {
gpr_slice_unref(t->optional_drop_message);
errors[2] = try_http_parsing(exec_ctx, t);
if (errors[2] != GRPC_ERROR_NONE) {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received http1.x response");
} else {
t->optional_drop_message = gpr_slice_from_copied_string(
"Connection dropped: received unparseable response");
}
}
grpc_error *err =
errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE
errors[0] == GRPC_ERROR_NONE && errors[1] == GRPC_ERROR_NONE &&
errors[2] == GRPC_ERROR_NONE
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
GPR_ARRAY_SIZE(errors));

@ -383,6 +383,9 @@ struct grpc_chttp2_transport {
/** Transport op to be applied post-parsing */
grpc_transport_op *post_parsing_op;
/** Message explaining the reason of dropping connection */
gpr_slice optional_drop_message;
};
typedef struct {

@ -113,6 +113,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "fd";
case GRPC_ERROR_INT_WSA_ERROR:
return "wsa_error";
case GRPC_ERROR_INT_HTTP_STATUS:
return "http_status";
}
GPR_UNREACHABLE_CODE(return "unknown");
}

@ -90,6 +90,8 @@ typedef enum {
GRPC_ERROR_INT_WSA_ERROR,
/// File descriptor associated with this error
GRPC_ERROR_INT_FD,
/// HTTP status (i.e. 404)
GRPC_ERROR_INT_HTTP_STATUS,
} grpc_error_ints;
typedef enum {

@ -93,5 +93,20 @@ namespace Grpc.Core.Tests
server.ShutdownAsync().Wait();
}
[Test]
public void UnstartedServerCanBeShutdown()
{
var server = new Server();
server.ShutdownAsync().Wait();
Assert.Throws(typeof(InvalidOperationException), () => server.Start());
}
[Test]
public void UnstartedServerDoesNotPreventShutdown()
{
// just create a server, don't start it, and make sure it doesn't prevent shutdown.
var server = new Server();
}
}
}

@ -140,6 +140,7 @@ namespace Grpc.Core
lock (myLock)
{
GrpcPreconditions.CheckState(!startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
startRequested = true;
handle.Start();
@ -203,7 +204,6 @@ namespace Grpc.Core
{
lock (myLock)
{
GrpcPreconditions.CheckState(startRequested);
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
@ -215,7 +215,6 @@ namespace Grpc.Core
{
handle.CancelAllCalls();
}
await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
DisposeHandle();

@ -143,22 +143,60 @@ class TypeSmokeTest(unittest.TestCase):
del completion_queue
class InsecureServerInsecureClient(unittest.TestCase):
class ServerClientMixin(object):
def setUp(self):
def setUpMixin(self, server_credentials, client_credentials, host_override):
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server()
self.server.register_completion_queue(self.server_completion_queue)
self.port = self.server.add_http2_port('[::]:0')
if server_credentials:
self.port = self.server.add_http2_port('[::]:0', server_credentials)
else:
self.port = self.server.add_http2_port('[::]:0')
self.server.start()
self.client_completion_queue = cygrpc.CompletionQueue()
self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port))
def tearDown(self):
if client_credentials:
client_channel_arguments = cygrpc.ChannelArgs([
cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override,
host_override)])
self.client_channel = cygrpc.Channel(
'localhost:{}'.format(self.port), client_channel_arguments,
client_credentials)
else:
self.client_channel = cygrpc.Channel('localhost:{}'.format(self.port))
if host_override:
self.host_argument = None # default host
self.expected_host = host_override
else:
# arbitrary host name necessitating no further identification
self.host_argument = b'hostess'
self.expected_host = self.host_argument
def tearDownMixin(self):
del self.server
del self.client_completion_queue
del self.server_completion_queue
def _perform_operations(self, operations, call, queue, deadline, description):
"""Perform the list of operations with given call, queue, and deadline.
Invocation errors are reported with as an exception with `description` in
the message. Performs the operations asynchronously, returning a future.
"""
def performer():
tag = object()
try:
call_result = call.start_batch(cygrpc.Operations(operations), tag)
self.assertEqual(cygrpc.CallError.ok, call_result)
event = queue.poll(deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete, event.type)
self.assertTrue(event.success)
self.assertIs(tag, event.tag)
except Exception as error:
raise Exception("Error in '{}': {}".format(description, error.message))
return event
return test_utilities.SimpleFuture(performer)
def testEcho(self):
DEADLINE = time.time()+5
DEADLINE_TOLERANCE = 0.25
@ -175,7 +213,6 @@ class InsecureServerInsecureClient(unittest.TestCase):
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
HOST = b'hostess'
cygrpc_deadline = cygrpc.Timespec(DEADLINE)
@ -188,7 +225,8 @@ class InsecureServerInsecureClient(unittest.TestCase):
client_call_tag = object()
client_call = self.client_channel.create_call(
None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline)
None, 0, self.client_completion_queue, METHOD, self.host_argument,
cygrpc_deadline)
client_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
CLIENT_METADATA_ASCII_VALUE),
@ -216,7 +254,8 @@ class InsecureServerInsecureClient(unittest.TestCase):
test_common.metadata_transmitted(client_initial_metadata,
request_event.request_metadata))
self.assertEqual(METHOD, request_event.request_call_details.method)
self.assertEqual(HOST, request_event.request_call_details.host)
self.assertEqual(self.expected_host,
request_event.request_call_details.host)
self.assertLess(
abs(DEADLINE - float(request_event.request_call_details.deadline)),
DEADLINE_TOLERANCE)
@ -292,172 +331,101 @@ class InsecureServerInsecureClient(unittest.TestCase):
del client_call
del server_call
class SecureServerSecureClient(unittest.TestCase):
def setUp(self):
server_credentials = cygrpc.server_credentials_ssl(
None, [cygrpc.SslPemKeyCertPair(resources.private_key(),
resources.certificate_chain())], False)
channel_credentials = cygrpc.channel_credentials_ssl(
resources.test_root_certificates(), None)
self.server_completion_queue = cygrpc.CompletionQueue()
self.server = cygrpc.Server()
self.server.register_completion_queue(self.server_completion_queue)
self.port = self.server.add_http2_port('[::]:0', server_credentials)
self.server.start()
self.client_completion_queue = cygrpc.CompletionQueue()
client_channel_arguments = cygrpc.ChannelArgs([
cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override,
_SSL_HOST_OVERRIDE)])
self.client_channel = cygrpc.Channel(
'localhost:{}'.format(self.port), client_channel_arguments,
channel_credentials)
def tearDown(self):
del self.server
del self.client_completion_queue
del self.server_completion_queue
def testEcho(self):
def test6522(self):
DEADLINE = time.time()+5
DEADLINE_TOLERANCE = 0.25
CLIENT_METADATA_ASCII_KEY = b'key'
CLIENT_METADATA_ASCII_VALUE = b'val'
CLIENT_METADATA_BIN_KEY = b'key-bin'
CLIENT_METADATA_BIN_VALUE = b'\0'*1000
SERVER_INITIAL_METADATA_KEY = b'init_me_me_me'
SERVER_INITIAL_METADATA_VALUE = b'whodawha?'
SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = b'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok
SERVER_STATUS_DETAILS = b'our work is never over'
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'/twinkies'
HOST = None # Default host
METHOD = b'twinkies'
cygrpc_deadline = cygrpc.Timespec(DEADLINE)
empty_metadata = cygrpc.Metadata([])
server_request_tag = object()
request_call_result = self.server.request_call(
self.server.request_call(
self.server_completion_queue, self.server_completion_queue,
server_request_tag)
client_call = self.client_channel.create_call(
None, 0, self.client_completion_queue, METHOD, self.host_argument,
cygrpc_deadline)
self.assertEqual(cygrpc.CallError.ok, request_call_result)
plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, '')
call_credentials = cygrpc.call_credentials_metadata_plugin(plugin)
# Prologue
def perform_client_operations(operations, description):
return self._perform_operations(
operations, client_call,
self.client_completion_queue, cygrpc_deadline, description)
client_call_tag = object()
client_call = self.client_channel.create_call(
None, 0, self.client_completion_queue, METHOD, HOST, cygrpc_deadline)
client_call.set_credentials(call_credentials)
client_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY,
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
cygrpc.operation_send_initial_metadata(client_initial_metadata,
_EMPTY_FLAGS),
cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
self.client_completion_queue, cygrpc_deadline)
client_event_future = perform_client_operations([
cygrpc.operation_send_initial_metadata(empty_metadata,
_EMPTY_FLAGS),
cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
self.assertEqual(cygrpc.CompletionType.operation_complete,
request_event.type)
self.assertIsInstance(request_event.operation_call, cygrpc.Call)
self.assertIs(server_request_tag, request_event.tag)
self.assertEqual(0, len(request_event.batch_operations))
client_metadata_with_credentials = list(client_initial_metadata) + [
(_CALL_CREDENTIALS_METADATA_KEY, _CALL_CREDENTIALS_METADATA_VALUE)]
self.assertTrue(
test_common.metadata_transmitted(client_metadata_with_credentials,
request_event.request_metadata))
self.assertEqual(METHOD, request_event.request_call_details.method)
self.assertEqual(_SSL_HOST_OVERRIDE,
request_event.request_call_details.host)
self.assertLess(
abs(DEADLINE - float(request_event.request_call_details.deadline)),
DEADLINE_TOLERANCE)
server_call_tag = object()
server_call = request_event.operation_call
server_initial_metadata = cygrpc.Metadata([
cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY,
SERVER_INITIAL_METADATA_VALUE)])
server_trailing_metadata = cygrpc.Metadata([
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
cygrpc.operation_send_initial_metadata(server_initial_metadata,
_EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
server_trailing_metadata, SERVER_STATUS_CODE,
SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
client_event = client_event_future.result()
server_event = self.server_completion_queue.poll(cygrpc_deadline)
def perform_server_operations(operations, description):
return self._perform_operations(
operations, server_call,
self.server_completion_queue, cygrpc_deadline, description)
self.assertEqual(6, len(client_event.batch_operations))
found_client_op_types = set()
for client_result in client_event.batch_operations:
# we expect each op type to be unique
self.assertNotIn(client_result.type, found_client_op_types)
found_client_op_types.add(client_result.type)
if client_result.type == cygrpc.OperationType.receive_initial_metadata:
self.assertTrue(
test_common.metadata_transmitted(server_initial_metadata,
client_result.received_metadata))
elif client_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(RESPONSE, client_result.received_message.bytes())
elif client_result.type == cygrpc.OperationType.receive_status_on_client:
self.assertTrue(
test_common.metadata_transmitted(server_trailing_metadata,
client_result.received_metadata))
self.assertEqual(SERVER_STATUS_DETAILS,
client_result.received_status_details)
self.assertEqual(SERVER_STATUS_CODE, client_result.received_status_code)
self.assertEqual(set([
cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.send_message,
cygrpc.OperationType.send_close_from_client,
cygrpc.OperationType.receive_initial_metadata,
cygrpc.OperationType.receive_message,
cygrpc.OperationType.receive_status_on_client
]), found_client_op_types)
server_event_future = perform_server_operations([
cygrpc.operation_send_initial_metadata(empty_metadata,
_EMPTY_FLAGS),
], "Server prologue")
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
self.assertNotIn(client_result.type, found_server_op_types)
found_server_op_types.add(server_result.type)
if server_result.type == cygrpc.OperationType.receive_message:
self.assertEqual(REQUEST, server_result.received_message.bytes())
elif server_result.type == cygrpc.OperationType.receive_close_on_server:
self.assertFalse(server_result.received_cancelled)
self.assertEqual(set([
cygrpc.OperationType.send_initial_metadata,
cygrpc.OperationType.receive_message,
cygrpc.OperationType.send_message,
cygrpc.OperationType.receive_close_on_server,
cygrpc.OperationType.send_status_from_server
]), found_server_op_types)
client_event_future.result() # force completion
server_event_future.result()
del client_call
del server_call
# Messaging
for _ in range(10):
client_event_future = perform_client_operations([
cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
], "Client message")
server_event_future = perform_server_operations([
cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
cygrpc.operation_receive_message(_EMPTY_FLAGS),
], "Server receive")
client_event_future.result() # force completion
server_event_future.result()
# Epilogue
client_event_future = perform_client_operations([
cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
], "Client epilogue")
server_event_future = perform_server_operations([
cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")
client_event_future.result() # force completion
server_event_future.result()
class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin):
def setUp(self):
self.setUpMixin(None, None, None)
def tearDown(self):
self.tearDownMixin()
class SecureServerSecureClient(unittest.TestCase, ServerClientMixin):
def setUp(self):
server_credentials = cygrpc.server_credentials_ssl(
None, [cygrpc.SslPemKeyCertPair(resources.private_key(),
resources.certificate_chain())], False)
client_credentials = cygrpc.channel_credentials_ssl(
resources.test_root_certificates(), None)
self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE)
def tearDown(self):
self.tearDownMixin()
if __name__ == '__main__':

@ -32,15 +32,35 @@ import threading
from grpc._cython import cygrpc
class CompletionQueuePollFuture:
class SimpleFuture(object):
"""A simple future mechanism."""
def __init__(self, completion_queue, deadline):
def poller_function():
self._event_result = completion_queue.poll(deadline)
self._event_result = None
self._thread = threading.Thread(target=poller_function)
def __init__(self, function, *args, **kwargs):
def wrapped_function():
try:
self._result = function(*args, **kwargs)
except Exception as error:
self._error = error
self._result = None
self._error = None
self._thread = threading.Thread(target=wrapped_function)
self._thread.start()
def result(self):
"""The resulting value of this future.
Re-raises any exceptions.
"""
self._thread.join()
return self._event_result
if self._error:
# TODO(atash): re-raise exceptions in a way that preserves tracebacks
raise self._error
return self._result
class CompletionQueuePollFuture(SimpleFuture):
def __init__(self, completion_queue, deadline):
super(CompletionQueuePollFuture, self).__init__(
lambda: completion_queue.poll(deadline))

@ -37,7 +37,7 @@ OUT_OF_DATE_MESSAGE = """file {} is out of date
Have you called tools/distrib/python/make_grpcio_tools.py since upgrading protobuf?"""
check_protoc_lib_deps_content = make.get_deps(make.BAZEL_DEPS_PROTOC_LIB_QUERY)
check_protoc_lib_deps_content = make.get_deps()
with open(make.GRPC_PYTHON_PROTOC_LIB_DEPS, 'r') as protoc_lib_deps_file:
if protoc_lib_deps_file.read() != check_protoc_lib_deps_content:

@ -126,3 +126,13 @@ Help, I ...
GCC 6.0), this is probably a bug where GCC chokes on constant expressions
when the :code:`-fwrapv` flag is specified. You should consider setting your
environment with :code:`CFLAGS=-fno-wrapv` or using clang (:code:`CC=clang`).
Usage
-----
Given protobuf include directories :code:`$INCLUDE`, an output directory
:code:`$OUTPUT`, and proto files :code:`$PROTO_FILES`, invoke as:
::
$ python -m grpc.tools.protoc -I$INCLUDE --python_out=$OUTPUT --grpc_python_out=$OUTPUT $PROTO_FILES

@ -29,10 +29,13 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import pkg_resources
import sys
from grpc.tools import protoc_compiler
if __name__ == '__main__':
protoc_compiler.run_main(sys.argv)
proto_include = pkg_resources.resource_filename('grpc.tools', '_proto')
protoc_compiler.run_main(
sys.argv + ['-I{}'.format(proto_include)])

File diff suppressed because one or more lines are too long

@ -28,9 +28,11 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from distutils import extension
import errno
import os
import os.path
import shlex
import shutil
import sys
import setuptools
@ -47,18 +49,41 @@ sys.path.insert(0, os.path.abspath('.'))
# ourselves in w.r.t. the multitude of operating systems this ought to build on.
# By default we assume a GCC-like compiler.
EXTRA_COMPILE_ARGS = shlex.split(os.environ.get('GRPC_PYTHON_CFLAGS',
'-frtti -std=c++11'))
'-fno-wrapv -frtti -std=c++11'))
EXTRA_LINK_ARGS = shlex.split(os.environ.get('GRPC_PYTHON_LDFLAGS',
'-lpthread'))
GRPC_PYTHON_TOOLS_PACKAGE = 'grpc.tools'
GRPC_PYTHON_PROTO_RESOURCES_NAME = '_proto'
import protoc_lib_deps
import grpc_version
def package_data():
tools_path = GRPC_PYTHON_TOOLS_PACKAGE.replace('.', os.path.sep)
proto_resources_path = os.path.join(tools_path,
GRPC_PYTHON_PROTO_RESOURCES_NAME)
proto_files = []
for proto_file in protoc_lib_deps.PROTO_FILES:
source = os.path.join(protoc_lib_deps.PROTO_INCLUDE, proto_file)
target = os.path.join(proto_resources_path, proto_file)
relative_target = os.path.join(GRPC_PYTHON_PROTO_RESOURCES_NAME, proto_file)
try:
os.makedirs(os.path.dirname(target))
except OSError as error:
if error.errno == errno.EEXIST:
pass
else:
raise
shutil.copy(source, target)
proto_files.append(relative_target)
return {GRPC_PYTHON_TOOLS_PACKAGE: proto_files}
def protoc_ext_module():
plugin_sources = [
'grpc/tools/main.cc',
'grpc_root/src/compiler/python_generator.cc'] + [
os.path.join('third_party/protobuf/src', cc_file)
os.path.join(protoc_lib_deps.CC_INCLUDE, cc_file)
for cc_file in protoc_lib_deps.CC_FILES]
plugin_ext = extension.Extension(
name='grpc.tools.protoc_compiler',
@ -67,7 +92,7 @@ def protoc_ext_module():
'.',
'grpc_root',
'grpc_root/include',
'third_party/protobuf/src',
protoc_lib_deps.CC_INCLUDE,
],
language='c++',
define_macros=[('HAVE_PTHREAD', 1)],
@ -88,9 +113,10 @@ setuptools.setup(
protoc_ext_module(),
]),
packages=setuptools.find_packages('.'),
# TODO(atash): Figure out why auditwheel doesn't like namespace packages.
#namespace_packages=['grpc'],
namespace_packages=['grpc'],
install_requires=[
'protobuf>=3.0.0a3',
'grpcio>=0.14.0',
],
package_data=package_data(),
)

@ -67,11 +67,16 @@ DEPS_FILE_CONTENT="""
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# AUTO-GENERATED BY make_grpcio_tools.py!
CC_FILES={}
CC_FILES={cc_files}
PROTO_FILES={proto_files}
CC_INCLUDE={cc_include}
PROTO_INCLUDE={proto_include}
"""
# Bazel query result prefix for expected source files in protobuf.
PROTOBUF_CC_PREFIX = '//:src/'
PROTOBUF_PROTO_PREFIX = '//:src/'
GRPC_ROOT = os.path.abspath(
os.path.join(os.path.dirname(os.path.abspath(__file__)),
@ -79,7 +84,8 @@ GRPC_ROOT = os.path.abspath(
GRPC_PYTHON_ROOT = os.path.join(GRPC_ROOT, 'tools/distrib/python/grpcio_tools')
GRPC_PROTOBUF = os.path.join(GRPC_ROOT, 'third_party/protobuf/src')
GRPC_PYTHON_PROTOBUF_RELATIVE_ROOT = 'third_party/protobuf/src'
GRPC_PROTOBUF = os.path.join(GRPC_ROOT, GRPC_PYTHON_PROTOBUF_RELATIVE_ROOT)
GRPC_PROTOC_PLUGINS = os.path.join(GRPC_ROOT, 'src/compiler')
GRPC_PYTHON_PROTOBUF = os.path.join(GRPC_PYTHON_ROOT,
'third_party/protobuf/src')
@ -93,18 +99,29 @@ GRPC_PYTHON_INCLUDE = os.path.join(GRPC_PYTHON_ROOT, 'grpc_root/include')
BAZEL_DEPS = os.path.join(GRPC_ROOT, 'tools/distrib/python/bazel_deps.sh')
BAZEL_DEPS_PROTOC_LIB_QUERY = '//:protoc_lib'
BAZEL_DEPS_COMMON_PROTOS_QUERY = '//:well_known_protos'
def bazel_query(query):
output = subprocess.check_output([BAZEL_DEPS, query])
return output.splitlines()
def get_deps(query):
def get_deps():
"""Write the result of the bazel query `query` against protobuf to
`out_file`."""
output = subprocess.check_output([BAZEL_DEPS, query])
output = output.splitlines()
cc_files_output = bazel_query(BAZEL_DEPS_PROTOC_LIB_QUERY)
cc_files = [
name for name in output
name[len(PROTOBUF_CC_PREFIX):] for name in cc_files_output
if name.endswith('.cc') and name.startswith(PROTOBUF_CC_PREFIX)]
cc_files = [cc_file[len(PROTOBUF_CC_PREFIX):] for cc_file in cc_files]
deps_file_content = DEPS_FILE_CONTENT.format(cc_files)
proto_files_output = bazel_query(BAZEL_DEPS_COMMON_PROTOS_QUERY)
proto_files = [
name[len(PROTOBUF_PROTO_PREFIX):] for name in proto_files_output
if name.endswith('.proto') and name.startswith(PROTOBUF_PROTO_PREFIX)]
deps_file_content = DEPS_FILE_CONTENT.format(
cc_files=cc_files,
proto_files=proto_files,
cc_include=repr(GRPC_PYTHON_PROTOBUF_RELATIVE_ROOT),
proto_include=repr(GRPC_PYTHON_PROTOBUF_RELATIVE_ROOT))
return deps_file_content
@ -123,7 +140,7 @@ def main():
shutil.copytree(GRPC_INCLUDE, GRPC_PYTHON_INCLUDE)
try:
protoc_lib_deps_content = get_deps(BAZEL_DEPS_PROTOC_LIB_QUERY)
protoc_lib_deps_content = get_deps()
except Exception as error:
# We allow this script to succeed even if we couldn't get the dependencies,
# as then we can assume that even without a successful bazel run the

@ -41,3 +41,10 @@ RUN /opt/python/cp27-cp27mu/bin/pip install cython
RUN /opt/python/cp34-cp34m/bin/pip install cython
RUN /opt/python/cp35-cp35m/bin/pip install cython
####################################################
# Install auditwheel with fix for namespace packages
RUN git clone https://github.com/pypa/auditwheel /usr/local/src/auditwheel
RUN cd /usr/local/src/auditwheel && git checkout bf071b38c9fe78b025ea05c78b1cb61d7cb09939
RUN /opt/python/cp35-cp35m/bin/pip install /usr/local/src/auditwheel
RUN rm /usr/local/bin/auditwheel
RUN cd /usr/local/bin && ln -s /opt/python/cp35-cp35m/bin/auditwheel

@ -41,3 +41,10 @@ RUN /opt/python/cp27-cp27mu/bin/pip install cython
RUN /opt/python/cp34-cp34m/bin/pip install cython
RUN /opt/python/cp35-cp35m/bin/pip install cython
####################################################
# Install auditwheel with fix for namespace packages
RUN git clone https://github.com/pypa/auditwheel /usr/local/src/auditwheel
RUN cd /usr/local/src/auditwheel && git checkout bf071b38c9fe78b025ea05c78b1cb61d7cb09939
RUN /opt/python/cp35-cp35m/bin/pip install /usr/local/src/auditwheel
RUN rm /usr/local/bin/auditwheel
RUN cd /usr/local/bin && ln -s /opt/python/cp35-cp35m/bin/auditwheel

@ -59,12 +59,14 @@ ${SETARCH_CMD} ${PYTHON} setup.py \
${SETARCH_CMD} ${PYTHON} setup.py \
bdist_wheel
# Build gRPC tools package distribution
${PYTHON} tools/distrib/python/make_grpcio_tools.py
# Build gRPC tools package source distribution
${SETARCH_CMD} ${PYTHON} tools/distrib/python/grpcio_tools/setup.py \
sdist
# Build gRPC tools package binary distribution
${PYTHON} tools/distrib/python/make_grpcio_tools.py
CFLAGS="$CFLAGS -fno-wrapv" ${SETARCH_CMD} \
${PYTHON} tools/distrib/python/grpcio_tools/setup.py bdist_wheel

Loading…
Cancel
Save