diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index d7456a3dd16..3572737c874 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -129,12 +129,12 @@ def _abort(state, code, details):
 def _handle_event(event, state, response_deserializer):
     callbacks = []
     for batch_operation in event.batch_operations:
-        operation_type = batch_operation.type
+        operation_type = batch_operation.type()
         state.due.remove(operation_type)
         if operation_type == cygrpc.OperationType.receive_initial_metadata:
-            state.initial_metadata = batch_operation.received_metadata
+            state.initial_metadata = batch_operation.initial_metadata()
         elif operation_type == cygrpc.OperationType.receive_message:
-            serialized_response = batch_operation.received_message.bytes()
+            serialized_response = batch_operation.message()
             if serialized_response is not None:
                 response = _common.deserialize(serialized_response,
                                                response_deserializer)
@@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer):
                 else:
                     state.response = response
         elif operation_type == cygrpc.OperationType.receive_status_on_client:
-            state.trailing_metadata = batch_operation.received_metadata
+            state.trailing_metadata = batch_operation.trailing_metadata()
             if state.code is None:
                 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
-                    batch_operation.received_status_code)
+                    batch_operation.code())
                 if code is None:
                     state.code = grpc.StatusCode.UNKNOWN
                     state.details = _unknown_code_details(
-                        batch_operation.received_status_code,
-                        batch_operation.received_status_details)
+                        code, batch_operation.details())
                 else:
                     state.code = code
-                    state.details = batch_operation.received_status_details
+                    state.details = batch_operation.details()
             callbacks.extend(state.callbacks)
             state.callbacks = None
     return callbacks
@@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call,
                         _abort(state, grpc.StatusCode.INTERNAL, details)
                         return
                     else:
-                        operations = (cygrpc.operation_send_message(
+                        operations = (cygrpc.SendMessageOperation(
                             serialized_request, _EMPTY_FLAGS),)
                         call.start_client_batch(operations, event_handler)
                         state.due.add(cygrpc.OperationType.send_message)
@@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call,
         with state.condition:
             if state.code is None:
                 operations = (
-                    cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),)
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                 call.start_client_batch(operations, event_handler)
                 state.due.add(cygrpc.OperationType.send_close_from_client)
 
@@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
                 event_handler = _event_handler(self._state, self._call,
                                                self._response_deserializer)
                 self._call.start_client_batch(
-                    (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                     event_handler)
                 self._state.due.add(cygrpc.OperationType.receive_message)
             elif self._state.code is grpc.StatusCode.OK:
@@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
         else:
             state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
             operations = (
-                cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
-                cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
-                cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-                cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
-                cygrpc.operation_receive_message(_EMPTY_FLAGS),
-                cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+                cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
+                cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
             return state, operations, deadline, deadline_timespec, None
 
     def _blocking(self, request, timeout, metadata, credentials):
@@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
                                            self._response_deserializer)
             with state.condition:
                 call.start_client_batch(
-                    (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+                    (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
                     event_handler)
                 operations = (
-                    cygrpc.operation_send_initial_metadata(
-                        metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(
+                    cygrpc.SendInitialMetadataOperation(
+                        metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(
                             serialized_request, _EMPTY_FLAGS),
-                    cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-                    cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
                 call_error = call.start_client_batch(operations, event_handler)
                 if call_error != cygrpc.CallError.ok:
                     _call_error_set_RPCstate(state, call_error, metadata)
@@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
             call.set_credentials(credentials._credentials)
         with state.condition:
             call.start_client_batch(
-                (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
-                None)
+                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
             operations = (
-                cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
-                cygrpc.operation_receive_message(_EMPTY_FLAGS),
-                cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
             call_error = call.start_client_batch(operations, None)
             _check_call_error(call_error, metadata)
             _consume_request_iterator(request_iterator, state, call,
@@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
         event_handler = _event_handler(state, call, self._response_deserializer)
         with state.condition:
             call.start_client_batch(
-                (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
                 event_handler)
             operations = (
-                cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
-                cygrpc.operation_receive_message(_EMPTY_FLAGS),
-                cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
             call_error = call.start_client_batch(operations, event_handler)
             if call_error != cygrpc.CallError.ok:
                 _call_error_set_RPCstate(state, call_error, metadata)
@@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
         event_handler = _event_handler(state, call, self._response_deserializer)
         with state.condition:
             call.start_client_batch(
-                (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+                (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
                 event_handler)
             operations = (
-                cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
-                cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
             call_error = call.start_client_batch(operations, event_handler)
             if call_error != cygrpc.CallError.ok:
                 _call_error_set_RPCstate(state, call_error, metadata)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 660263fc09d..7f919650073 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -17,6 +17,7 @@ cimport libc.time
 
 # Typedef types with approximately the same semantics to provide their names to
 # Cython
+ctypedef unsigned char uint8_t
 ctypedef int int32_t
 ctypedef unsigned uint32_t
 ctypedef long int64_t
@@ -25,6 +26,7 @@ ctypedef long int64_t
 cdef extern from "grpc/support/alloc.h":
 
   void *gpr_malloc(size_t size) nogil
+  void *gpr_zalloc(size_t size) nogil
   void gpr_free(void *ptr) nogil
   void *gpr_realloc(void *p, size_t size) nogil
 
@@ -190,6 +192,18 @@ cdef extern from "grpc/grpc.h":
     size_t arguments_length "num_args"
     grpc_arg *arguments "args"
 
+  ctypedef enum grpc_compression_level:
+    GRPC_COMPRESS_LEVEL_NONE
+    GRPC_COMPRESS_LEVEL_LOW
+    GRPC_COMPRESS_LEVEL_MED
+    GRPC_COMPRESS_LEVEL_HIGH
+
+  ctypedef enum grpc_stream_compression_level:
+    GRPC_STREAM_COMPRESS_LEVEL_NONE
+    GRPC_STREAM_COMPRESS_LEVEL_LOW
+    GRPC_STREAM_COMPRESS_LEVEL_MED
+    GRPC_STREAM_COMPRESS_LEVEL_HIGH
+
   ctypedef enum grpc_call_error:
     GRPC_CALL_OK
     GRPC_CALL_ERROR
@@ -265,9 +279,19 @@ cdef extern from "grpc/grpc.h":
     GRPC_OP_RECV_STATUS_ON_CLIENT
     GRPC_OP_RECV_CLOSE_ON_SERVER
 
+  ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level:
+    uint8_t is_set
+    grpc_compression_level level
+
+  ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level:
+    uint8_t is_set
+    grpc_stream_compression_level level
+
   ctypedef struct grpc_op_data_send_initial_metadata:
     size_t count
     grpc_metadata *metadata
+    grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
+    grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level
 
   ctypedef struct grpc_op_data_send_status_from_server:
     size_t trailing_metadata_count
@@ -524,7 +548,7 @@ cdef extern from "grpc/grpc_security.h":
 
   grpc_auth_property_iterator grpc_auth_context_property_iterator(
       const grpc_auth_context *ctx)
- 
+
   grpc_auth_property_iterator grpc_auth_context_peer_identity(
       const grpc_auth_context *ctx)
 
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
new file mode 100644
index 00000000000..bfbe27785b7
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
@@ -0,0 +1,109 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+  # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this!
+  cdef grpc_op c_op
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+  cdef readonly object _initial_metadata;
+  cdef readonly int _flags
+  cdef grpc_metadata *_c_initial_metadata
+  cdef size_t _c_initial_metadata_count
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class SendMessageOperation(Operation):
+
+  cdef readonly bytes _message
+  cdef readonly int _flags
+  cdef grpc_byte_buffer *_c_message_byte_buffer
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+  cdef readonly int _flags
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+  cdef readonly object _trailing_metadata
+  cdef readonly object _code
+  cdef readonly object _details
+  cdef readonly int _flags
+  cdef grpc_metadata *_c_trailing_metadata
+  cdef size_t _c_trailing_metadata_count
+  cdef grpc_slice _c_details
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+  cdef readonly int _flags
+  cdef tuple _initial_metadata
+  cdef grpc_metadata_array _c_initial_metadata
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+  cdef readonly int _flags
+  cdef grpc_byte_buffer *_c_message_byte_buffer
+  cdef bytes _message
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+  cdef readonly int _flags
+  cdef grpc_metadata_array _c_trailing_metadata
+  cdef grpc_status_code _c_code
+  cdef grpc_slice _c_details
+  cdef tuple _trailing_metadata
+  cdef object _code
+  cdef str _details
+
+  cdef void c(self)
+  cdef void un_c(self)
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+  cdef readonly int _flags
+  cdef object _cancelled
+  cdef int _c_cancelled
+
+  cdef void c(self)
+  cdef void un_c(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
new file mode 100644
index 00000000000..3c91abf7220
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
@@ -0,0 +1,238 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+  cdef void c(self):
+    raise NotImplementedError()
+
+  cdef void un_c(self):
+    raise NotImplementedError()
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+  def __cinit__(self, initial_metadata, flags):
+    self._initial_metadata = initial_metadata
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_SEND_INITIAL_METADATA
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
+    self.c_op.flags = self._flags
+    _store_c_metadata(
+        self._initial_metadata, &self._c_initial_metadata,
+        &self._c_initial_metadata_count)
+    self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
+    self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
+    self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
+    self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0
+
+  cdef void un_c(self):
+    _release_c_metadata(
+        self._c_initial_metadata, self._c_initial_metadata_count)
+
+
+cdef class SendMessageOperation(Operation):
+
+  def __cinit__(self, bytes message, int flags):
+    self._message = message
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_SEND_MESSAGE
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_SEND_MESSAGE
+    self.c_op.flags = self._flags
+    cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
+        self._message, len(self._message))
+    self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
+        &message_slice, 1)
+    grpc_slice_unref(message_slice)
+    self.c_op.data.send_message.send_message = self._c_message_byte_buffer
+
+  cdef void un_c(self):
+    grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+  def __cinit__(self, int flags):
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_SEND_CLOSE_FROM_CLIENT
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+    self.c_op.flags = self._flags
+
+  cdef void un_c(self):
+    pass
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+  def __cinit__(self, trailing_metadata, code, object details, int flags):
+    self._trailing_metadata = trailing_metadata
+    self._code = code
+    self._details = details
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_SEND_STATUS_FROM_SERVER
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
+    self.c_op.flags = self._flags
+    _store_c_metadata(
+        self._trailing_metadata, &self._c_trailing_metadata,
+        &self._c_trailing_metadata_count)
+    self.c_op.data.send_status_from_server.trailing_metadata = (
+        self._c_trailing_metadata)
+    self.c_op.data.send_status_from_server.trailing_metadata_count = (
+        self._c_trailing_metadata_count)
+    self.c_op.data.send_status_from_server.status = self._code
+    self._c_details = _slice_from_bytes(_encode(self._details))
+    self.c_op.data.send_status_from_server.status_details = &self._c_details
+
+  cdef void un_c(self):
+    grpc_slice_unref(self._c_details)
+    _release_c_metadata(
+        self._c_trailing_metadata, self._c_trailing_metadata_count)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+  def __cinit__(self, flags):
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_RECV_INITIAL_METADATA
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
+    self.c_op.flags = self._flags
+    grpc_metadata_array_init(&self._c_initial_metadata)
+    self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
+        &self._c_initial_metadata)
+
+  cdef void un_c(self):
+    self._initial_metadata = _metadata(&self._c_initial_metadata)
+    grpc_metadata_array_destroy(&self._c_initial_metadata)
+
+  def initial_metadata(self):
+    return self._initial_metadata
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+  def __cinit__(self, flags):
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_RECV_MESSAGE
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_RECV_MESSAGE
+    self.c_op.flags = self._flags
+    self.c_op.data.receive_message.receive_message = (
+        &self._c_message_byte_buffer)
+
+  cdef void un_c(self):
+    cdef grpc_byte_buffer_reader message_reader
+    cdef bint message_reader_status
+    cdef grpc_slice message_slice
+    cdef size_t message_slice_length
+    cdef void *message_slice_pointer
+    if self._c_message_byte_buffer != NULL:
+      message_reader_status = grpc_byte_buffer_reader_init(
+          &message_reader, self._c_message_byte_buffer)
+      if message_reader_status:
+        message = bytearray()
+        while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
+          message_slice_pointer = grpc_slice_start_ptr(message_slice)
+          message_slice_length = grpc_slice_length(message_slice)
+          message += (<char *>message_slice_pointer)[:message_slice_length]
+          grpc_slice_unref(message_slice)
+        grpc_byte_buffer_reader_destroy(&message_reader)
+        self._message = bytes(message)
+      else:
+        self._message = None
+      grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+    else:
+      self._message = None
+
+  def message(self):
+    return self._message
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+  def __cinit__(self, flags):
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_RECV_STATUS_ON_CLIENT
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
+    self.c_op.flags = self._flags
+    grpc_metadata_array_init(&self._c_trailing_metadata)
+    self.c_op.data.receive_status_on_client.trailing_metadata = (
+        &self._c_trailing_metadata)
+    self.c_op.data.receive_status_on_client.status = (
+        &self._c_code)
+    self.c_op.data.receive_status_on_client.status_details = (
+        &self._c_details)
+
+  cdef void un_c(self):
+    self._trailing_metadata = _metadata(&self._c_trailing_metadata)
+    grpc_metadata_array_destroy(&self._c_trailing_metadata)
+    self._code = self._c_code
+    self._details = _decode(_slice_bytes(self._c_details))
+    grpc_slice_unref(self._c_details)
+
+  def trailing_metadata(self):
+    return self._trailing_metadata
+
+  def code(self):
+    return self._code
+
+  def details(self):
+    return self._details
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+  def __cinit__(self, flags):
+    self._flags = flags
+
+  def type(self):
+    return GRPC_OP_RECV_CLOSE_ON_SERVER
+
+  cdef void c(self):
+    self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
+    self.c_op.flags = self._flags
+    self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
+
+  cdef void un_c(self):
+    self._cancelled = bool(self._c_cancelled)
+
+  def cancelled(self):
+    return self._cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 594fdb1a8b2..537cf2b537f 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -65,11 +65,6 @@ cdef class Event:
   cdef readonly object batch_operations
 
 
-cdef class ByteBuffer:
-
-  cdef grpc_byte_buffer *c_byte_buffer
-
-
 cdef class SslPemKeyCertPair:
 
   cdef grpc_ssl_pem_key_cert_pair c_pair
@@ -89,22 +84,6 @@ cdef class ChannelArgs:
   cdef list args
 
 
-cdef class Operation:
-
-  cdef grpc_op c_op
-  cdef bint _c_metadata_needs_release
-  cdef size_t _c_metadata_count
-  cdef grpc_metadata *_c_metadata
-  cdef ByteBuffer _received_message
-  cdef bint _c_metadata_array_needs_destruction
-  cdef grpc_metadata_array _c_metadata_array
-  cdef grpc_status_code _received_status_code
-  cdef grpc_slice _status_details
-  cdef int _received_cancelled
-  cdef readonly bint is_valid
-  cdef object references
-
-
 cdef class CompressionOptions:
 
   cdef grpc_compression_options c_options
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 5877591f786..99f8ffa3e70 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -229,13 +229,15 @@ cdef class OperationTag:
     self.c_nops = 0 if self._operations is None else len(self._operations)
     if 0 < self.c_nops:
       self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
-      for index in range(self.c_nops):
-        self.c_ops[index] = (<Operation>(self._operations[index])).c_op
+      for index, operation in enumerate(self._operations):
+        (<Operation>operation).c()
+        self.c_ops[index] = (<Operation>operation).c_op
 
   cdef object release_ops(self):
     if 0 < self.c_nops:
       for index, operation in enumerate(self._operations):
         (<Operation>operation).c_op = self.c_ops[index]
+        (<Operation>operation).un_c()
       gpr_free(self.c_ops)
       return self._operations
     else:
@@ -260,69 +262,6 @@ cdef class Event:
     self.is_new_request = is_new_request
 
 
-cdef class ByteBuffer:
-
-  def __cinit__(self, bytes data):
-    grpc_init()
-    if data is None:
-      self.c_byte_buffer = NULL
-      return
-
-    cdef char *c_data = data
-    cdef grpc_slice data_slice
-    cdef size_t data_length = len(data)
-    with nogil:
-      data_slice = grpc_slice_from_copied_buffer(c_data, data_length)
-    with nogil:
-      self.c_byte_buffer = grpc_raw_byte_buffer_create(
-          &data_slice, 1)
-    with nogil:
-      grpc_slice_unref(data_slice)
-
-  def bytes(self):
-    cdef grpc_byte_buffer_reader reader
-    cdef grpc_slice data_slice
-    cdef size_t data_slice_length
-    cdef void *data_slice_pointer
-    cdef bint reader_status
-    if self.c_byte_buffer != NULL:
-      with nogil:
-        reader_status = grpc_byte_buffer_reader_init(
-            &reader, self.c_byte_buffer)
-      if not reader_status:
-        return None
-      result = bytearray()
-      with nogil:
-        while grpc_byte_buffer_reader_next(&reader, &data_slice):
-          data_slice_pointer = grpc_slice_start_ptr(data_slice)
-          data_slice_length = grpc_slice_length(data_slice)
-          with gil:
-            result += (<char *>data_slice_pointer)[:data_slice_length]
-          grpc_slice_unref(data_slice)
-      with nogil:
-        grpc_byte_buffer_reader_destroy(&reader)
-      return bytes(result)
-    else:
-      return None
-
-  def __len__(self):
-    cdef size_t result
-    if self.c_byte_buffer != NULL:
-      with nogil:
-        result = grpc_byte_buffer_length(self.c_byte_buffer)
-      return result
-    else:
-      return 0
-
-  def __str__(self):
-    return self.bytes()
-
-  def __dealloc__(self):
-    if self.c_byte_buffer != NULL:
-      grpc_byte_buffer_destroy(self.c_byte_buffer)
-    grpc_shutdown()
-
-
 cdef class SslPemKeyCertPair:
 
   def __cinit__(self, bytes private_key, bytes certificate_chain):
@@ -365,7 +304,7 @@ cdef class ChannelArg:
     elif hasattr(value, '__int__'):
       # Pointer objects must override __int__() to return
       # the underlying C address (Python ints are word size).  The
-      # lifecycle of the pointer is fixed to the lifecycle of the 
+      # lifecycle of the pointer is fixed to the lifecycle of the
       # python object wrapping it.
       self.ptr_vtable.copy = &copy_ptr
       self.ptr_vtable.destroy = &destroy_ptr
@@ -407,185 +346,6 @@ cdef class ChannelArgs:
     return self.args[i]
 
 
-cdef class Operation:
-
-  def __cinit__(self):
-    grpc_init()
-    self.references = []
-    self._c_metadata_needs_release = False
-    self._c_metadata_array_needs_destruction = False
-    self._status_details = grpc_empty_slice()
-    self.is_valid = False
-
-  @property
-  def type(self):
-    return self.c_op.type
-
-  @property
-  def flags(self):
-    return self.c_op.flags
-
-  @property
-  def has_status(self):
-    return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT
-
-  @property
-  def received_message(self):
-    if self.c_op.type != GRPC_OP_RECV_MESSAGE:
-      raise TypeError("self must be an operation receiving a message")
-    return self._received_message
-
-  @property
-  def received_message_or_none(self):
-    if self.c_op.type != GRPC_OP_RECV_MESSAGE:
-      return None
-    return self._received_message
-
-  @property
-  def received_metadata(self):
-    if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
-        self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
-      raise TypeError("self must be an operation receiving metadata")
-    return _metadata(&self._c_metadata_array)
-
-  @property
-  def received_status_code(self):
-    if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
-      raise TypeError("self must be an operation receiving a status code")
-    return self._received_status_code
-
-  @property
-  def received_status_code_or_none(self):
-    if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
-      return None
-    return self._received_status_code
-
-  @property
-  def received_status_details(self):
-    if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
-      raise TypeError("self must be an operation receiving status details")
-    return _slice_bytes(self._status_details)
-
-  @property
-  def received_status_details_or_none(self):
-    if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
-      return None
-    return _slice_bytes(self._status_details)
-
-  @property
-  def received_cancelled(self):
-    if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
-      raise TypeError("self must be an operation receiving cancellation "
-                      "information")
-    return False if self._received_cancelled == 0 else True
-
-  @property
-  def received_cancelled_or_none(self):
-    if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
-      return None
-    return False if self._received_cancelled == 0 else True
-
-  def __dealloc__(self):
-    if self._c_metadata_needs_release:
-      _release_c_metadata(self._c_metadata, self._c_metadata_count)
-    if self._c_metadata_array_needs_destruction:
-      grpc_metadata_array_destroy(&self._c_metadata_array)
-    grpc_slice_unref(self._status_details)
-    grpc_shutdown()
-
-def operation_send_initial_metadata(metadata, int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
-  op.c_op.flags = flags
-  _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
-  op._c_metadata_needs_release = True
-  op.c_op.data.send_initial_metadata.count = op._c_metadata_count
-  op.c_op.data.send_initial_metadata.metadata = op._c_metadata
-  op.is_valid = True
-  return op
-
-def operation_send_message(data, int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_SEND_MESSAGE
-  op.c_op.flags = flags
-  byte_buffer = ByteBuffer(data)
-  op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer
-  op.references.append(byte_buffer)
-  op.is_valid = True
-  return op
-
-def operation_send_close_from_client(int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
-  op.c_op.flags = flags
-  op.is_valid = True
-  return op
-
-def operation_send_status_from_server(
-    metadata, grpc_status_code code, bytes details, int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
-  op.c_op.flags = flags
-  _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
-  op._c_metadata_needs_release = True
-  op.c_op.data.send_status_from_server.trailing_metadata_count = (
-      op._c_metadata_count)
-  op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
-  op.c_op.data.send_status_from_server.status = code
-  grpc_slice_unref(op._status_details)
-  op._status_details = _slice_from_bytes(details)
-  op.c_op.data.send_status_from_server.status_details = &op._status_details
-  op.is_valid = True
-  return op
-
-def operation_receive_initial_metadata(int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
-  op.c_op.flags = flags
-  grpc_metadata_array_init(&op._c_metadata_array)
-  op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
-      &op._c_metadata_array)
-  op._c_metadata_array_needs_destruction = True
-  op.is_valid = True
-  return op
-
-def operation_receive_message(int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_RECV_MESSAGE
-  op.c_op.flags = flags
-  op._received_message = ByteBuffer(None)
-  # n.b. the c_op.data.receive_message field needs to be deleted by us,
-  # anyway, so we just let that be handled by the ByteBuffer() we allocated
-  # the line before.
-  op.c_op.data.receive_message.receive_message = (
-      &op._received_message.c_byte_buffer)
-  op.is_valid = True
-  return op
-
-def operation_receive_status_on_client(int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
-  op.c_op.flags = flags
-  grpc_metadata_array_init(&op._c_metadata_array)
-  op.c_op.data.receive_status_on_client.trailing_metadata = (
-      &op._c_metadata_array)
-  op._c_metadata_array_needs_destruction = True
-  op.c_op.data.receive_status_on_client.status = (
-      &op._received_status_code)
-  op.c_op.data.receive_status_on_client.status_details = (
-      &op._status_details)
-  op.is_valid = True
-  return op
-
-def operation_receive_close_on_server(int flags):
-  cdef Operation op = Operation()
-  op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
-  op.c_op.flags = flags
-  op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
-  op.is_valid = True
-  return op
-
-
 cdef class CompressionOptions:
 
   def __cinit__(self):
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 6fc5638d5d8..ad229de0ae6 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -19,6 +19,7 @@ include "_cygrpc/channel.pxd.pxi"
 include "_cygrpc/credentials.pxd.pxi"
 include "_cygrpc/completion_queue.pxd.pxi"
 include "_cygrpc/metadata.pxd.pxi"
+include "_cygrpc/operation.pxd.pxi"
 include "_cygrpc/records.pxd.pxi"
 include "_cygrpc/security.pxd.pxi"
 include "_cygrpc/server.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index d6052298221..0964fb66ab7 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -26,6 +26,7 @@ include "_cygrpc/channel.pyx.pxi"
 include "_cygrpc/credentials.pyx.pxi"
 include "_cygrpc/completion_queue.pyx.pxi"
 include "_cygrpc/metadata.pyx.pxi"
+include "_cygrpc/operation.pyx.pxi"
 include "_cygrpc/records.pyx.pxi"
 include "_cygrpc/security.pyx.pxi"
 include "_cygrpc/server.pyx.pxi"
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 02d3af8706d..eec31bdcf6b 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0
 
 
 def _serialized_request(request_event):
-    return request_event.batch_operations[0].received_message.bytes()
+    return request_event.batch_operations[0].message()
 
 
 def _application_code(code):
@@ -130,13 +130,13 @@ def _abort(state, call, code, details):
         effective_code = _abortion_code(state, code)
         effective_details = details if state.details is None else state.details
         if state.initial_metadata_allowed:
-            operations = (cygrpc.operation_send_initial_metadata(
-                (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
+            operations = (cygrpc.SendInitialMetadataOperation(
+                None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation(
                     state.trailing_metadata, effective_code, effective_details,
                     _EMPTY_FLAGS),)
             token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
         else:
-            operations = (cygrpc.operation_send_status_from_server(
+            operations = (cygrpc.SendStatusFromServerOperation(
                 state.trailing_metadata, effective_code, effective_details,
                 _EMPTY_FLAGS),)
             token = _SEND_STATUS_FROM_SERVER_TOKEN
@@ -150,8 +150,7 @@ def _receive_close_on_server(state):
 
     def receive_close_on_server(receive_close_on_server_event):
         with state.condition:
-            if receive_close_on_server_event.batch_operations[
-                    0].received_cancelled:
+            if receive_close_on_server_event.batch_operations[0].cancelled():
                 state.client = _CANCELLED
             elif state.client is _OPEN:
                 state.client = _CLOSED
@@ -262,7 +261,7 @@ class _Context(grpc.ServicerContext):
                 _raise_rpc_error(self._state)
             else:
                 if self._state.initial_metadata_allowed:
-                    operation = cygrpc.operation_send_initial_metadata(
+                    operation = cygrpc.SendInitialMetadataOperation(
                         initial_metadata, _EMPTY_FLAGS)
                     self._rpc_event.operation_call.start_server_batch(
                         (operation,), _send_initial_metadata(self._state))
@@ -305,7 +304,7 @@ class _RequestIterator(object):
             raise StopIteration()
         else:
             self._call.start_server_batch(
-                (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                 _receive_message(self._state, self._call,
                                  self._request_deserializer))
             self._state.due.add(_RECEIVE_MESSAGE_TOKEN)
@@ -348,7 +347,7 @@ def _unary_request(rpc_event, state, request_deserializer):
                 return None
             else:
                 rpc_event.operation_call.start_server_batch(
-                    (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                     _receive_message(state, rpc_event.operation_call,
                                      request_deserializer))
                 state.due.add(_RECEIVE_MESSAGE_TOKEN)
@@ -424,14 +423,15 @@ def _send_response(rpc_event, state, serialized_response):
             return False
         else:
             if state.initial_metadata_allowed:
-                operations = (cygrpc.operation_send_initial_metadata(
-                    (), _EMPTY_FLAGS), cygrpc.operation_send_message(
-                        serialized_response, _EMPTY_FLAGS),)
+                operations = (cygrpc.SendInitialMetadataOperation(None,
+                                                                  _EMPTY_FLAGS),
+                              cygrpc.SendMessageOperation(serialized_response,
+                                                          _EMPTY_FLAGS),)
                 state.initial_metadata_allowed = False
                 token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
             else:
-                operations = (cygrpc.operation_send_message(serialized_response,
-                                                            _EMPTY_FLAGS),)
+                operations = (cygrpc.SendMessageOperation(serialized_response,
+                                                          _EMPTY_FLAGS),)
                 token = _SEND_MESSAGE_TOKEN
             rpc_event.operation_call.start_server_batch(
                 operations, _send_message(state, token))
@@ -448,16 +448,16 @@ def _status(rpc_event, state, serialized_response):
             code = _completion_code(state)
             details = _details(state)
             operations = [
-                cygrpc.operation_send_status_from_server(
+                cygrpc.SendStatusFromServerOperation(
                     state.trailing_metadata, code, details, _EMPTY_FLAGS),
             ]
             if state.initial_metadata_allowed:
                 operations.append(
-                    cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS))
+                    cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS))
             if serialized_response is not None:
                 operations.append(
-                    cygrpc.operation_send_message(serialized_response,
-                                                  _EMPTY_FLAGS))
+                    cygrpc.SendMessageOperation(serialized_response,
+                                                _EMPTY_FLAGS))
             rpc_event.operation_call.start_server_batch(
                 operations,
                 _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
@@ -563,10 +563,10 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
 
 
 def _reject_rpc(rpc_event, status, details):
-    operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
-                  cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
-                  cygrpc.operation_send_status_from_server((), status, details,
-                                                           _EMPTY_FLAGS),)
+    operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS),
+                  cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+                  cygrpc.SendStatusFromServerOperation(None, status, details,
+                                                       _EMPTY_FLAGS),)
     rpc_state = _RPCState()
     rpc_event.operation_call.start_server_batch(
         operations, lambda ignored_event: (rpc_state, (),))
@@ -577,7 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
     state = _RPCState()
     with state.condition:
         rpc_event.operation_call.start_server_batch(
-            (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+            (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
             _receive_close_on_server(state))
         state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
         if method_handler.request_streaming:
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 3bf53087495..e033c1063f4 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -34,6 +34,7 @@
   "unit._cython._no_messages_server_completion_queue_per_call_test.Test",
   "unit._cython._no_messages_single_server_completion_queue_test.Test",
   "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
+  "unit._cython._server_test.Test",
   "unit._cython.cygrpc_test.InsecureServerInsecureClient",
   "unit._cython.cygrpc_test.SecureServerSecureClient",
   "unit._cython.cygrpc_test.TypeSmokeTest",
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index a8a7175cc71..458b4fe542a 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -65,10 +65,10 @@ class _Handler(object):
 
         with self._lock:
             self._call.start_server_batch(
-                (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+                (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
                 _RECEIVE_CLOSE_ON_SERVER_TAG)
             self._call.start_server_batch(
-                (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                 _RECEIVE_MESSAGE_TAG)
         first_event = self._completion_queue.poll()
         if _is_cancellation_event(first_event):
@@ -76,10 +76,10 @@ class _Handler(object):
         else:
             with self._lock:
                 operations = (
-                    cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
-                                                           _EMPTY_FLAGS),
-                    cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS),
-                    cygrpc.operation_send_status_from_server(
+                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+                                                        _EMPTY_FLAGS),
+                    cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS),
+                    cygrpc.SendStatusFromServerOperation(
                         _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
                         _EMPTY_FLAGS),)
                 self._call.start_server_batch(operations,
@@ -169,13 +169,13 @@ class CancelManyCallsTest(unittest.TestCase):
                     None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies',
                     None, _INFINITE_FUTURE)
                 operations = (
-                    cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
-                                                           _EMPTY_FLAGS),
-                    cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS),
-                    cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-                    cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
-                    cygrpc.operation_receive_message(_EMPTY_FLAGS),
-                    cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+                                                        _EMPTY_FLAGS),
+                    cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
                 tag = 'client_complete_call_{0:04d}_tag'.format(index)
                 client_call.start_client_batch(operations, tag)
                 client_due.add(tag)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index d08003af444..41291cc88f4 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase):
         with self.client_condition:
             client_receive_initial_metadata_start_batch_result = (
                 client_call.start_client_batch([
-                    cygrpc.operation_receive_initial_metadata(
-                        _common.EMPTY_FLAGS),
+                    cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
                 ], client_receive_initial_metadata_tag))
+            self.assertEqual(cygrpc.CallError.ok,
+                             client_receive_initial_metadata_start_batch_result)
             client_complete_rpc_start_batch_result = client_call.start_client_batch(
                 [
-                    cygrpc.operation_send_initial_metadata(
+                    cygrpc.SendInitialMetadataOperation(
                         _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
-                    cygrpc.operation_send_close_from_client(
-                        _common.EMPTY_FLAGS),
-                    cygrpc.operation_receive_status_on_client(
-                        _common.EMPTY_FLAGS),
+                    cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
                 ], client_complete_rpc_tag)
+            self.assertEqual(cygrpc.CallError.ok,
+                             client_complete_rpc_start_batch_result)
             self.client_driver.add_due({
                 client_receive_initial_metadata_tag,
                 client_complete_rpc_tag,
@@ -72,7 +73,7 @@ class Test(_common.RpcTest, unittest.TestCase):
         with server_call_condition:
             server_send_initial_metadata_start_batch_result = (
                 server_request_call_event.operation_call.start_server_batch([
-                    cygrpc.operation_send_initial_metadata(
+                    cygrpc.SendInitialMetadataOperation(
                         _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
             server_call_driver.add_due({
@@ -84,9 +85,8 @@ class Test(_common.RpcTest, unittest.TestCase):
         with server_call_condition:
             server_complete_rpc_start_batch_result = (
                 server_request_call_event.operation_call.start_server_batch([
-                    cygrpc.operation_receive_close_on_server(
-                        _common.EMPTY_FLAGS),
-                    cygrpc.operation_send_status_from_server(
+                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+                    cygrpc.SendStatusFromServerOperation(
                         _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
                         b'test details', _common.EMPTY_FLAGS),
                 ], server_complete_rpc_tag))
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index d0166a2b29f..b429a20ed75 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase):
         with self.client_condition:
             client_receive_initial_metadata_start_batch_result = (
                 client_call.start_client_batch([
-                    cygrpc.operation_receive_initial_metadata(
-                        _common.EMPTY_FLAGS),
+                    cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
                 ], client_receive_initial_metadata_tag))
             client_complete_rpc_start_batch_result = client_call.start_client_batch(
                 [
-                    cygrpc.operation_send_initial_metadata(
+                    cygrpc.SendInitialMetadataOperation(
                         _common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
-                    cygrpc.operation_send_close_from_client(
-                        _common.EMPTY_FLAGS),
-                    cygrpc.operation_receive_status_on_client(
-                        _common.EMPTY_FLAGS),
+                    cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
                 ], client_complete_rpc_tag)
             self.client_driver.add_due({
                 client_receive_initial_metadata_tag,
@@ -67,7 +64,7 @@ class Test(_common.RpcTest, unittest.TestCase):
         with self.server_condition:
             server_send_initial_metadata_start_batch_result = (
                 server_request_call_event.operation_call.start_server_batch([
-                    cygrpc.operation_send_initial_metadata(
+                    cygrpc.SendInitialMetadataOperation(
                         _common.INITIAL_METADATA, _common.EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
             self.server_driver.add_due({
@@ -79,11 +76,10 @@ class Test(_common.RpcTest, unittest.TestCase):
         with self.server_condition:
             server_complete_rpc_start_batch_result = (
                 server_request_call_event.operation_call.start_server_batch([
-                    cygrpc.operation_receive_close_on_server(
-                        _common.EMPTY_FLAGS),
-                    cygrpc.operation_send_status_from_server(
+                    cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+                    cygrpc.SendStatusFromServerOperation(
                         _common.TRAILING_METADATA, cygrpc.StatusCode.ok,
-                        b'test details', _common.EMPTY_FLAGS),
+                        'test details', _common.EMPTY_FLAGS),
                 ], server_complete_rpc_tag))
             self.server_driver.add_due({
                 server_complete_rpc_tag,
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index 1deb15ba032..a6d92f22030 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -158,15 +158,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
         with client_condition:
             client_receive_initial_metadata_start_batch_result = (
                 client_call.start_client_batch([
-                    cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+                    cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
                 ], client_receive_initial_metadata_tag))
             client_due.add(client_receive_initial_metadata_tag)
             client_complete_rpc_start_batch_result = (
                 client_call.start_client_batch([
-                    cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
-                                                           _EMPTY_FLAGS),
-                    cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-                    cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+                                                        _EMPTY_FLAGS),
+                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+                    cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
                 ], client_complete_rpc_tag))
             client_due.add(client_complete_rpc_tag)
 
@@ -175,12 +175,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
         with server_call_condition:
             server_send_initial_metadata_start_batch_result = (
                 server_rpc_event.operation_call.start_server_batch([
-                    cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
-                                                           _EMPTY_FLAGS),
+                    cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+                                                        _EMPTY_FLAGS),
                 ], server_send_initial_metadata_tag))
             server_send_first_message_start_batch_result = (
                 server_rpc_event.operation_call.start_server_batch([
-                    cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+                    cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                 ], server_send_first_message_tag))
         server_send_initial_metadata_event = server_call_driver.event_with_tag(
             server_send_initial_metadata_tag)
@@ -189,12 +189,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
         with server_call_condition:
             server_send_second_message_start_batch_result = (
                 server_rpc_event.operation_call.start_server_batch([
-                    cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+                    cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
                 ], server_send_second_message_tag))
             server_complete_rpc_start_batch_result = (
                 server_rpc_event.operation_call.start_server_batch([
-                    cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
-                    cygrpc.operation_send_status_from_server(
+                    cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+                    cygrpc.SendStatusFromServerOperation(
                         (), cygrpc.StatusCode.ok, b'test details',
                         _EMPTY_FLAGS),
                 ], server_complete_rpc_tag))
@@ -208,7 +208,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
             client_receive_first_message_tag = 'client_receive_first_message_tag'
             client_receive_first_message_start_batch_result = (
                 client_call.start_client_batch([
-                    cygrpc.operation_receive_message(_EMPTY_FLAGS),
+                    cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
                 ], client_receive_first_message_tag))
             client_due.add(client_receive_first_message_tag)
         client_receive_first_message_event = client_driver.event_with_tag(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
new file mode 100644
index 00000000000..12bf40be6b3
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
@@ -0,0 +1,49 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test servers at the level of the Cython API."""
+
+import threading
+import time
+import unittest
+
+from grpc._cython import cygrpc
+
+
+class Test(unittest.TestCase):
+
+    def test_lonely_server(self):
+        server_call_completion_queue = cygrpc.CompletionQueue()
+        server_shutdown_completion_queue = cygrpc.CompletionQueue()
+        server = cygrpc.Server(cygrpc.ChannelArgs([]))
+        server.register_completion_queue(server_call_completion_queue)
+        server.register_completion_queue(server_shutdown_completion_queue)
+        port = server.add_http2_port(b'[::]:0')
+        server.start()
+
+        server_request_call_tag = 'server_request_call_tag'
+        server_request_call_start_batch_result = server.request_call(
+            server_call_completion_queue, server_call_completion_queue,
+            server_request_call_tag)
+
+        time.sleep(4)
+
+        server_shutdown_tag = 'server_shutdown_tag'
+        server_shutdown_result = server.shutdown(
+            server_shutdown_completion_queue, server_shutdown_tag)
+        server_request_call_event = server_call_completion_queue.poll()
+        server_shutdown_event = server_shutdown_completion_queue.poll()
+
+
+if __name__ == '__main__':
+    unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 4eda6854865..002fb9b7f40 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -35,11 +35,6 @@ def _metadata_plugin(context, callback):
 
 class TypeSmokeTest(unittest.TestCase):
 
-    def testOperationFlags(self):
-        operation = cygrpc.operation_send_message(b'asdf',
-                                                  cygrpc.WriteFlag.no_compress)
-        self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
-
     def testTimespec(self):
         now = time.time()
         now_timespec_a = cygrpc.Timespec(now)
@@ -170,7 +165,7 @@ class ServerClientMixin(object):
         SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
         SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
         SERVER_STATUS_CODE = cygrpc.StatusCode.ok
-        SERVER_STATUS_DETAILS = b'our work is never over'
+        SERVER_STATUS_DETAILS = 'our work is never over'
         REQUEST = b'in death a member of project mayhem has a name'
         RESPONSE = b'his name is robert paulson'
         METHOD = b'twinkies'
@@ -192,13 +187,13 @@ class ServerClientMixin(object):
             (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,),
             (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),)
         client_start_batch_result = client_call.start_client_batch([
-            cygrpc.operation_send_initial_metadata(client_initial_metadata,
-                                                   _EMPTY_FLAGS),
-            cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
-            cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-            cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
-            cygrpc.operation_receive_message(_EMPTY_FLAGS),
-            cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+            cygrpc.SendInitialMetadataOperation(client_initial_metadata,
+                                                _EMPTY_FLAGS),
+            cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
+            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
         ], client_call_tag)
         self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
         client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -227,12 +222,12 @@ class ServerClientMixin(object):
         server_trailing_metadata = (
             (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),)
         server_start_batch_result = server_call.start_server_batch([
-            cygrpc.operation_send_initial_metadata(
+            cygrpc.SendInitialMetadataOperation(
                 server_initial_metadata,
-                _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
-            cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
-            cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
-            cygrpc.operation_send_status_from_server(
+                _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+            cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
+            cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+            cygrpc.SendStatusFromServerOperation(
                 server_trailing_metadata, SERVER_STATUS_CODE,
                 SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
         ], server_call_tag)
@@ -245,25 +240,24 @@ class ServerClientMixin(object):
         found_client_op_types = set()
         for client_result in client_event.batch_operations:
             # we expect each op type to be unique
-            self.assertNotIn(client_result.type, found_client_op_types)
-            found_client_op_types.add(client_result.type)
-            if client_result.type == cygrpc.OperationType.receive_initial_metadata:
+            self.assertNotIn(client_result.type(), found_client_op_types)
+            found_client_op_types.add(client_result.type())
+            if client_result.type(
+            ) == cygrpc.OperationType.receive_initial_metadata:
                 self.assertTrue(
                     test_common.metadata_transmitted(
                         server_initial_metadata,
-                        client_result.received_metadata))
-            elif client_result.type == cygrpc.OperationType.receive_message:
-                self.assertEqual(RESPONSE,
-                                 client_result.received_message.bytes())
-            elif client_result.type == cygrpc.OperationType.receive_status_on_client:
+                        client_result.initial_metadata()))
+            elif client_result.type() == cygrpc.OperationType.receive_message:
+                self.assertEqual(RESPONSE, client_result.message())
+            elif client_result.type(
+            ) == cygrpc.OperationType.receive_status_on_client:
                 self.assertTrue(
                     test_common.metadata_transmitted(
                         server_trailing_metadata,
-                        client_result.received_metadata))
-                self.assertEqual(SERVER_STATUS_DETAILS,
-                                 client_result.received_status_details)
-                self.assertEqual(SERVER_STATUS_CODE,
-                                 client_result.received_status_code)
+                        client_result.trailing_metadata()))
+                self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
+                self.assertEqual(SERVER_STATUS_CODE, client_result.code())
         self.assertEqual(
             set([
                 cygrpc.OperationType.send_initial_metadata,
@@ -277,13 +271,13 @@ class ServerClientMixin(object):
         self.assertEqual(5, len(server_event.batch_operations))
         found_server_op_types = set()
         for server_result in server_event.batch_operations:
-            self.assertNotIn(client_result.type, found_server_op_types)
-            found_server_op_types.add(server_result.type)
-            if server_result.type == cygrpc.OperationType.receive_message:
-                self.assertEqual(REQUEST,
-                                 server_result.received_message.bytes())
-            elif server_result.type == cygrpc.OperationType.receive_close_on_server:
-                self.assertFalse(server_result.received_cancelled)
+            self.assertNotIn(client_result.type(), found_server_op_types)
+            found_server_op_types.add(server_result.type())
+            if server_result.type() == cygrpc.OperationType.receive_message:
+                self.assertEqual(REQUEST, server_result.message())
+            elif server_result.type(
+            ) == cygrpc.OperationType.receive_close_on_server:
+                self.assertFalse(server_result.cancelled())
         self.assertEqual(
             set([
                 cygrpc.OperationType.send_initial_metadata,
@@ -319,9 +313,8 @@ class ServerClientMixin(object):
                                             cygrpc_deadline, description)
 
         client_event_future = perform_client_operations([
-            cygrpc.operation_send_initial_metadata(empty_metadata,
-                                                   _EMPTY_FLAGS),
-            cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+            cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
+            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
         ], "Client prologue")
 
         request_event = self.server_completion_queue.poll(cygrpc_deadline)
@@ -333,8 +326,7 @@ class ServerClientMixin(object):
                                             cygrpc_deadline, description)
 
         server_event_future = perform_server_operations([
-            cygrpc.operation_send_initial_metadata(empty_metadata,
-                                                   _EMPTY_FLAGS),
+            cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
         ], "Server prologue")
 
         client_event_future.result()  # force completion
@@ -343,12 +335,12 @@ class ServerClientMixin(object):
         # Messaging
         for _ in range(10):
             client_event_future = perform_client_operations([
-                cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
-                cygrpc.operation_receive_message(_EMPTY_FLAGS),
+                cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
             ], "Client message")
             server_event_future = perform_server_operations([
-                cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
-                cygrpc.operation_receive_message(_EMPTY_FLAGS),
+                cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+                cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
             ], "Server receive")
 
             client_event_future.result()  # force completion
@@ -356,13 +348,13 @@ class ServerClientMixin(object):
 
         # Epilogue
         client_event_future = perform_client_operations([
-            cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
-            cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
         ], "Client epilogue")
 
         server_event_future = perform_server_operations([
-            cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
-            cygrpc.operation_send_status_from_server(
+            cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+            cygrpc.SendStatusFromServerOperation(
                 empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
         ], "Server epilogue")