Update Cython gRPC code

pull/4228/head
Masood Malekghassemi 9 years ago
parent 13ee18261a
commit aed42a895e
  1. 28
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx
  2. 31
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx
  3. 11
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx
  4. 11
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx
  5. 37
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxd
  6. 1
      src/python/grpcio/grpc/_cython/_cygrpc/records.pxd
  7. 84
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx
  8. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx
  9. 2
      src/python/grpcio/grpc/_cython/adapter_low.py
  10. 3
      src/python/grpcio/grpc/_cython/cygrpc.pyx

@ -53,24 +53,24 @@ cdef class Call:
self.c_call, cy_operations.c_ops, cy_operations.c_nops,
<cpython.PyObject *>operation_tag, NULL)
def cancel(self,
grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE,
details=None):
def cancel(
self, grpc.grpc_status_code error_code=grpc.GRPC_STATUS__DO_NOT_USE,
details=None):
if not self.is_valid:
raise ValueError("invalid call object cannot be used from Python")
if (details is None) != (error_code == grpc.GRPC_STATUS__DO_NOT_USE):
raise ValueError("if error_code is specified, so must details "
"(and vice-versa)")
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected details to be str or bytes")
if error_code != grpc.GRPC_STATUS__DO_NOT_USE:
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
details = details.encode()
else:
raise TypeError("expected details to be str or bytes")
self.references.append(details)
return grpc.grpc_call_cancel_with_status(self.c_call, error_code, details,
NULL)
return grpc.grpc_call_cancel_with_status(
self.c_call, error_code, details, NULL)
else:
return grpc.grpc_call_cancel(self.c_call, NULL)
@ -79,6 +79,12 @@ cdef class Call:
return grpc.grpc_call_set_credentials(
self.c_call, call_credentials.c_credentials)
def peer(self):
cdef char *peer = grpc.grpc_call_get_peer(self.c_call)
result = <bytes>peer
grpc.gpr_free(peer)
return result
def __dealloc__(self):
if self.c_call != NULL:
grpc.grpc_call_destroy(self.c_call)

@ -27,6 +27,8 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
cimport cpython
from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport completion_queue
from grpc._cython._cygrpc cimport credentials
@ -70,12 +72,16 @@ cdef class Channel:
method = method.encode()
else:
raise TypeError("expected method to be str or bytes")
if isinstance(host, bytes):
cdef char *host_c_string = NULL
if host is None:
pass
elif isinstance(host, bytes):
host_c_string = host
elif isinstance(host, basestring):
host = host.encode()
host_c_string = host
else:
raise TypeError("expected host to be str or bytes")
raise TypeError("expected host to be str, bytes, or None")
cdef call.Call operation_call = call.Call()
operation_call.references = [self, method, host, queue]
cdef grpc.grpc_call *parent_call = NULL
@ -83,10 +89,29 @@ cdef class Channel:
parent_call = parent.c_call
operation_call.c_call = grpc.grpc_channel_create_call(
self.c_channel, parent_call, flags,
queue.c_completion_queue, method, host, deadline.c_time,
queue.c_completion_queue, method, host_c_string, deadline.c_time,
NULL)
return operation_call
def check_connectivity_state(self, bint try_to_connect):
return grpc.grpc_channel_check_connectivity_state(self.c_channel,
try_to_connect)
def watch_connectivity_state(
self, last_observed_state, records.Timespec deadline not None,
completion_queue.CompletionQueue queue not None, tag):
cdef records.OperationTag operation_tag = records.OperationTag(tag)
cpython.Py_INCREF(operation_tag)
grpc.grpc_channel_watch_connectivity_state(
self.c_channel, last_observed_state, deadline.c_time,
queue.c_completion_queue, <cpython.PyObject *>operation_tag)
def target(self):
cdef char * target = grpc.grpc_channel_get_target(self.c_channel)
result = <bytes>target
grpc.gpr_free(target)
return result
def __dealloc__(self):
if self.c_channel != NULL:
grpc.grpc_channel_destroy(self.c_channel)

@ -62,6 +62,8 @@ cdef class CompletionQueue:
cdef grpc.grpc_event event
# Poll within a critical section
# TODO consider making queue polling contention a hard error to enable
# easier bug discovery
with self.poll_condition:
while self.is_polling:
self.poll_condition.wait(float(deadline) - time.time())
@ -74,10 +76,12 @@ cdef class CompletionQueue:
self.poll_condition.notify()
if event.type == grpc.GRPC_QUEUE_TIMEOUT:
return records.Event(event.type, False, None, None, None, None, None)
return records.Event(
event.type, False, None, None, None, None, False, None)
elif event.type == grpc.GRPC_QUEUE_SHUTDOWN:
self.is_shutdown = True
return records.Event(event.type, True, None, None, None, None, None)
return records.Event(
event.type, True, None, None, None, None, False, None)
else:
if event.tag != NULL:
tag = <records.OperationTag>event.tag
@ -97,7 +101,8 @@ cdef class CompletionQueue:
operation_call.references.extend(tag.references)
return records.Event(
event.type, event.success, user_tag, operation_call,
request_call_details, request_metadata, batch_operations)
request_call_details, request_metadata, tag.is_new_request,
batch_operations)
def shutdown(self):
grpc.grpc_completion_queue_shutdown(self.c_completion_queue)

@ -71,6 +71,7 @@ cdef class ServerCredentials:
def __cinit__(self):
self.c_credentials = NULL
self.references = []
def __dealloc__(self):
if self.c_credentials != NULL:
@ -83,7 +84,7 @@ def channel_credentials_google_default():
return credentials
def channel_credentials_ssl(pem_root_certificates,
records.SslPemKeyCertPair ssl_pem_key_cert_pair):
records.SslPemKeyCertPair ssl_pem_key_cert_pair):
if pem_root_certificates is None:
pass
elif isinstance(pem_root_certificates, bytes):
@ -104,6 +105,7 @@ def channel_credentials_ssl(pem_root_certificates,
else:
credentials.c_credentials = grpc.grpc_ssl_credentials_create(
c_pem_root_certificates, NULL, NULL)
return credentials
def channel_credentials_composite(
ChannelCredentials credentials_1 not None,
@ -135,7 +137,6 @@ def call_credentials_google_compute_engine():
grpc.grpc_google_compute_engine_credentials_create(NULL))
return credentials
#TODO rename to something like client_credentials_service_account_jwt_access.
def call_credentials_service_account_jwt_access(
json_key, records.Timespec token_lifetime not None):
if isinstance(json_key, bytes):
@ -186,12 +187,14 @@ def call_credentials_google_iam(authorization_token, authority_selector):
def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
bint force_client_auth):
cdef char *c_pem_root_certs = NULL
if pem_root_certs is None:
pass
elif isinstance(pem_root_certs, bytes):
pass
c_pem_root_certs = pem_root_certs
elif isinstance(pem_root_certs, basestring):
pem_root_certs = pem_root_certs.encode()
c_pem_root_certs = pem_root_certs
else:
raise TypeError("expected pem_root_certs to be str or bytes")
pem_key_cert_pairs = list(pem_key_cert_pairs)
@ -212,7 +215,7 @@ def server_credentials_ssl(pem_root_certs, pem_key_cert_pairs,
credentials.c_ssl_pem_key_cert_pairs[i] = (
(<records.SslPemKeyCertPair>pem_key_cert_pairs[i]).c_pair)
credentials.c_credentials = grpc.grpc_ssl_server_credentials_create(
pem_root_certs, credentials.c_ssl_pem_key_cert_pairs,
c_pem_root_certs, credentials.c_ssl_pem_key_cert_pairs,
credentials.c_ssl_pem_key_cert_pairs_count, force_client_auth, NULL)
return credentials

@ -132,6 +132,22 @@ cdef extern from "grpc/byte_buffer.h":
cdef extern from "grpc/grpc.h":
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_ENABLE_CENSUS
const char *GRPC_ARG_MAX_CONCURRENT_STREAMS
const char *GRPC_ARG_MAX_MESSAGE_LENGTH
const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
const char *GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER
const char *GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER
const char *GRPC_ARG_DEFAULT_AUTHORITY
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING
const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
const int GRPC_WRITE_BUFFER_HINT
const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK
ctypedef struct grpc_completion_queue:
# We don't care about the internals (and in fact don't know them)
pass
@ -149,9 +165,9 @@ cdef extern from "grpc/grpc.h":
pass
ctypedef enum grpc_arg_type:
grpc_arg_string "GRPC_ARG_STRING"
grpc_arg_integer "GRPC_ARG_INTEGER"
grpc_arg_pointer "GRPC_ARG_POINTER"
GRPC_ARG_STRING
GRPC_ARG_INTEGER
GRPC_ARG_POINTER
ctypedef struct grpc_arg_value_pointer:
void *address "p"
@ -185,6 +201,13 @@ cdef extern from "grpc/grpc.h":
GRPC_CALL_ERROR_INVALID_FLAGS
GRPC_CALL_ERROR_INVALID_METADATA
ctypedef enum grpc_connectivity_state:
GRPC_CHANNEL_IDLE
GRPC_CHANNEL_CONNECTING
GRPC_CHANNEL_READY
GRPC_CHANNEL_TRANSIENT_FAILURE
GRPC_CHANNEL_FATAL_FAILURE
ctypedef struct grpc_metadata:
const char *key
const char *value
@ -279,9 +302,9 @@ cdef extern from "grpc/grpc.h":
grpc_status_code status,
const char *description,
void *reserved)
char *grpc_call_get_peer(grpc_call *call)
void grpc_call_destroy(grpc_call *call)
grpc_channel *grpc_insecure_channel_create(const char *target,
const grpc_channel_args *args,
void *reserved)
@ -291,6 +314,12 @@ cdef extern from "grpc/grpc.h":
grpc_completion_queue *completion_queue,
const char *method, const char *host,
gpr_timespec deadline, void *reserved)
grpc_connectivity_state grpc_channel_check_connectivity_state(
grpc_channel *channel, int try_to_connect)
void grpc_channel_watch_connectivity_state(
grpc_channel *channel, grpc_connectivity_state last_observed_state,
gpr_timespec deadline, grpc_completion_queue *cq, void *tag)
char *grpc_channel_get_target(grpc_channel *channel)
void grpc_channel_destroy(grpc_channel *channel)
grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved)

@ -66,6 +66,7 @@ cdef class Event:
cdef readonly call.Call operation_call
# For Server.request_call
cdef readonly bint is_new_request
cdef readonly CallDetails request_call_details
cdef readonly Metadata request_metadata

@ -32,6 +32,32 @@ from grpc._cython._cygrpc cimport call
from grpc._cython._cygrpc cimport server
class ConnectivityState:
idle = grpc.GRPC_CHANNEL_IDLE
connecting = grpc.GRPC_CHANNEL_CONNECTING
ready = grpc.GRPC_CHANNEL_READY
transient_failure = grpc.GRPC_CHANNEL_TRANSIENT_FAILURE
fatal_failure = grpc.GRPC_CHANNEL_FATAL_FAILURE
class ChannelArgKey:
enable_census = grpc.GRPC_ARG_ENABLE_CENSUS
max_concurrent_streams = grpc.GRPC_ARG_MAX_CONCURRENT_STREAMS
max_message_length = grpc.GRPC_ARG_MAX_MESSAGE_LENGTH
http2_initial_sequence_number = grpc.GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
http2_hpack_table_size_decoder = grpc.GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER
http2_hpack_table_size_encoder = grpc.GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER
default_authority = grpc.GRPC_ARG_DEFAULT_AUTHORITY
primary_user_agent_string = grpc.GRPC_ARG_PRIMARY_USER_AGENT_STRING
secondary_user_agent_string = grpc.GRPC_ARG_SECONDARY_USER_AGENT_STRING
ssl_target_name_override = grpc.GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
class WriteFlag:
buffer_hint = grpc.GRPC_WRITE_BUFFER_HINT
no_compress = grpc.GRPC_WRITE_NO_COMPRESS
class StatusCode:
ok = grpc.GRPC_STATUS_OK
cancelled = grpc.GRPC_STATUS_CANCELLED
@ -88,7 +114,10 @@ cdef class Timespec:
def __cinit__(self, time):
if time is None:
self.c_time = grpc.gpr_now(grpc.GPR_CLOCK_REALTIME)
elif isinstance(time, float):
return
if isinstance(time, int):
time = float(time)
if isinstance(time, float):
if time == float("+inf"):
self.c_time = grpc.gpr_inf_future(grpc.GPR_CLOCK_REALTIME)
elif time == float("-inf"):
@ -97,8 +126,11 @@ cdef class Timespec:
self.c_time.seconds = time
self.c_time.nanoseconds = (time - float(self.c_time.seconds)) * 1e9
self.c_time.clock_type = grpc.GPR_CLOCK_REALTIME
elif isinstance(time, Timespec):
self.c_time = (<Timespec>time).c_time
else:
raise TypeError("expected time to be float")
raise TypeError("expected time to be float, int, or Timespec, not {}"
.format(type(time)))
@property
def seconds(self):
@ -166,6 +198,7 @@ cdef class Event:
object tag, call.Call operation_call,
CallDetails request_call_details,
Metadata request_metadata,
bint is_new_request,
Operations batch_operations):
self.type = type
self.success = success
@ -174,6 +207,7 @@ cdef class Event:
self.request_call_details = request_call_details
self.request_metadata = request_metadata
self.batch_operations = batch_operations
self.is_new_request = is_new_request
cdef class ByteBuffer:
@ -186,8 +220,14 @@ cdef class ByteBuffer:
pass
elif isinstance(data, basestring):
data = data.encode()
elif isinstance(data, ByteBuffer):
data = (<ByteBuffer>data).bytes()
if data is None:
self.c_byte_buffer = NULL
return
else:
raise TypeError("expected value to be of type str or bytes")
raise TypeError("expected value to be of type str, bytes, or "
"ByteBuffer, not {}".format(type(data)))
cdef char *c_data = data
data_slice = grpc.gpr_slice_from_copied_buffer(c_data, len(data))
@ -409,12 +449,22 @@ cdef class Operation:
def type(self):
return self.c_op.type
@property
def has_status(self):
return self.c_op.type == grpc.GRPC_OP_RECV_STATUS_ON_CLIENT
@property
def received_message(self):
if self.c_op.type != grpc.GRPC_OP_RECV_MESSAGE:
raise TypeError("self must be an operation receiving a message")
return self._received_message
@property
def received_message_or_none(self):
if self.c_op.type != grpc.GRPC_OP_RECV_MESSAGE:
return None
return self._received_message
@property
def received_metadata(self):
if (self.c_op.type != grpc.GRPC_OP_RECV_INITIAL_METADATA and
@ -422,12 +472,25 @@ cdef class Operation:
raise TypeError("self must be an operation receiving metadata")
return self._received_metadata
@property
def received_metadata_or_none(self):
if (self.c_op.type != grpc.GRPC_OP_RECV_INITIAL_METADATA and
self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT):
return None
return self._received_metadata
@property
def received_status_code(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
raise TypeError("self must be an operation receiving a status code")
return self._received_status_code
@property
def received_status_code_or_none(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
return self._received_status_code
@property
def received_status_details(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
@ -437,6 +500,15 @@ cdef class Operation:
else:
return None
@property
def received_status_details_or_none(self):
if self.c_op.type != grpc.GRPC_OP_RECV_STATUS_ON_CLIENT:
return None
if self._received_status_details:
return self._received_status_details
else:
return None
@property
def received_cancelled(self):
if self.c_op.type != grpc.GRPC_OP_RECV_CLOSE_ON_SERVER:
@ -444,6 +516,12 @@ cdef class Operation:
"information")
return False if self._received_cancelled == 0 else True
@property
def received_cancelled_or_none(self):
if self.c_op.type != grpc.GRPC_OP_RECV_CLOSE_ON_SERVER:
return None
return False if self._received_cancelled == 0 else True
def __dealloc__(self):
# We *almost* don't need to do anything; most of the objects are handled by
# Python. The remaining one(s) are primitive fields filled in by GRPC core.

@ -132,7 +132,7 @@ cdef class Server:
def cancel_all_calls(self):
if not self.is_shutting_down:
raise ValueError("the server must be shutting down to cancel all calls")
raise RuntimeError("the server must be shutting down to cancel all calls")
elif self.is_shutdown:
return
else:

@ -33,7 +33,7 @@
#
# TODO(atash): Once this is plugged into grpc._adapter._intermediary_low, remove
# both grpc._adapter._intermediary_low and this file. The fore and rear links in
# grpc._adapter should be able to use grpc._cython.types directly.
# grpc._adapter should be able to use grpc._cython.cygrpc directly.
from grpc._adapter import _types as type_interfaces
from grpc._cython import cygrpc

@ -44,6 +44,9 @@ from grpc._cython._cygrpc import completion_queue
from grpc._cython._cygrpc import records
from grpc._cython._cygrpc import server
ConnectivityState = records.ConnectivityState
ChannelArgKey = records.ChannelArgKey
WriteFlag = records.WriteFlag
StatusCode = records.StatusCode
CallError = records.CallError
CompletionType = records.CompletionType

Loading…
Cancel
Save