Merge github.com:grpc/grpc into error

pull/6897/head
Craig Tiller 9 years ago
commit 81be250ee7
  1. 5
      BUILD
  2. 4
      Makefile
  3. 1
      build.yaml
  4. 73
      examples/python/helloworld/helloworld_pb2.py
  5. 189
      examples/python/route_guide/route_guide_pb2.py
  6. 1
      gRPC.podspec
  7. 1
      grpc.gemspec
  8. 32
      include/grpc++/impl/codegen/core_codegen_interface.h
  9. 161
      include/grpc++/impl/codegen/proto_utils.h
  10. 21
      include/grpc/byte_buffer_reader.h
  11. 57
      include/grpc/impl/codegen/byte_buffer_reader.h
  12. 4
      include/grpc/impl/codegen/grpc_types.h
  13. 1
      package.xml
  14. 47
      src/compiler/python_generator.cc
  15. 4
      src/core/lib/channel/channel_args.h
  16. 204
      src/cpp/common/core_codegen.cc
  17. 26
      src/cpp/common/core_codegen.h
  18. 63
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  19. 2
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  20. 191
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
  21. 441
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  22. 183
      src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
  23. 5
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  24. 61
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  25. 48
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  26. 4
      src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
  27. 17
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  28. 4
      src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
  29. 7
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  30. 1
      src/csharp/tests.json
  31. 4
      src/proto/grpc/testing/echo.proto
  32. 4
      src/ruby/.rubocop.yml
  33. 17
      src/ruby/lib/grpc/generic/rpc_server.rb
  34. 2
      test/core/end2end/fixtures/h2_census.c
  35. 1
      test/core/surface/public_headers_must_be_c89.c
  36. 338
      test/cpp/end2end/async_end2end_test.cc
  37. 3
      tools/distrib/check_include_guards.py
  38. 1
      tools/doxygen/Doxyfile.c++
  39. 1
      tools/doxygen/Doxyfile.c++.internal
  40. 1
      tools/doxygen/Doxyfile.core
  41. 1
      tools/doxygen/Doxyfile.core.internal
  42. 2
      tools/run_tests/sources_and_headers.json
  43. 1
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  44. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  45. 1
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  46. 3
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
  47. 1
      vsprojects/vcxproj/grpc/grpc.vcxproj
  48. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  49. 1
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  50. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
  51. 1
      vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj
  52. 3
      vsprojects/vcxproj/test/codegen_test_full/codegen_test_full.vcxproj.filters
  53. 1
      vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj
  54. 3
      vsprojects/vcxproj/test/codegen_test_minimal/codegen_test_minimal.vcxproj.filters

@ -465,6 +465,7 @@ cc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@ -778,6 +779,7 @@ cc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@ -950,6 +952,7 @@ cc_library(
"include/grpc++/impl/codegen/sync_stream.h",
"include/grpc++/impl/codegen/time.h",
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@ -1095,6 +1098,7 @@ cc_library(
"include/grpc++/impl/codegen/sync_stream.h",
"include/grpc++/impl/codegen/time.h",
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@ -1485,6 +1489,7 @@ objc_library(
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",

@ -2651,6 +2651,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@ -2972,6 +2973,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@ -3258,6 +3260,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \
@ -3561,6 +3564,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \

@ -353,6 +353,7 @@ filegroups:
- name: grpc_codegen
public_headers:
- include/grpc/impl/codegen/byte_buffer.h
- include/grpc/impl/codegen/byte_buffer_reader.h
- include/grpc/impl/codegen/compression_types.h
- include/grpc/impl/codegen/connectivity_state.h
- include/grpc/impl/codegen/grpc_types.h

@ -1,6 +1,8 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: helloworld.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@ -17,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='helloworld.proto',
package='helloworld',
syntax='proto3',
serialized_pb=b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x18\n\x10io.grpc.examples\xa2\x02\x03HLWb\x06proto3'
serialized_pb=_b('\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@ -34,7 +36,7 @@ _HELLOREQUEST = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='name', full_name='helloworld.HelloRequest.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@ -65,7 +67,7 @@ _HELLOREPLY = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='message', full_name='helloworld.HelloReply.message', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@ -104,69 +106,28 @@ _sym_db.RegisterMessage(HelloReply)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), b'\n\020io.grpc.examples\242\002\003HLW')
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW'))
import abc
import six
from grpc.beta import implementations as beta_implementations
from grpc.early_adopter import implementations as early_adopter_implementations
from grpc.framework.alpha import utilities as alpha_utilities
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
class EarlyAdopterGreeterServicer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def SayHello(self, request, context):
raise NotImplementedError()
class EarlyAdopterGreeterServer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
raise NotImplementedError()
class EarlyAdopterGreeterStub(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def SayHello(self, request):
raise NotImplementedError()
SayHello.async = None
def early_adopter_create_Greeter_server(servicer, port, private_key=None, certificate_chain=None):
import helloworld_pb2
import helloworld_pb2
method_service_descriptions = {
"SayHello": alpha_utilities.unary_unary_service_description(
servicer.SayHello,
helloworld_pb2.HelloRequest.FromString,
helloworld_pb2.HelloReply.SerializeToString,
),
}
return early_adopter_implementations.server("helloworld.Greeter", method_service_descriptions, port, private_key=private_key, certificate_chain=certificate_chain)
def early_adopter_create_Greeter_stub(host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, server_host_override=None):
import helloworld_pb2
import helloworld_pb2
method_invocation_descriptions = {
"SayHello": alpha_utilities.unary_unary_invocation_description(
helloworld_pb2.HelloRequest.SerializeToString,
helloworld_pb2.HelloReply.FromString,
),
}
return early_adopter_implementations.stub("helloworld.Greeter", method_invocation_descriptions, host, port, metadata_transformer=metadata_transformer, secure=secure, root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain, server_host_override=server_host_override)
class BetaGreeterServicer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
"""The greeting service definition.
"""
def SayHello(self, request, context):
raise NotImplementedError()
"""Sends a greeting
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaGreeterStub(object):
"""The interface to which stubs will conform."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
"""The greeting service definition.
"""
def SayHello(self, request, timeout):
"""Sends a greeting
"""
raise NotImplementedError()
SayHello.future = None

@ -1,6 +1,8 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: route_guide.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
@ -17,7 +19,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='route_guide.proto',
package='routeguide',
syntax='proto3',
serialized_pb=b'\n\x11route_guide.proto\x12\nrouteguide\",\n\x05Point\x12\x10\n\x08latitude\x18\x01 \x01(\x05\x12\x11\n\tlongitude\x18\x02 \x01(\x05\"I\n\tRectangle\x12\x1d\n\x02lo\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x1d\n\x02hi\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"<\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x08location\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"A\n\tRouteNote\x12#\n\x08location\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x0f\n\x07message\x18\x02 \x01(\t\"b\n\x0cRouteSummary\x12\x13\n\x0bpoint_count\x18\x01 \x01(\x05\x12\x15\n\rfeature_count\x18\x02 \x01(\x05\x12\x10\n\x08\x64istance\x18\x03 \x01(\x05\x12\x14\n\x0c\x65lapsed_time\x18\x04 \x01(\x05\x32\x85\x02\n\nRouteGuide\x12\x36\n\nGetFeature\x12\x11.routeguide.Point\x1a\x13.routeguide.Feature\"\x00\x12>\n\x0cListFeatures\x12\x15.routeguide.Rectangle\x1a\x13.routeguide.Feature\"\x00\x30\x01\x12>\n\x0bRecordRoute\x12\x11.routeguide.Point\x1a\x18.routeguide.RouteSummary\"\x00(\x01\x12?\n\tRouteChat\x12\x15.routeguide.RouteNote\x1a\x15.routeguide.RouteNote\"\x00(\x01\x30\x01\x42\x0f\n\x07\x65x.grpc\xa2\x02\x03RTGb\x06proto3'
serialized_pb=_b('\n\x11route_guide.proto\x12\nrouteguide\",\n\x05Point\x12\x10\n\x08latitude\x18\x01 \x01(\x05\x12\x11\n\tlongitude\x18\x02 \x01(\x05\"I\n\tRectangle\x12\x1d\n\x02lo\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x1d\n\x02hi\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"<\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x08location\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"A\n\tRouteNote\x12#\n\x08location\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x0f\n\x07message\x18\x02 \x01(\t\"b\n\x0cRouteSummary\x12\x13\n\x0bpoint_count\x18\x01 \x01(\x05\x12\x15\n\rfeature_count\x18\x02 \x01(\x05\x12\x10\n\x08\x64istance\x18\x03 \x01(\x05\x12\x14\n\x0c\x65lapsed_time\x18\x04 \x01(\x05\x32\x85\x02\n\nRouteGuide\x12\x36\n\nGetFeature\x12\x11.routeguide.Point\x1a\x13.routeguide.Feature\"\x00\x12>\n\x0cListFeatures\x12\x15.routeguide.Rectangle\x1a\x13.routeguide.Feature\"\x00\x30\x01\x12>\n\x0bRecordRoute\x12\x11.routeguide.Point\x1a\x18.routeguide.RouteSummary\"\x00(\x01\x12?\n\tRouteChat\x12\x15.routeguide.RouteNote\x1a\x15.routeguide.RouteNote\"\x00(\x01\x30\x01\x42\x36\n\x1bio.grpc.examples.routeguideB\x0fRouteGuideProtoP\x01\xa2\x02\x03RTGb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@ -110,7 +112,7 @@ _FEATURE = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='name', full_name='routeguide.Feature.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@ -155,7 +157,7 @@ _ROUTENOTE = _descriptor.Descriptor(
_descriptor.FieldDescriptor(
name='message', full_name='routeguide.RouteNote.message', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@ -274,149 +276,86 @@ _sym_db.RegisterMessage(RouteSummary)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), b'\n\007ex.grpc\242\002\003RTG')
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n\033io.grpc.examples.routeguideB\017RouteGuideProtoP\001\242\002\003RTG'))
import abc
import six
from grpc.beta import implementations as beta_implementations
from grpc.early_adopter import implementations as early_adopter_implementations
from grpc.framework.alpha import utilities as alpha_utilities
from grpc.beta import interfaces as beta_interfaces
from grpc.framework.common import cardinality
from grpc.framework.interfaces.face import utilities as face_utilities
class EarlyAdopterRouteGuideServicer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def GetFeature(self, request, context):
raise NotImplementedError()
@abc.abstractmethod
def ListFeatures(self, request, context):
raise NotImplementedError()
@abc.abstractmethod
def RecordRoute(self, request_iterator, context):
raise NotImplementedError()
@abc.abstractmethod
def RouteChat(self, request_iterator, context):
raise NotImplementedError()
class EarlyAdopterRouteGuideServer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
raise NotImplementedError()
class EarlyAdopterRouteGuideStub(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def GetFeature(self, request):
raise NotImplementedError()
GetFeature.async = None
@abc.abstractmethod
def ListFeatures(self, request):
raise NotImplementedError()
ListFeatures.async = None
@abc.abstractmethod
def RecordRoute(self, request_iterator):
raise NotImplementedError()
RecordRoute.async = None
@abc.abstractmethod
def RouteChat(self, request_iterator):
raise NotImplementedError()
RouteChat.async = None
def early_adopter_create_RouteGuide_server(servicer, port, private_key=None, certificate_chain=None):
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
method_service_descriptions = {
"GetFeature": alpha_utilities.unary_unary_service_description(
servicer.GetFeature,
route_guide_pb2.Point.FromString,
route_guide_pb2.Feature.SerializeToString,
),
"ListFeatures": alpha_utilities.unary_stream_service_description(
servicer.ListFeatures,
route_guide_pb2.Rectangle.FromString,
route_guide_pb2.Feature.SerializeToString,
),
"RecordRoute": alpha_utilities.stream_unary_service_description(
servicer.RecordRoute,
route_guide_pb2.Point.FromString,
route_guide_pb2.RouteSummary.SerializeToString,
),
"RouteChat": alpha_utilities.stream_stream_service_description(
servicer.RouteChat,
route_guide_pb2.RouteNote.FromString,
route_guide_pb2.RouteNote.SerializeToString,
),
}
return early_adopter_implementations.server("routeguide.RouteGuide", method_service_descriptions, port, private_key=private_key, certificate_chain=certificate_chain)
def early_adopter_create_RouteGuide_stub(host, port, metadata_transformer=None, secure=False, root_certificates=None, private_key=None, certificate_chain=None, server_host_override=None):
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
import route_guide_pb2
method_invocation_descriptions = {
"GetFeature": alpha_utilities.unary_unary_invocation_description(
route_guide_pb2.Point.SerializeToString,
route_guide_pb2.Feature.FromString,
),
"ListFeatures": alpha_utilities.unary_stream_invocation_description(
route_guide_pb2.Rectangle.SerializeToString,
route_guide_pb2.Feature.FromString,
),
"RecordRoute": alpha_utilities.stream_unary_invocation_description(
route_guide_pb2.Point.SerializeToString,
route_guide_pb2.RouteSummary.FromString,
),
"RouteChat": alpha_utilities.stream_stream_invocation_description(
route_guide_pb2.RouteNote.SerializeToString,
route_guide_pb2.RouteNote.FromString,
),
}
return early_adopter_implementations.stub("routeguide.RouteGuide", method_invocation_descriptions, host, port, metadata_transformer=metadata_transformer, secure=secure, root_certificates=root_certificates, private_key=private_key, certificate_chain=certificate_chain, server_host_override=server_host_override)
class BetaRouteGuideServicer(object):
"""<fill me in later!>"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
"""Interface exported by the server.
"""
def GetFeature(self, request, context):
raise NotImplementedError()
@abc.abstractmethod
"""A simple RPC.
Obtains the feature at a given position.
A feature with an empty name is returned if there's no feature at the given
position.
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def ListFeatures(self, request, context):
raise NotImplementedError()
@abc.abstractmethod
"""A server-to-client streaming RPC.
Obtains the Features available within the given Rectangle. Results are
streamed rather than returned at once (e.g. in a response message with a
repeated field), as the rectangle may cover a large area and contain a
huge number of features.
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def RecordRoute(self, request_iterator, context):
raise NotImplementedError()
@abc.abstractmethod
"""A client-to-server streaming RPC.
Accepts a stream of Points on a route being traversed, returning a
RouteSummary when traversal is completed.
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
def RouteChat(self, request_iterator, context):
raise NotImplementedError()
"""A Bidirectional streaming RPC.
Accepts a stream of RouteNotes sent while a route is being traversed,
while receiving other RouteNotes (e.g. from other users).
"""
context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)
class BetaRouteGuideStub(object):
"""The interface to which stubs will conform."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
"""Interface exported by the server.
"""
def GetFeature(self, request, timeout):
"""A simple RPC.
Obtains the feature at a given position.
A feature with an empty name is returned if there's no feature at the given
position.
"""
raise NotImplementedError()
GetFeature.future = None
@abc.abstractmethod
def ListFeatures(self, request, timeout):
"""A server-to-client streaming RPC.
Obtains the Features available within the given Rectangle. Results are
streamed rather than returned at once (e.g. in a response message with a
repeated field), as the rectangle may cover a large area and contain a
huge number of features.
"""
raise NotImplementedError()
@abc.abstractmethod
def RecordRoute(self, request_iterator, timeout):
"""A client-to-server streaming RPC.
Accepts a stream of Points on a route being traversed, returning a
RouteSummary when traversal is completed.
"""
raise NotImplementedError()
RecordRoute.future = None
@abc.abstractmethod
def RouteChat(self, request_iterator, timeout):
"""A Bidirectional streaming RPC.
Accepts a stream of RouteNotes sent while a route is being traversed,
while receiving other RouteNotes (e.g. from other users).
"""
raise NotImplementedError()
def beta_create_RouteGuide_server(servicer, pool=None, pool_size=None, default_timeout=None, maximum_timeout=None):

@ -306,6 +306,7 @@ Pod::Spec.new do |s|
'include/grpc/grpc.h',
'include/grpc/status.h',
'include/grpc/impl/codegen/byte_buffer.h',
'include/grpc/impl/codegen/byte_buffer_reader.h',
'include/grpc/impl/codegen/compression_types.h',
'include/grpc/impl/codegen/connectivity_state.h',
'include/grpc/impl/codegen/grpc_types.h',

@ -149,6 +149,7 @@ Gem::Specification.new do |s|
s.files += %w( include/grpc/grpc.h )
s.files += %w( include/grpc/status.h )
s.files += %w( include/grpc/impl/codegen/byte_buffer.h )
s.files += %w( include/grpc/impl/codegen/byte_buffer_reader.h )
s.files += %w( include/grpc/impl/codegen/compression_types.h )
s.files += %w( include/grpc/impl/codegen/connectivity_state.h )
s.files += %w( include/grpc/impl/codegen/grpc_types.h )

@ -49,18 +49,6 @@ namespace grpc {
/// \warning This interface should be considered internal and private.
class CoreCodegenInterface {
public:
// Serialize the msg into a buffer created inside the function. The caller
// should destroy the returned buffer when done with it. If serialization
// fails,
// false is returned and buffer is left unchanged.
virtual Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer) = 0;
// The caller keeps ownership of buffer and msg.
virtual Status DeserializeProto(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) = 0;
/// Upon a failed assertion, log the error.
virtual void assert_fail(const char* failed_assertion) = 0;
@ -76,9 +64,29 @@ class CoreCodegenInterface {
virtual void gpr_free(void* p) = 0;
virtual void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) = 0;
virtual void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) = 0;
virtual void grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) = 0;
virtual int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
gpr_slice* slice) = 0;
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice,
size_t nslices) = 0;
virtual gpr_slice gpr_slice_malloc(size_t length) = 0;
virtual void gpr_slice_unref(gpr_slice slice) = 0;
virtual gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) = 0;
virtual void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) = 0;
virtual void gpr_slice_buffer_pop(gpr_slice_buffer* sb) = 0;
virtual void grpc_metadata_array_init(grpc_metadata_array* array) = 0;
virtual void grpc_metadata_array_destroy(grpc_metadata_array* array) = 0;
virtual const Status& ok() = 0;
virtual const Status& cancelled() = 0;
virtual gpr_timespec gpr_inf_future(gpr_clock_type type) = 0;
};

@ -41,26 +41,179 @@
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/status.h>
#include <grpc/impl/codegen/byte_buffer.h>
#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/impl/codegen/log.h>
#include <grpc/impl/codegen/slice.h>
namespace grpc {
extern CoreCodegenInterface* g_core_codegen_interface;
namespace {
const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
if (have_backup_) {
g_core_codegen_interface->gpr_slice_unref(backup_slice_);
}
}
bool Next(void** data, int* size) GRPC_OVERRIDE {
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
slice_ = g_core_codegen_interface->gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
void BackUp(int count) GRPC_OVERRIDE {
g_core_codegen_interface->gpr_slice_buffer_pop(slice_buffer_);
if (count == block_size_) {
backup_slice_ = slice_;
} else {
backup_slice_ = g_core_codegen_interface->gpr_slice_split_tail(
&slice_, GPR_SLICE_LENGTH(slice_) - count);
g_core_codegen_interface->gpr_slice_buffer_add(slice_buffer_, slice_);
}
have_backup_ = true;
byte_count_ -= count;
}
grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
private:
const int block_size_;
int64_t byte_count_;
gpr_slice_buffer* slice_buffer_;
bool have_backup_;
gpr_slice backup_slice_;
gpr_slice slice_;
};
class GrpcBufferReader GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_, buffer);
}
~GrpcBufferReader() GRPC_OVERRIDE {
g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
}
bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_;
GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
*size = (int)backup_count_;
backup_count_ = 0;
return true;
}
if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
&slice_)) {
return false;
}
g_core_codegen_interface->gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_CODEGEN_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
return true;
}
void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
bool Skip(int count) GRPC_OVERRIDE {
const void* data;
int size;
while (Next(&data, &size)) {
if (size >= count) {
BackUp(size - count);
return true;
}
// size < count;
count -= size;
}
// error or we have too large count;
return false;
}
grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
return byte_count_ - backup_count_;
}
private:
int64_t byte_count_;
int64_t backup_count_;
grpc_byte_buffer_reader reader_;
gpr_slice slice_;
};
} // namespace
template <class T>
class SerializationTraits<T, typename std::enable_if<std::is_base_of<
grpc::protobuf::Message, T>::value>::type> {
public:
static Status Serialize(const grpc::protobuf::Message& msg,
grpc_byte_buffer** buffer, bool* own_buffer) {
grpc_byte_buffer** bp, bool* own_buffer) {
*own_buffer = true;
return g_core_codegen_interface->SerializeProto(msg, buffer);
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = g_core_codegen_interface->gpr_slice_malloc(byte_size);
GPR_CODEGEN_ASSERT(
GPR_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
*bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
g_core_codegen_interface->gpr_slice_unref(slice);
return g_core_codegen_interface->ok();
} else {
GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? g_core_codegen_interface->ok()
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
static Status Deserialize(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) {
return g_core_codegen_interface->DeserializeProto(buffer, msg,
max_message_size);
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
Status result = g_core_codegen_interface->ok();
{
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
return result;
}
};

@ -34,25 +34,6 @@
#ifndef GRPC_BYTE_BUFFER_READER_H
#define GRPC_BYTE_BUFFER_READER_H
#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#ifdef __cplusplus
extern "C" {
#endif
struct grpc_byte_buffer_reader {
grpc_byte_buffer *buffer_in;
grpc_byte_buffer *buffer_out;
/* Different current objects correspond to different types of byte buffers */
union {
/* Index into a slice buffer's array of slices */
unsigned index;
} current;
};
#ifdef __cplusplus
}
#endif
#include <grpc/impl/codegen/byte_buffer_reader.h>
#endif /* GRPC_BYTE_BUFFER_READER_H */

@ -0,0 +1,57 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H
#define GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H
#include <grpc/impl/codegen/byte_buffer.h>
#ifdef __cplusplus
extern "C" {
#endif
struct grpc_byte_buffer_reader {
grpc_byte_buffer *buffer_in;
grpc_byte_buffer *buffer_out;
/* Different current objects correspond to different types of byte buffers */
union {
/* Index into a slice buffer's array of slices */
unsigned index;
} current;
};
#ifdef __cplusplus
}
#endif
#endif /* GRPC_IMPL_CODEGEN_BYTE_BUFFER_READER_H */

@ -307,7 +307,9 @@ typedef enum {
GRPC_OP_RECV_STATUS_ON_CLIENT,
/** Receive close on the server: one and only one must be made on the
server.
This op completes after the close has been received by the server. */
This op completes after the close has been received by the server.
This operation always succeeds, meaning ops paired with this operation
will also appear to succeed, even though they may not have. */
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;

@ -156,6 +156,7 @@
<file baseinstalldir="/" name="include/grpc/grpc.h" role="src" />
<file baseinstalldir="/" name="include/grpc/status.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/byte_buffer.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/byte_buffer_reader.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/compression_types.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/connectivity_state.h" role="src" />
<file baseinstalldir="/" name="include/grpc/impl/codegen/grpc_types.h" role="src" />

@ -182,18 +182,40 @@ bool GetModuleAndMessagePath(const Descriptor* type,
return true;
}
// Get all comments (leading, leading_detached, trailing) and print them as a
// docstring. Any leading space of a line will be removed, but the line wrapping
// will not be changed.
template <typename DescriptorType>
static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
std::vector<grpc::string> comments;
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED,
&comments);
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING,
&comments);
grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING,
&comments);
if (comments.empty()) {
return;
}
printer->Print("\"\"\"");
for (auto it = comments.begin(); it != comments.end(); ++it) {
size_t start_pos = it->find_first_not_of(' ');
if (start_pos != grpc::string::npos) {
printer->Print(it->c_str() + start_pos);
}
printer->Print("\n");
}
printer->Print("\"\"\"\n");
}
bool PrintBetaServicer(const ServiceDescriptor* service,
Printer* out) {
grpc::string doc = "<fill me in later!>";
map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
out->Print("\n");
out->Print(dict, "class Beta$Service$Servicer(object):\n");
out->Print("class Beta$Service$Servicer(object):\n", "Service",
service->name());
{
IndentScope raii_class_indent(out);
out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
auto meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@ -202,6 +224,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
"Method", meth->name(), "ArgName", arg_name);
{
IndentScope raii_method_indent(out);
PrintAllComments(meth, out);
out->Print("context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)\n");
}
}
@ -211,16 +234,11 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
bool PrintBetaStub(const ServiceDescriptor* service,
Printer* out) {
grpc::string doc = "The interface to which stubs will conform.";
map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
"Documentation", doc,
});
out->Print("\n");
out->Print(dict, "class Beta$Service$Stub(object):\n");
out->Print("class Beta$Service$Stub(object):\n", "Service", service->name());
{
IndentScope raii_class_indent(out);
out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@ -229,6 +247,7 @@ bool PrintBetaStub(const ServiceDescriptor* service,
out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n");
{
IndentScope raii_method_indent(out);
PrintAllComments(meth, out);
out->Print("raise NotImplementedError()\n");
}
if (!meth->server_streaming()) {

@ -56,10 +56,6 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
* is specified in channel args, otherwise returns 0. */
int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
/** Returns the compression algorithm set in \a a. */
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);

@ -48,124 +48,6 @@
#include "src/core/lib/profiling/timers.h"
namespace {
const int kGrpcBufferWriterMaxBufferLength = 8192;
class GrpcBufferWriter GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyOutputStream {
public:
explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {
if (have_backup_) {
gpr_slice_unref(backup_slice_);
}
}
bool Next(void** data, int* size) GRPC_OVERRIDE {
if (have_backup_) {
slice_ = backup_slice_;
have_backup_ = false;
} else {
slice_ = gpr_slice_malloc(block_size_);
}
*data = GPR_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
gpr_slice_buffer_add(slice_buffer_, slice_);
return true;
}
void BackUp(int count) GRPC_OVERRIDE {
gpr_slice_buffer_pop(slice_buffer_);
if (count == block_size_) {
backup_slice_ = slice_;
} else {
backup_slice_ =
gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
gpr_slice_buffer_add(slice_buffer_, slice_);
}
have_backup_ = true;
byte_count_ -= count;
}
grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
private:
const int block_size_;
int64_t byte_count_;
gpr_slice_buffer* slice_buffer_;
bool have_backup_;
gpr_slice backup_slice_;
gpr_slice slice_;
};
class GrpcBufferReader GRPC_FINAL
: public ::grpc::protobuf::io::ZeroCopyInputStream {
public:
explicit GrpcBufferReader(grpc_byte_buffer* buffer)
: byte_count_(0), backup_count_(0) {
grpc_byte_buffer_reader_init(&reader_, buffer);
}
~GrpcBufferReader() GRPC_OVERRIDE {
grpc_byte_buffer_reader_destroy(&reader_);
}
bool Next(const void** data, int* size) GRPC_OVERRIDE {
if (backup_count_ > 0) {
*data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
backup_count_;
GPR_ASSERT(backup_count_ <= INT_MAX);
*size = (int)backup_count_;
backup_count_ = 0;
return true;
}
if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
return false;
}
gpr_slice_unref(slice_);
*data = GPR_SLICE_START_PTR(slice_);
// On win x64, int is only 32bit
GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
return true;
}
void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
bool Skip(int count) GRPC_OVERRIDE {
const void* data;
int size;
while (Next(&data, &size)) {
if (size >= count) {
BackUp(size - count);
return true;
}
// size < count;
count -= size;
}
// error or we have too large count;
return false;
}
grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
return byte_count_ - backup_count_;
}
private:
int64_t byte_count_;
int64_t backup_count_;
grpc_byte_buffer_reader reader_;
gpr_slice slice_;
};
} // namespace
namespace grpc {
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
@ -192,6 +74,44 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) {
::grpc_byte_buffer_reader_init(reader, buffer);
}
void CoreCodegen::grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) {
::grpc_byte_buffer_reader_destroy(reader);
}
int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
gpr_slice* slice) {
return ::grpc_byte_buffer_reader_next(reader, slice);
}
grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(gpr_slice* slice,
size_t nslices) {
return ::grpc_raw_byte_buffer_create(slice, nslices);
}
gpr_slice CoreCodegen::gpr_slice_malloc(size_t length) {
return ::gpr_slice_malloc(length);
}
void CoreCodegen::gpr_slice_unref(gpr_slice slice) { ::gpr_slice_unref(slice); }
gpr_slice CoreCodegen::gpr_slice_split_tail(gpr_slice* s, size_t split) {
return ::gpr_slice_split_tail(s, split);
}
void CoreCodegen::gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) {
::gpr_slice_buffer_add(sb, slice);
}
void CoreCodegen::gpr_slice_buffer_pop(gpr_slice_buffer* sb) {
::gpr_slice_buffer_pop(sb);
}
void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) {
::grpc_metadata_array_init(array);
}
@ -200,6 +120,10 @@ void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) {
::grpc_metadata_array_destroy(array);
}
const Status& CoreCodegen::ok() { return grpc::Status::OK; }
const Status& CoreCodegen::cancelled() { return grpc::Status::CANCELLED; }
gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) {
return ::gpr_inf_future(type);
}
@ -209,48 +133,4 @@ void CoreCodegen::assert_fail(const char* failed_assertion) {
abort();
}
Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp) {
GPR_TIMER_SCOPE("SerializeProto", 0);
int byte_size = msg.ByteSize();
if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
gpr_slice slice = gpr_slice_malloc(byte_size);
GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
*bp = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return Status::OK;
} else {
GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
return msg.SerializeToZeroCopyStream(&writer)
? Status::OK
: Status(StatusCode::INTERNAL, "Failed to serialize message");
}
}
Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) {
GPR_TIMER_SCOPE("DeserializeProto", 0);
if (buffer == nullptr) {
return Status(StatusCode::INTERNAL, "No payload");
}
Status result = Status::OK;
{
GrpcBufferReader reader(buffer);
::grpc::protobuf::io::CodedInputStream decoder(&reader);
if (max_message_size > 0) {
decoder.SetTotalBytesLimit(max_message_size, max_message_size);
}
if (!msg->ParseFromCodedStream(&decoder)) {
result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
}
if (!decoder.ConsumedEntireMessage()) {
result = Status(StatusCode::INTERNAL, "Did not read entire message");
}
}
grpc_byte_buffer_destroy(buffer);
return result;
}
} // namespace grpc

@ -42,13 +42,6 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
private:
Status SerializeProto(const grpc::protobuf::Message& msg,
grpc_byte_buffer** bp) override;
Status DeserializeProto(grpc_byte_buffer* buffer,
grpc::protobuf::Message* msg,
int max_message_size) override;
grpc_completion_queue* grpc_completion_queue_create(void* reserved) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
@ -60,11 +53,30 @@ class CoreCodegen : public CoreCodegenInterface {
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
grpc_byte_buffer* buffer) override;
void grpc_byte_buffer_reader_destroy(
grpc_byte_buffer_reader* reader) override;
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
gpr_slice* slice) override;
grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice,
size_t nslices) override;
gpr_slice gpr_slice_malloc(size_t length) override;
void gpr_slice_unref(gpr_slice slice) override;
gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) override;
void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) override;
void gpr_slice_buffer_pop(gpr_slice_buffer* sb) override;
void grpc_metadata_array_init(grpc_metadata_array* array) override;
void grpc_metadata_array_destroy(grpc_metadata_array* array) override;
gpr_timespec gpr_inf_future(gpr_clock_type type) override;
virtual const Status& ok() override;
virtual const Status& cancelled() override;
void assert_fail(const char* failed_assertion) override;
};

@ -166,6 +166,37 @@ namespace Grpc.Core.Tests
Assert.IsNotNull("xyz", call.GetTrailers()[0].Key);
}
[Test]
public async Task ServerStreamingCall_EndOfStreamIsIdempotent()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
Assert.IsFalse(await call.ResponseStream.MoveNext());
Assert.IsFalse(await call.ResponseStream.MoveNext());
}
[Test]
public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
context.Status = new Status(StatusCode.InvalidArgument, "");
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
// attempting MoveNext again should result in throwing the same exception.
var ex2 = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode);
}
[Test]
public async Task DuplexStreamingCall()
{
@ -208,6 +239,38 @@ namespace Grpc.Core.Tests
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
}
[Test]
public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
{
var handlerStartedBarrier = new TaskCompletionSource<object>();
var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
var successTcs = new TaskCompletionSource<string>();
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
handlerStartedBarrier.SetResult(null);
// wait for cancellation to be delivered.
context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
await cancelNotificationReceivedBarrier.Task;
var moveNextResult = await requestStream.MoveNext();
successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
return "";
});
var cts = new CancellationTokenSource();
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
await handlerStartedBarrier.Task;
cts.Cancel();
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
Assert.AreEqual("SUCCESS", await successTcs.Task);
}
[Test]
public async Task AsyncUnaryCall_EchoMetadata()
{

@ -84,6 +84,8 @@
<Compile Include="SanityTest.cs" />
<Compile Include="HalfcloseTest.cs" />
<Compile Include="NUnitMain.cs" />
<Compile Include="Internal\FakeNativeCall.cs" />
<Compile Include="Internal\AsyncCallServerTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,191 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// Uses fake native call to test interaction of <c>AsyncCallServer</c> wrapping code with C core in different situations.
/// </summary>
public class AsyncCallServerTest
{
Server server;
FakeNativeCall fakeCall;
AsyncCallServer<string, string> asyncCallServer;
[SetUp]
public void Init()
{
var environment = GrpcEnvironment.AddRef();
// Create a fake server just so we have an instance to refer to.
// The server won't actually be used at all.
server = new Server()
{
Ports = { { "localhost", 0, ServerCredentials.Insecure } }
};
server.Start();
fakeCall = new FakeNativeCall();
asyncCallServer = new AsyncCallServer<string, string>(
Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
environment,
server);
asyncCallServer.InitializeForTesting(fakeCall);
}
[TearDown]
public void Cleanup()
{
server.ShutdownAsync().Wait();
GrpcEnvironment.Release();
}
[Test]
public void CancelNotificationAfterStartDisposes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
fakeCall.ReceivedMessageHandler(true, null);
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void ReadAfterCancelNotificationCanSucceed()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
// Check that starting a read after cancel notification has been processed is legal.
var moveNextTask = requestStream.MoveNext();
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void ReadCompletionFailureClosesRequestStream()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null);
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAfterCancelNotificationFails()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteCompletionFailureThrows()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
// TODO(jtattermusch): should we throw a different exception type instead?
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
[Test]
public void WriteAndWriteStatusCanRunConcurrently()
{
var finishedTask = asyncCallServer.ServerSideCallAsync();
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
Assert.DoesNotThrowAsync(async () => await writeTask);
Assert.DoesNotThrowAsync(async () => await writeStatusTask);
fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
}
static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
{
Assert.IsTrue(fakeCall.IsDisposed);
Assert.IsTrue(finishedTask.IsCompleted);
Assert.DoesNotThrow(() => finishedTask.Wait());
}
}
}

@ -32,6 +32,7 @@
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@ -40,6 +41,9 @@ using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
/// </summary>
public class AsyncCallTest
{
Channel channel;
@ -75,8 +79,8 @@ namespace Grpc.Core.Internal.Tests
public void AsyncUnary_StreamingOperationsNotAllowed()
{
asyncCall.UnaryCallAsync("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartReadMessage((x,y) => {}));
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
@ -118,6 +122,14 @@ namespace Grpc.Core.Internal.Tests
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_StreamingReadNotAllowed()
{
asyncCall.ClientStreamingCallAsync();
Assert.ThrowsAsync(typeof(InvalidOperationException),
async () => await asyncCall.ReadMessageAsync());
}
[Test]
public void ClientStreaming_NoRequest_Success()
{
@ -142,6 +154,283 @@ namespace Grpc.Core.Internal.Tests
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
}
[Test]
public void ClientStreaming_MoreRequests_Success()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(true);
writeTask.Wait();
var writeTask2 = requestStream.WriteAsync("request2");
fakeCall.SendCompletionHandler(true);
writeTask2.Wait();
var completeTask = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
completeTask.Wait();
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
public void ClientStreaming_WriteFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Internal),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
}
[Test]
public void ClientStreaming_WriteAfterReceivingStatusFails()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
}
[Test]
public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
fakeCall.UnaryResponseClientHandler(true,
new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
}
[Test]
public void ClientStreaming_WriteAfterCancellationRequestFails()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
fakeCall.UnaryResponseClientHandler(true,
CreateClientSideStatus(StatusCode.Cancelled),
CreateResponsePayload(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
}
[Test]
public void ServerStreaming_StreamingSendNotAllowed()
{
asyncCall.StartServerStreamingCall("request1");
Assert.Throws(typeof(InvalidOperationException),
() => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
}
[Test]
public void ServerStreaming_NoResponse_Success1()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void ServerStreaming_NoResponse_Success2()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
// try alternative order of completions
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void ServerStreaming_NoResponse_ReadFailure()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
}
[Test]
public void ServerStreaming_MoreResponses_Success()
{
asyncCall.StartServerStreamingCall("request1");
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask2.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask3 = responseStream.MoveNext();
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
fakeCall.ReceivedMessageHandler(true, null);
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
[Test]
public void DuplexStreaming_NoRequestNoResponse_Success()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var writeTask1 = requestStream.CompleteAsync();
fakeCall.SendCompletionHandler(true);
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
[Test]
public void DuplexStreaming_WriteAfterReceivingStatusFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
}
[Test]
public void DuplexStreaming_CompleteAfterReceivingStatusFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
}
[Test]
public void DuplexStreaming_WriteAfterCancellationRequestFails()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var responseStream = new ClientResponseStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
var readTask = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
}
[Test]
public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
{
asyncCall.StartDuplexStreamingCall();
var responseStream = new ClientResponseStream<string, string>(asyncCall);
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
var readTask1 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
[Test]
public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
{
asyncCall.StartDuplexStreamingCall();
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request
asyncCall.Cancel();
Assert.IsTrue(fakeCall.IsCancelled);
fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
Assert.IsTrue(readTask1.Result);
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
fakeCall.ReceivedMessageHandler(true, null);
fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
}
ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
{
return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
@ -163,6 +452,16 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual("response1", resultTask.Result);
}
static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
{
Assert.IsTrue(moveNextTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
Assert.IsFalse(moveNextTask.Result);
Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
{
Assert.IsTrue(resultTask.IsCompleted);
@ -175,135 +474,15 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
{
get;
set;
}
public bool IsCancelled
{
get;
set;
}
public bool IsDisposed
{
get;
set;
}
public void Cancel()
{
IsCancelled = true;
}
public void CancelWithStatus(Status status)
{
IsCancelled = true;
}
public string GetPeer()
{
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
UnaryResponseClientHandler = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
UnaryResponseClientHandler = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
SendCompletionHandler = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
ReceivedCloseOnServerHandler = callback;
}
public void Dispose()
{
IsDisposed = true;
}
static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
{
Assert.IsTrue(moveNextTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
}
}

@ -0,0 +1,183 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
/// <summary>
/// For testing purposes.
/// </summary>
internal class FakeNativeCall : INativeCall
{
public UnaryResponseClientHandler UnaryResponseClientHandler
{
get;
set;
}
public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
{
get;
set;
}
public ReceivedMessageHandler ReceivedMessageHandler
{
get;
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
set;
}
public SendCompletionHandler SendStatusFromServerHandler
{
get;
set;
}
public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
{
get;
set;
}
public bool IsCancelled
{
get;
set;
}
public bool IsDisposed
{
get;
set;
}
public void Cancel()
{
IsCancelled = true;
}
public void CancelWithStatus(Status status)
{
IsCancelled = true;
}
public string GetPeer()
{
return "PEER";
}
public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
UnaryResponseClientHandler = callback;
}
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
throw new NotImplementedException();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
{
UnaryResponseClientHandler = callback;
}
public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
{
ReceivedStatusOnClientHandler = callback;
}
public void StartReceiveMessage(ReceivedMessageHandler callback)
{
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
}
public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
SendCompletionHandler = callback;
}
public void StartSendCloseFromClient(SendCompletionHandler callback)
{
SendCompletionHandler = callback;
}
public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
SendStatusFromServerHandler = callback;
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
ReceivedCloseOnServerHandler = callback;
}
public void Dispose()
{
IsDisposed = true;
}
}
}

@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
public Task<TResponse> ReadMessageAsync()
{
StartReadMessageInternal(completionDelegate);
return ReadMessageInternalAsync();
}
/// <summary>

@ -68,7 +68,8 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@ -150,15 +151,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
GrpcPreconditions.CheckState(started);
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
// and maintain its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
streamingReadTcs = new TaskCompletionSource<TRead>();
return streamingReadTcs.Task;
}
}
@ -213,15 +224,6 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
protected virtual void CheckReadingAllowed()
{
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
}
protected void CheckNotCancelled()
{
if (cancelRequested)
@ -322,22 +324,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
origCompletionDelegate = sendCompletionDelegate;
sendCompletionDelegate = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
FireCompletion(origCompletionDelegate, null, null);
sendStatusFromServerTcs.SetResult(null);
}
}
@ -346,15 +344,17 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
// if success == false, received message will be null. It that case we will
// treat this completion as the last read an rely on C core to handle the failed
// read (e.g. deliver approriate statusCode on the clientside).
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
origCompletionDelegate = readCompletionDelegate;
readCompletionDelegate = null;
origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@ -364,20 +364,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
// TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
if (!readingDone)
{
streamingReadTcs = null;
}
ReleaseResourcesIfPossible();
}
// TODO: handle the case when success==false
if (deserializeException != null && !IsClient)
{
FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
FireCompletion(origCompletionDelegate, msg, null);
origTcs.SetResult(msg);
}
}
}

@ -64,6 +64,15 @@ namespace Grpc.Core.Internal
InitializeInternal(call);
}
/// <summary>
/// Only for testing purposes.
/// </summary>
public void InitializeForTesting(INativeCall call)
{
server.AddCallReference(this);
InitializeInternal(call);
}
/// <summary>
/// Starts a server side call.
/// </summary>
@ -91,11 +100,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
public Task<TRequest> ReadMessageAsync()
{
StartReadMessageInternal(completionDelegate);
return ReadMessageInternalAsync();
}
/// <summary>
@ -128,24 +136,25 @@ namespace Grpc.Core.Internal
}
/// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
public Task SendStatusFromServerAsync(Status status, Metadata trailers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed(allowFinished: false);
GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!disposed);
GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
return sendStatusFromServerTcs.Task;
}
}
@ -174,12 +183,6 @@ namespace Grpc.Core.Internal
get { return false; }
}
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();
GrpcPreconditions.CheckArgument(!cancelRequested);
}
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@ -190,12 +193,21 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
// success will be always set to true.
lock (myLock)
{
finished = true;
if (streamingReadTcs == null)
{
// if there's no pending read, readingDone=true will dispose now.
// if there is a pending read, we will dispose once that read finishes.
readingDone = true;
streamingReadTcs = new TaskCompletionSource<TRequest>();
streamingReadTcs.SetResult(default(TRequest));
}
ReleaseResourcesIfPossible();
}
// TODO(jtattermusch): handle error
if (cancelled)
{

@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task.ConfigureAwait(false);
var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)

@ -80,8 +80,6 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
var result = await handler(request, context).ConfigureAwait(false);
status = context.Status;
await responseStream.WriteAsync(result).ConfigureAwait(false);
@ -93,7 +91,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -136,8 +134,6 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
@ -149,7 +145,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -209,7 +205,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -260,7 +256,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
@ -282,9 +278,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@ -300,7 +294,6 @@ namespace Grpc.Core.Internal
return rpcException.Status;
}
// TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}

@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
var result = await taskSource.Task.ConfigureAwait(false);
var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}

@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();

@ -1,5 +1,6 @@
{
"Grpc.Core.Tests": [
"Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",

@ -45,3 +45,7 @@ service EchoTestService {
service UnimplementedService {
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
// A service without any rpc defined to test coverage.
service NoRpcService {
}

@ -11,10 +11,10 @@ AllCops:
- 'pb/test/**/*'
Metrics/CyclomaticComplexity:
Max: 8
Max: 9
Metrics/PerceivedComplexity:
Max: 8
Max: 9
Metrics/ClassLength:
Max: 250

@ -332,15 +332,13 @@ module GRPC
# the current thread to terminate it.
def run_till_terminated
GRPC.trap_signals
stopped = false
t = Thread.new do
run
stopped = true
end
t.abort_on_exception = true
wait_till_running
loop do
until running_state == :stopped
sleep SIGNAL_CHECK_PERIOD
break if stopped
break unless GRPC.handle_signals
end
stop
@ -416,7 +414,7 @@ module GRPC
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::RESOURCE_EXHAUSTED, '')
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@ -427,7 +425,7 @@ module GRPC
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNIMPLEMENTED, '')
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@ -443,7 +441,12 @@ module GRPC
unless active_call.nil?
@pool.schedule(active_call) do |ac|
c, mth = ac
rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
begin
rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
rescue StandardError
c.send_status(GRPC::Core::StatusCodes::INTERNAL,
'Server handler failed')
end
end
end
rescue Core::CallError, RuntimeError => e

@ -111,7 +111,7 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) {
/* All test configurations */
static grpc_end2end_test_config configs[] = {
{"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
{"chttp2/fullstack+census", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION,
chttp2_create_fixture_fullstack, chttp2_init_client_fullstack,
chttp2_init_server_fullstack, chttp2_tear_down_fullstack},
};

@ -41,6 +41,7 @@
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/atm.h>
#include <grpc/impl/codegen/byte_buffer.h>
#include <grpc/impl/codegen/byte_buffer_reader.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/connectivity_state.h>
#include <grpc/impl/codegen/grpc_types.h>

@ -51,6 +51,7 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/lib/iomgr/ev_posix.h"
@ -58,6 +59,7 @@
using grpc::testing::EchoRequest;
using grpc::testing::EchoResponse;
using grpc::testing::kTlsCredentialsType;
using std::chrono::system_clock;
GPR_TLS_DECL(g_is_async_end2end_test);
@ -197,20 +199,37 @@ class Verifier {
bool spin_;
};
class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
class TestScenario {
public:
TestScenario(bool non_block, const grpc::string& creds_type,
const grpc::string& content)
: disable_blocking(non_block),
credentials_type(creds_type),
message_content(content) {}
void Log() const {
gpr_log(GPR_INFO,
"Scenario: disable_blocking %d, credentials %s, message size %d",
disable_blocking, credentials_type.c_str(), message_content.size());
}
bool disable_blocking;
const grpc::string credentials_type;
const grpc::string message_content;
};
class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
AsyncEnd2endTest() {}
AsyncEnd2endTest() { GetParam().Log(); }
void SetUp() GRPC_OVERRIDE {
poll_overrider_.reset(new PollingOverrider(!GetParam()));
poll_overrider_.reset(new PollingOverrider(!GetParam().disable_blocking));
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
// Setup server
ServerBuilder builder;
builder.AddListeningPort(server_address_.str(),
grpc::InsecureServerCredentials());
auto server_creds = GetServerCredentials(GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
@ -230,8 +249,11 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
}
void ResetStub() {
ChannelArguments args;
auto channel_creds =
GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
CreateCustomChannel(server_address_.str(), channel_creds, args);
stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
@ -247,22 +269,23 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -302,7 +325,7 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@ -310,23 +333,22 @@ TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
std::chrono::system_clock::now());
std::chrono::system_clock::time_point time_limit(
std::chrono::system_clock::now() + std::chrono::seconds(10));
Verifier(GetParam()).Verify(cq_.get(), time_now);
Verifier(GetParam()).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
Verifier(GetParam().disable_blocking).Verify(cq_.get(), time_now);
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Verify(cq_.get(), time_limit);
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam())
.Expect(3, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam())
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get(), std::chrono::system_clock::time_point::max());
@ -347,41 +369,48 @@ TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
ServerContext srv_ctx;
ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(2, true)
.Expect(1, true)
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->Write(send_request, tag(5));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_stream->WritesDone(tag(7));
Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_stream.Finish(send_response, Status::OK, tag(9));
Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -400,39 +429,45 @@ TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
ServerContext srv_ctx;
ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Write(send_response, tag(5));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
srv_stream.Finish(Status::OK, tag(7));
Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(8));
Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(9));
Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(9, true).Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@ -450,41 +485,48 @@ TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
ServerContext srv_ctx;
ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(1, true)
.Expect(2, true)
.Verify(cq_.get());
cli_stream->Write(send_request, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
srv_stream.Write(send_response, tag(5));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
cli_stream->Read(&recv_response, tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
cli_stream->WritesDone(tag(7));
Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
srv_stream.Read(&recv_request, tag(8));
Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(7, true)
.Expect(8, false)
.Verify(cq_.get());
srv_stream.Finish(Status::OK, tag(9));
Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(9, true)
.Expect(10, true)
.Verify(cq_.get());
EXPECT_TRUE(recv_status.ok());
}
@ -503,7 +545,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
@ -516,7 +558,7 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@ -529,11 +571,11 @@ TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -552,7 +594,7 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@ -561,15 +603,15 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta1.second,
ToString(server_initial_metadata.find(meta1.first)->second));
@ -579,10 +621,11 @@ TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(5));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
@ -601,7 +644,7 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2("key2", "val2");
@ -610,20 +653,22 @@ TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
send_response.set_message(recv_request.message());
srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
response_writer.Finish(send_response, Status::OK, tag(4));
response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(5));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -647,7 +692,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::pair<grpc::string, grpc::string> meta1("key1", "val1");
std::pair<grpc::string, grpc::string> meta2(
"key2-bin",
@ -671,7 +716,7 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
auto client_initial_metadata = srv_ctx.client_metadata();
EXPECT_EQ(meta1.second,
@ -683,9 +728,9 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
response_writer.SendInitialMetadata(tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
response_reader->ReadInitialMetadata(tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(4, true).Verify(cq_.get());
auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
EXPECT_EQ(meta3.second,
ToString(server_initial_metadata.find(meta3.first)->second));
@ -697,11 +742,13 @@ TEST_P(AsyncEnd2endTest, MetadataRpc) {
srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
response_writer.Finish(send_response, Status::OK, tag(5));
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(5, true)
.Expect(6, true)
.Verify(cq_.get());
response_reader->Finish(&recv_response, &recv_status, tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
@ -726,7 +773,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@ -734,15 +781,15 @@ TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
cli_ctx.TryCancel();
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(5, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
}
@ -761,7 +808,7 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
ServerContext srv_ctx;
grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
@ -769,25 +816,29 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
send_response.set_message(recv_request.message());
response_writer.Finish(send_response, Status::OK, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(3, true)
.Expect(4, true)
.Expect(5, true)
.Verify(cq_.get());
EXPECT_FALSE(srv_ctx.IsCancelled());
EXPECT_EQ(send_response.message(), recv_response.message());
EXPECT_TRUE(recv_status.ok());
}
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
ChannelArguments args;
auto channel_creds =
GetChannelCredentials(GetParam().credentials_type, &args);
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
CreateCustomChannel(server_address_.str(), channel_creds, args);
std::unique_ptr<grpc::testing::UnimplementedService::Stub> stub;
stub = grpc::testing::UnimplementedService::NewStub(channel);
EchoRequest send_request;
@ -795,12 +846,12 @@ TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
Status recv_status;
ClientContext cli_ctx;
send_request.set_message("Hello");
send_request.set_message(GetParam().message_content);
std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
response_reader->Finish(&recv_response, &recv_status, tag(4));
Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(4, false).Verify(cq_.get());
EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
EXPECT_EQ("", recv_status.error_message());
@ -847,23 +898,25 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'RequestStream' call on client
std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'RequestStream' calls
// and receive the 'RequestStream' call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends 3 messages (tags 3, 4 and 5)
for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
send_request.set_message("Ping " + std::to_string(tag_idx));
cli_stream->Write(send_request, tag(tag_idx));
Verifier(GetParam()).Expect(tag_idx, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, true)
.Verify(cq_.get());
}
cli_stream->WritesDone(tag(6));
Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get());
bool expected_server_cq_result = true;
bool ignore_cq_result = false;
@ -871,7 +924,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// Since cancellation is done before server reads any results, we know
@ -881,7 +934,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@ -939,13 +992,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server sends the final message and cancelled status (but the RPC is
// already cancelled at this point. So we expect the operation to fail)
srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
// TODO(sreek): The expectation here should be true. This is a bug (github
// issue #4972)
Verifier(GetParam()).Expect(10, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(10, false).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@ -979,13 +1032,13 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the 'ResponseStream' call on the client
std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of 'ResponseStream' calls and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
cq_.get(), cq_.get(), tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
EXPECT_EQ(send_request.message(), recv_request.message());
bool expected_cq_result = true;
@ -994,7 +1047,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@ -1004,7 +1057,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@ -1064,7 +1117,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Client attemts to read the three messages from the server
for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
cli_stream->Read(&recv_response, tag(tag_idx));
Verifier(GetParam())
Verifier(GetParam().disable_blocking)
.Expect(tag_idx, expected_cq_result)
.Verify(cq_.get(), ignore_cq_result);
}
@ -1075,11 +1128,11 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Server finishes the stream (but the RPC is already cancelled)
srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
// Client will see the cancellation
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@ -1114,19 +1167,19 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// Initiate the call from the client side
std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
Verifier(GetParam()).Expect(1, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get());
// On the server, request to be notified of the 'BidiStream' call and
// receive the call just made by the client
srv_ctx.AsyncNotifyWhenDone(tag(11));
service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
tag(2));
Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get());
// Client sends the first and the only message
send_request.set_message("Ping");
cli_stream->Write(send_request, tag(3));
Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get());
bool expected_cq_result = true;
bool ignore_cq_result = false;
@ -1134,7 +1187,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
srv_ctx.TryCancel();
Verifier(GetParam()).Expect(11, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get());
EXPECT_TRUE(srv_ctx.IsCancelled());
// We know for sure that all cq results will be false from this point
@ -1144,7 +1197,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
std::thread* server_try_cancel_thd = NULL;
auto verif = Verifier(GetParam());
auto verif = Verifier(GetParam().disable_blocking);
if (server_try_cancel == CANCEL_DURING_PROCESSING) {
server_try_cancel_thd =
@ -1244,10 +1297,10 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// know that cq results are supposed to return false on server.
srv_stream.Finish(Status::CANCELLED, tag(9));
Verifier(GetParam()).Expect(9, false).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(9, false).Verify(cq_.get());
cli_stream->Finish(&recv_status, tag(10));
Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get());
EXPECT_FALSE(recv_status.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
}
@ -1289,11 +1342,48 @@ TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
}
std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
bool test_secure,
int test_big_limit) {
std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types;
std::vector<grpc::string> messages;
credentials_types.push_back(kInsecureCredentialsType);
auto sec_list = GetSecureCredentialsTypeList();
for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
credentials_types.push_back(*sec);
}
messages.push_back("Hello");
for (int sz = 1; sz < test_big_limit; sz *= 2) {
grpc::string big_msg;
for (int i = 0; i < sz * 1024; i++) {
char c = 'a' + (i % 26);
big_msg += c;
}
messages.push_back(big_msg);
}
for (auto cred = credentials_types.begin(); cred != credentials_types.end();
++cred) {
for (auto msg = messages.begin(); msg != messages.end(); msg++) {
scenarios.push_back(TestScenario(false, *cred, *msg));
if (test_disable_blocking) {
scenarios.push_back(TestScenario(true, *cred, *msg));
}
}
}
return scenarios;
}
INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
::testing::Values(false, true));
::testing::ValuesIn(CreateTestScenarios(true, true,
1024)));
INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
AsyncEnd2endServerTryCancelTest,
::testing::Values(false));
::testing::ValuesIn(CreateTestScenarios(false, false,
0)));
} // namespace
} // namespace testing

@ -31,6 +31,7 @@
import argparse
import os
import os.path
import re
import sys
import subprocess
@ -187,6 +188,8 @@ filename_list = []
try:
filename_list = subprocess.check_output(FILE_LIST_COMMAND,
shell=True).splitlines()
# Filter out non-existent files (ie, file removed or renamed)
filename_list = (f for f in filename_list if os.path.isfile(f))
except subprocess.CalledProcessError:
sys.exit(0)

@ -833,6 +833,7 @@ include/grpc++/impl/codegen/sync_no_cxx11.h \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \

@ -833,6 +833,7 @@ include/grpc++/impl/codegen/sync_no_cxx11.h \
include/grpc++/impl/codegen/sync_stream.h \
include/grpc++/impl/codegen/time.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \

@ -766,6 +766,7 @@ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \

@ -766,6 +766,7 @@ include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/impl/codegen/byte_buffer.h \
include/grpc/impl/codegen/byte_buffer_reader.h \
include/grpc/impl/codegen/compression_types.h \
include/grpc/impl/codegen/connectivity_state.h \
include/grpc/impl/codegen/grpc_types.h \

@ -5906,6 +5906,7 @@
],
"headers": [
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",
@ -5916,6 +5917,7 @@
"name": "grpc_codegen",
"src": [
"include/grpc/impl/codegen/byte_buffer.h",
"include/grpc/impl/codegen/byte_buffer_reader.h",
"include/grpc/impl/codegen/compression_types.h",
"include/grpc/impl/codegen/connectivity_state.h",
"include/grpc/impl/codegen/grpc_types.h",

@ -331,6 +331,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -315,6 +315,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

@ -331,6 +331,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -300,6 +300,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

@ -273,6 +273,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\grpc.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\status.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -519,6 +519,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

@ -264,6 +264,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\grpc.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\status.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -459,6 +459,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

@ -191,6 +191,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -120,6 +120,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

@ -191,6 +191,7 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\sync_stream.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc++\impl\codegen\time.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\connectivity_state.h" />
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\grpc_types.h" />

@ -120,6 +120,9 @@
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\byte_buffer_reader.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\include\grpc\impl\codegen\compression_types.h">
<Filter>include\grpc\impl\codegen</Filter>
</ClInclude>

Loading…
Cancel
Save