merge with parent PR

pull/2590/head
yang-g 10 years ago
commit 88b77c64d9
Files changed (changed-line counts in parentheses):
  1. BUILD (18)
  2. INSTALL (2)
  3. Makefile (6)
  4. build.json (10)
  5. gRPC.podspec (13)
  6. include/grpc/census.h (11)
  7. include/grpc/grpc.h (16)
  8. include/grpc/support/host_port.h (6)
  9. include/grpc/support/time.h (3)
  10. src/compiler/csharp_generator.cc (14)
  11. src/core/census/record_stat.c (38)
  12. src/core/census/rpc_stat_id.h (46)
  13. src/core/channel/channel_stack.c (5)
  14. src/core/channel/channel_stack.h (5)
  15. src/core/channel/client_channel.c (49)
  16. src/core/channel/compress_filter.c (3)
  17. src/core/channel/connected_channel.c (6)
  18. src/core/channel/http_client_filter.c (16)
  19. src/core/channel/http_server_filter.c (3)
  20. src/core/channel/noop_filter.c (1)
  21. src/core/client_config/README.md (4)
  22. src/core/client_config/resolvers/sockaddr_resolver.c (299)
  23. src/core/client_config/resolvers/sockaddr_resolver.h (6)
  24. src/core/client_config/resolvers/unix_resolver_posix.c (195)
  25. src/core/client_config/subchannel.c (11)
  26. src/core/client_config/subchannel.h (3)
  27. src/core/iomgr/alarm.c (12)
  28. src/core/iomgr/endpoint.c (4)
  29. src/core/iomgr/endpoint.h (3)
  30. src/core/iomgr/endpoint_pair_posix.c (8)
  31. src/core/iomgr/endpoint_pair_windows.c (6)
  32. src/core/iomgr/iomgr.c (10)
  33. src/core/iomgr/pollset_posix.c (4)
  34. src/core/iomgr/pollset_windows.c (2)
  35. src/core/iomgr/sockaddr_utils.c (33)
  36. src/core/iomgr/sockaddr_utils.h (2)
  37. src/core/iomgr/tcp_client_posix.c (15)
  38. src/core/iomgr/tcp_client_windows.c (7)
  39. src/core/iomgr/tcp_posix.c (26)
  40. src/core/iomgr/tcp_posix.h (3)
  41. src/core/iomgr/tcp_server_posix.c (7)
  42. src/core/iomgr/tcp_server_windows.c (13)
  43. src/core/iomgr/tcp_windows.c (17)
  44. src/core/iomgr/tcp_windows.h (2)
  45. src/core/security/client_auth_filter.c (8)
  46. src/core/security/secure_endpoint.c (11)
  47. src/core/security/server_auth_filter.c (8)
  48. src/core/support/host_port.c (10)
  49. src/core/support/sync_posix.c (3)
  50. src/core/support/sync_win32.c (4)
  51. src/core/support/time.c (27)
  52. src/core/support/time_posix.c (2)
  53. src/core/support/time_win32.c (2)
  54. src/core/surface/call.c (11)
  55. src/core/surface/channel.c (15)
  56. src/core/surface/channel.h (2)
  57. src/core/surface/channel_connectivity.c (5)
  58. src/core/surface/channel_create.c (3)
  59. src/core/surface/completion_queue.c (4)
  60. src/core/surface/init.c (7)
  61. src/core/surface/lame_client.c (18)
  62. src/core/surface/secure_channel_create.c (7)
  63. src/core/surface/server.c (5)
  64. src/core/transport/chttp2/internal.h (1)
  65. src/core/transport/chttp2/stream_encoder.c (9)
  66. src/core/transport/chttp2_transport.c (16)
  67. src/core/transport/transport.c (4)
  68. src/core/transport/transport.h (3)
  69. src/core/transport/transport_impl.h (3)
  70. src/core/transport/transport_op_string.c (2)
  71. src/cpp/client/create_channel.cc (4)
  72. src/csharp/Grpc.Core.Tests/ClientServerTest.cs (133)
  73. src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs (21)
  74. src/csharp/Grpc.Core.Tests/TimespecTest.cs (13)
  75. src/csharp/Grpc.Core/AsyncClientStreamingCall.cs (18)
  76. src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs (24)
  77. src/csharp/Grpc.Core/AsyncServerStreamingCall.cs (24)
  78. src/csharp/Grpc.Core/AsyncUnaryCall.cs (106)
  79. src/csharp/Grpc.Core/Calls.cs (10)
  80. src/csharp/Grpc.Core/Channel.cs (25)
  81. src/csharp/Grpc.Core/ChannelOptions.cs (30)
  82. src/csharp/Grpc.Core/Grpc.Core.csproj (9)
  83. src/csharp/Grpc.Core/Internal/AsyncCall.cs (43)
  84. src/csharp/Grpc.Core/Internal/AsyncCallServer.cs (7)
  85. src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs (146)
  86. src/csharp/Grpc.Core/Internal/CallSafeHandle.cs (6)
  87. src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs (46)
  88. src/csharp/Grpc.Core/Internal/ServerCallHandler.cs (79)
  89. src/csharp/Grpc.Core/Internal/ServerResponseStream.cs (4)
  90. src/csharp/Grpc.Core/Internal/Timespec.cs (14)
  91. src/csharp/Grpc.Core/Metadata.cs (5)
  92. src/csharp/Grpc.Core/Server.cs (28)
  93. src/csharp/Grpc.Core/ServerCallContext.cs (90)
  94. src/csharp/Grpc.Core/ServerMethods.cs (8)
  95. src/csharp/Grpc.Core/Version.cs (2)
  96. src/csharp/Grpc.Core/VersionInfo.cs (13)
  97. src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs (2)
  98. src/csharp/Grpc.Examples/MathExamples.cs (7)
  99. src/csharp/Grpc.Examples/MathGrpc.cs (12)
  100. src/csharp/Grpc.Examples/MathServiceImpl.cs (8)
Some files were not shown because too many files have changed in this diff.

BUILD

@@ -168,7 +168,7 @@ cc_library(
"src/core/client_config/resolver_factory.h",
"src/core/client_config/resolver_registry.h",
"src/core/client_config/resolvers/dns_resolver.h",
"src/core/client_config/resolvers/unix_resolver_posix.h",
"src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h",
"src/core/client_config/uri_parser.h",
@@ -246,6 +246,7 @@ cc_library(
"src/core/transport/transport.h",
"src/core/transport/transport_impl.h",
"src/core/census/context.h",
"src/core/census/rpc_stat_id.h",
"src/core/httpcli/format_request.c",
"src/core/httpcli/httpcli.c",
"src/core/httpcli/httpcli_security_connector.c",
@@ -287,7 +288,7 @@ cc_library(
"src/core/client_config/resolver_factory.c",
"src/core/client_config/resolver_registry.c",
"src/core/client_config/resolvers/dns_resolver.c",
"src/core/client_config/resolvers/unix_resolver_posix.c",
"src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c",
"src/core/client_config/uri_parser.c",
@@ -382,6 +383,7 @@ cc_library(
"src/core/transport/transport_op_string.c",
"src/core/census/context.c",
"src/core/census/initialize.c",
"src/core/census/record_stat.c",
],
hdrs = [
"include/grpc/grpc_security.h",
@@ -425,7 +427,7 @@ cc_library(
"src/core/client_config/resolver_factory.h",
"src/core/client_config/resolver_registry.h",
"src/core/client_config/resolvers/dns_resolver.h",
"src/core/client_config/resolvers/unix_resolver_posix.h",
"src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h",
"src/core/client_config/uri_parser.h",
@@ -503,6 +505,7 @@ cc_library(
"src/core/transport/transport.h",
"src/core/transport/transport_impl.h",
"src/core/census/context.h",
"src/core/census/rpc_stat_id.h",
"src/core/surface/init_unsecure.c",
"src/core/census/grpc_context.c",
"src/core/channel/channel_args.c",
@@ -521,7 +524,7 @@ cc_library(
"src/core/client_config/resolver_factory.c",
"src/core/client_config/resolver_registry.c",
"src/core/client_config/resolvers/dns_resolver.c",
"src/core/client_config/resolvers/unix_resolver_posix.c",
"src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c",
"src/core/client_config/uri_parser.c",
@@ -616,6 +619,7 @@ cc_library(
"src/core/transport/transport_op_string.c",
"src/core/census/context.c",
"src/core/census/initialize.c",
"src/core/census/record_stat.c",
],
hdrs = [
"include/grpc/byte_buffer.h",
@@ -1000,7 +1004,7 @@ objc_library(
"src/core/client_config/resolver_factory.c",
"src/core/client_config/resolver_registry.c",
"src/core/client_config/resolvers/dns_resolver.c",
"src/core/client_config/resolvers/unix_resolver_posix.c",
"src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c",
"src/core/client_config/uri_parser.c",
@@ -1095,6 +1099,7 @@ objc_library(
"src/core/transport/transport_op_string.c",
"src/core/census/context.c",
"src/core/census/initialize.c",
"src/core/census/record_stat.c",
],
hdrs = [
"include/grpc/grpc_security.h",
@@ -1140,7 +1145,7 @@ objc_library(
"src/core/client_config/resolver_factory.h",
"src/core/client_config/resolver_registry.h",
"src/core/client_config/resolvers/dns_resolver.h",
"src/core/client_config/resolvers/unix_resolver_posix.h",
"src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h",
"src/core/client_config/uri_parser.h",
@@ -1218,6 +1223,7 @@ objc_library(
"src/core/transport/transport.h",
"src/core/transport/transport_impl.h",
"src/core/census/context.h",
"src/core/census/rpc_stat_id.h",
],
includes = [
"include",

INSTALL

@@ -117,7 +117,7 @@ most Mac installations. Do the "git submodule" command listed above.
Then execute the following for all the needed build dependencies
$ sudo /opt/local/bin/port install autoconf automake libtool gflags cmake
$ mkdir ~/gtest
$ mkdir ~/gtest-svn
$ svn checkout http://googletest.googlecode.com/svn/trunk/ gtest-svn
$ mkdir mybuild
$ cd mybuild

Makefile

@@ -3706,7 +3706,7 @@ LIBGRPC_SRC = \
src/core/client_config/resolver_factory.c \
src/core/client_config/resolver_registry.c \
src/core/client_config/resolvers/dns_resolver.c \
src/core/client_config/resolvers/unix_resolver_posix.c \
src/core/client_config/resolvers/sockaddr_resolver.c \
src/core/client_config/subchannel.c \
src/core/client_config/subchannel_factory.c \
src/core/client_config/uri_parser.c \
@@ -3801,6 +3801,7 @@ LIBGRPC_SRC = \
src/core/transport/transport_op_string.c \
src/core/census/context.c \
src/core/census/initialize.c \
src/core/census/record_stat.c \
PUBLIC_HEADERS_C += \
include/grpc/grpc_security.h \
@@ -3971,7 +3972,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/client_config/resolver_factory.c \
src/core/client_config/resolver_registry.c \
src/core/client_config/resolvers/dns_resolver.c \
src/core/client_config/resolvers/unix_resolver_posix.c \
src/core/client_config/resolvers/sockaddr_resolver.c \
src/core/client_config/subchannel.c \
src/core/client_config/subchannel_factory.c \
src/core/client_config/uri_parser.c \
@@ -4066,6 +4067,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/transport/transport_op_string.c \
src/core/census/context.c \
src/core/census/initialize.c \
src/core/census/record_stat.c \
PUBLIC_HEADERS_C += \
include/grpc/byte_buffer.h \

build.json

@@ -18,11 +18,13 @@
"include/grpc/census.h"
],
"headers": [
"src/core/census/context.h"
"src/core/census/context.h",
"src/core/census/rpc_stat_id.h"
],
"src": [
"src/core/census/context.c",
"src/core/census/initialize.c"
"src/core/census/initialize.c",
"src/core/census/record_stat.c"
]
},
{
@@ -129,7 +131,7 @@
"src/core/client_config/resolver_factory.h",
"src/core/client_config/resolver_registry.h",
"src/core/client_config/resolvers/dns_resolver.h",
"src/core/client_config/resolvers/unix_resolver_posix.h",
"src/core/client_config/resolvers/sockaddr_resolver.h",
"src/core/client_config/subchannel.h",
"src/core/client_config/subchannel_factory.h",
"src/core/client_config/uri_parser.h",
@@ -225,7 +227,7 @@
"src/core/client_config/resolver_factory.c",
"src/core/client_config/resolver_registry.c",
"src/core/client_config/resolvers/dns_resolver.c",
"src/core/client_config/resolvers/unix_resolver_posix.c",
"src/core/client_config/resolvers/sockaddr_resolver.c",
"src/core/client_config/subchannel.c",
"src/core/client_config/subchannel_factory.c",
"src/core/client_config/uri_parser.c",

gRPC.podspec

@@ -170,7 +170,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolver_factory.h',
'src/core/client_config/resolver_registry.h',
'src/core/client_config/resolvers/dns_resolver.h',
'src/core/client_config/resolvers/unix_resolver_posix.h',
'src/core/client_config/resolvers/sockaddr_resolver.h',
'src/core/client_config/subchannel.h',
'src/core/client_config/subchannel_factory.h',
'src/core/client_config/uri_parser.h',
@@ -248,6 +248,7 @@ Pod::Spec.new do |s|
'src/core/transport/transport.h',
'src/core/transport/transport_impl.h',
'src/core/census/context.h',
'src/core/census/rpc_stat_id.h',
'grpc/grpc_security.h',
'grpc/byte_buffer.h',
'grpc/byte_buffer_reader.h',
@@ -296,7 +297,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolver_factory.c',
'src/core/client_config/resolver_registry.c',
'src/core/client_config/resolvers/dns_resolver.c',
'src/core/client_config/resolvers/unix_resolver_posix.c',
'src/core/client_config/resolvers/sockaddr_resolver.c',
'src/core/client_config/subchannel.c',
'src/core/client_config/subchannel_factory.c',
'src/core/client_config/uri_parser.c',
@@ -390,7 +391,8 @@ Pod::Spec.new do |s|
'src/core/transport/transport.c',
'src/core/transport/transport_op_string.c',
'src/core/census/context.c',
'src/core/census/initialize.c'
'src/core/census/initialize.c',
'src/core/census/record_stat.c'
ss.private_header_files = 'src/core/support/env.h',
'src/core/support/file.h',
@@ -435,7 +437,7 @@ Pod::Spec.new do |s|
'src/core/client_config/resolver_factory.h',
'src/core/client_config/resolver_registry.h',
'src/core/client_config/resolvers/dns_resolver.h',
'src/core/client_config/resolvers/unix_resolver_posix.h',
'src/core/client_config/resolvers/sockaddr_resolver.h',
'src/core/client_config/subchannel.h',
'src/core/client_config/subchannel_factory.h',
'src/core/client_config/uri_parser.h',
@@ -512,7 +514,8 @@ Pod::Spec.new do |s|
'src/core/transport/stream_op.h',
'src/core/transport/transport.h',
'src/core/transport/transport_impl.h',
'src/core/census/context.h'
'src/core/census/context.h',
'src/core/census/rpc_stat_id.h'
ss.header_mappings_dir = '.'

include/grpc/census.h

@@ -100,6 +100,17 @@ int census_context_deserialize(const char *buffer, census_context **context);
* future census calls will result in undefined behavior. */
void census_context_destroy(census_context *context);
/* A census statistic to be recorded comprises two parts: an ID for the
* particular statistic and the value to be recorded against it. */
typedef struct {
int id;
double value;
} census_stat;
/* Record new stats against the given context. */
void census_record_stat(census_context *context, census_stat *stats,
size_t nstats);
#ifdef __cplusplus
}
#endif
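
For illustration, a minimal sketch of how a caller might use the new stats API. The context is assumed to have been created elsewhere, and the stat ID value is hypothetical (the new src/core/census/rpc_stat_id.h later in this diff defines IDs such as CENSUS_RPC_CLIENT_LATENCY = 5):

#include <grpc/census.h>

/* Sketch: record a single stat against an existing census context. */
void example_record_latency(census_context *ctx) {
  census_stat stat;
  stat.id = 5;       /* hypothetical: a latency stat ID */
  stat.value = 0.25; /* the measurement to record against that ID */
  census_record_stat(ctx, &stat, 1);
}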

include/grpc/grpc.h

@@ -450,6 +450,20 @@ grpc_call *grpc_channel_create_registered_call(
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag);
/** Returns a newly allocated string representing the endpoint to which this
call is communicating with. The string is in the uri format accepted by
grpc_channel_create.
The returned string should be disposed of with gpr_free().
WARNING: this value is never authenticated or subject to any security
related code. It must not be used for any authentication related
functionality. Instead, use grpc_auth_context. */
char *grpc_call_get_peer(grpc_call *call);
/** Return a newly allocated string representing the target a channel was
created for. */
char *grpc_channel_get_target(grpc_channel *channel);
/** Create a client channel to 'target'. Additional channel level configuration
MAY be provided by grpc_channel_args, though the expectation is that most
clients will want to simply pass NULL. See grpc_channel_args definition for
@@ -459,7 +473,7 @@ grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args);
/** Create a lame client: this client fails every operation attempted on it. */
grpc_channel *grpc_lame_client_channel_create(void);
grpc_channel *grpc_lame_client_channel_create(const char *target);
/** Close and destroy a grpc channel */
void grpc_channel_destroy(grpc_channel *channel);
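
A minimal usage sketch for the two new query functions; as the comments above note, both results are newly allocated and must be released with gpr_free(), and the peer string must never feed authentication decisions:

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>

/* Sketch: inspect where a call connected and what its channel targets.
   'call' and 'channel' are assumed to already exist. */
void example_inspect(grpc_call *call, grpc_channel *channel) {
  char *peer = grpc_call_get_peer(call); /* e.g. "ipv4:10.0.0.1:443" */
  char *target = grpc_channel_get_target(channel);
  /* ... log or compare them (never for authentication) ... */
  gpr_free(peer);
  gpr_free(target);
}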

include/grpc/support/host_port.h

@@ -52,8 +52,10 @@ int gpr_join_host_port(char **out, const char *host, int port);
/* Given a name in the form "host:port" or "[ho:st]:port", split into hostname
and port number, into newly allocated strings, which must later be
destroyed using gpr_free(). */
void gpr_split_host_port(const char *name, char **host, char **port);
destroyed using gpr_free().
Return 1 on success, 0 on failure. Guarantees *host and *port == NULL on
failure. */
int gpr_split_host_port(const char *name, char **host, char **port);
#ifdef __cplusplus
}
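
With the new int return, callers can detect malformed input instead of probing for NULL outputs; a sketch:

#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>

/* Sketch: split "host:port" and report failure to the caller. */
int example_split(const char *name) {
  char *host;
  char *port;
  if (!gpr_split_host_port(name, &host, &port)) {
    return 0; /* host and port are guaranteed NULL on failure */
  }
  /* ... use host and port (port may be NULL if none was given) ... */
  gpr_free(host);
  gpr_free(port);
  return 1;
}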

include/grpc/support/time.h

@@ -83,6 +83,9 @@ void gpr_time_init(void);
/* Return the current time measured from the given clocks epoch. */
gpr_timespec gpr_now(gpr_clock_type clock);
/* Convert a timespec from one clock to another */
gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type target_clock);
/* Return -ve, 0, or +ve according to whether a < b, a == b, or a > b
respectively. */
int gpr_time_cmp(gpr_timespec a, gpr_timespec b);
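
The converter is what lets callers bridge the realtime and monotonic clocks that this change starts mixing; a minimal sketch:

#include <grpc/support/time.h>

/* Sketch: convert a realtime deadline into the monotonic clock before
   handing it to code that now asserts on clock_type. */
gpr_timespec example_to_monotonic(gpr_timespec realtime_deadline) {
  return gpr_convert_clock_type(realtime_deadline, GPR_CLOCK_MONOTONIC);
}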

src/compiler/csharp_generator.cc

@@ -149,7 +149,7 @@ std::string GetMethodRequestParamMaybe(const MethodDescriptor *method) {
std::string GetMethodReturnTypeClient(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
return "Task<" + GetClassName(method->output_type()) + ">";
return "AsyncUnaryCall<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_CLIENT_STREAMING:
return "AsyncClientStreamingCall<" + GetClassName(method->input_type())
+ ", " + GetClassName(method->output_type()) + ">";
@@ -298,11 +298,13 @@ void GenerateServerInterface(Printer* out, const ServiceDescriptor *service) {
out->Indent();
for (int i = 0; i < service->method_count(); i++) {
const MethodDescriptor *method = service->method(i);
out->Print("$returntype$ $methodname$(ServerCallContext context, $request$$response_stream_maybe$);\n",
"methodname", method->name(), "returntype",
GetMethodReturnTypeServer(method), "request",
GetMethodRequestParamServer(method), "response_stream_maybe",
GetMethodResponseStreamMaybe(method));
out->Print(
"$returntype$ $methodname$($request$$response_stream_maybe$, "
"ServerCallContext context);\n",
"methodname", method->name(), "returntype",
GetMethodReturnTypeServer(method), "request",
GetMethodRequestParamServer(method), "response_stream_maybe",
GetMethodResponseStreamMaybe(method));
}
out->Outdent();
out->Print("}\n");

src/core/census/record_stat.c (new file)

@@ -0,0 +1,38 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/census.h>
#include "src/core/census/rpc_stat_id.h"
void census_record_stat(census_context *context, census_stat *stats,
size_t nstats) {}

src/core/census/rpc_stat_id.h (new file)

@@ -0,0 +1,46 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CENSUS_RPC_STAT_ID_H
#define CENSUS_RPC_STAT_ID_H
/* Stats ID's used for RPC measurements. */
#define CENSUS_INVALID_STAT_ID 0 /* ID 0 is always invalid */
#define CENSUS_RPC_CLIENT_REQUESTS 1 /* Count of client requests sent. */
#define CENSUS_RPC_SERVER_REQUESTS 2 /* Count of server requests sent. */
#define CENSUS_RPC_CLIENT_ERRORS 3 /* Client error counts. */
#define CENSUS_RPC_SERVER_ERRORS 4 /* Server error counts. */
#define CENSUS_RPC_CLIENT_LATENCY 5 /* Client side request latency. */
#define CENSUS_RPC_SERVER_LATENCY 6 /* Server side request latency. */
#endif /* CENSUS_RPC_STAT_ID_H */

src/core/channel/channel_stack.c

@@ -191,6 +191,11 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op) {
next_elem->filter->start_transport_stream_op(next_elem, op);
}
char *grpc_call_next_get_peer(grpc_call_element *elem) {
grpc_call_element *next_elem = elem + 1;
return next_elem->filter->get_peer(next_elem);
}
void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op) {
grpc_channel_element *next_elem = elem + 1;
next_elem->filter->start_transport_op(next_elem, op);

src/core/channel/channel_stack.h

@@ -104,6 +104,9 @@ typedef struct {
The filter does not need to do any chaining */
void (*destroy_channel_elem)(grpc_channel_element *elem);
/* Implement grpc_call_get_peer() */
char *(*get_peer)(grpc_call_element *elem);
/* The name of this filter */
const char *name;
} grpc_channel_filter;
@@ -173,6 +176,8 @@ void grpc_call_next_op(grpc_call_element *elem, grpc_transport_stream_op *op);
/* Call the next operation (depending on call directionality) in a channel
stack */
void grpc_channel_next_op(grpc_channel_element *elem, grpc_transport_op *op);
/* Pass through a request to get_peer to the next child element */
char *grpc_call_next_get_peer(grpc_call_element *elem);
/* Given the top element of a channel stack, get the channel stack itself */
grpc_channel_stack *grpc_channel_stack_from_top_element(
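
For a filter with nothing useful to report, the new vtable slot can simply delegate down the stack, which is exactly what several filters in this change do; a sketch (the function then occupies the vtable position between destroy_channel_elem and name):

#include "src/core/channel/channel_stack.h"

/* Sketch: a pass-through get_peer for a filter that does not know the
   peer itself; it forwards the query to the next element in the stack. */
static char *example_get_peer(grpc_call_element *elem) {
  return grpc_call_next_get_peer(elem);
}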

src/core/channel/client_channel.c

@@ -249,21 +249,6 @@ static void picked_target(void *arg, int iomgr_success) {
}
}
static void pick_target(grpc_lb_policy *lb_policy, call_data *calld) {
grpc_metadata_batch *initial_metadata;
grpc_transport_stream_op *op = &calld->waiting_op;
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
initial_metadata = &op->send_ops->ops[0].data.metadata;
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(lb_policy, op->bind_pollset, initial_metadata,
&calld->picked_channel, &calld->async_setup_task);
}
static grpc_iomgr_closure *merge_into_waiting_op(
grpc_call_element *elem, grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
@@ -293,6 +278,26 @@ static grpc_iomgr_closure *merge_into_waiting_op(
return consumed_op;
}
static char *cc_get_peer(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
char *result;
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_ACTIVE) {
subchannel_call = calld->subchannel_call;
GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
gpr_mu_unlock(&calld->mu_state);
result = grpc_subchannel_call_get_peer(subchannel_call);
GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer");
return result;
} else {
gpr_mu_unlock(&calld->mu_state);
return grpc_channel_get_target(chand->master);
}
}
static void perform_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
@@ -371,12 +376,23 @@ static void perform_transport_stream_op(grpc_call_element *elem,
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
if (lb_policy) {
grpc_transport_stream_op *op = &calld->waiting_op;
grpc_pollset *bind_pollset = op->bind_pollset;
grpc_metadata_batch *initial_metadata = &op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
GPR_ASSERT(op->bind_pollset);
GPR_ASSERT(op->send_ops);
GPR_ASSERT(op->send_ops->nops >= 1);
GPR_ASSERT(
op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
pick_target(lb_policy, calld);
grpc_iomgr_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel, &calld->async_setup_task);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -665,6 +681,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
cc_get_peer,
"client-channel",
};

src/core/channel/compress_filter.c

@@ -200,7 +200,7 @@ static void process_send_ops(grpc_call_element *elem,
channeld->default_compression_algorithm;
calld->has_compression_algorithm = 1; /* GPR_TRUE */
}
grpc_metadata_batch_add_head(
grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->compression_algorithm_storage,
grpc_mdelem_ref(channeld->mdelem_compression_algorithms
[calld->compression_algorithm]));
@@ -322,4 +322,5 @@ const grpc_channel_filter grpc_compress_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"compress"};

src/core/channel/connected_channel.c

@@ -119,6 +119,11 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
grpc_transport_destroy(cd->transport);
}
static char *con_get_peer(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
return grpc_transport_get_peer(chand->transport);
}
const grpc_channel_filter grpc_connected_channel_filter = {
con_start_transport_stream_op,
con_start_transport_op,
@@ -128,6 +133,7 @@ const grpc_channel_filter grpc_connected_channel_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
con_get_peer,
"connected",
};

src/core/channel/http_client_filter.c

@@ -98,6 +98,18 @@ static void hc_on_recv(void *user_data, int success) {
calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success);
}
static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
channel_data *channeld = elem->channel_data;
/* eat the things we'd like to set ourselves */
if (md->key == channeld->method->key) return NULL;
if (md->key == channeld->scheme->key) return NULL;
if (md->key == channeld->te_trailers->key) return NULL;
if (md->key == channeld->content_type->key) return NULL;
if (md->key == channeld->user_agent->key) return NULL;
return md;
}
static void hc_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
/* grab pointers to our data from the call element */
@@ -111,6 +123,7 @@ static void hc_mutate_op(grpc_call_element *elem,
grpc_stream_op *op = &ops[i];
if (op->type != GRPC_OP_METADATA) continue;
calld->sent_initial_metadata = 1;
grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem);
/* Send : prefixed headers, which have to be before any application
layer headers. */
grpc_metadata_batch_add_head(&op->data.metadata, &calld->method,
@@ -267,4 +280,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
const grpc_channel_filter grpc_http_client_filter = {
hc_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "http-client"};
init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
"http-client"};

src/core/channel/http_server_filter.c

@@ -280,4 +280,5 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
const grpc_channel_filter grpc_http_server_filter = {
hs_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "http-server"};
init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer,
"http-server"};

src/core/channel/noop_filter.c

@@ -127,4 +127,5 @@ const grpc_channel_filter grpc_no_op_filter = {noop_start_transport_stream_op,
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"no-op"};

src/core/client_config/README.md

@@ -60,3 +60,7 @@ unix:path - the unix scheme is used to create and connect to unix domain
sockets - the authority must be empty, and the path
represents the absolute or relative path to the desired
socket
ipv4:host:port - a pre-resolved ipv4 dotted decimal address/port combination
ipv6:[host]:port - a pre-resolved ipv6 address/port combination
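
For illustration, a channel created against one of the pre-resolved schemes (address and port are placeholders) bypasses DNS resolution entirely:

#include <grpc/grpc.h>

/* Sketch: connect to a known IPv4 address via the sockaddr resolver. */
grpc_channel *example_ipv4_channel(void) {
  return grpc_channel_create("ipv4:127.0.0.1:50051", NULL);
}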

src/core/client_config/resolvers/sockaddr_resolver.c (new file)

@@ -0,0 +1,299 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include <stdio.h>
#include <string.h>
#ifdef GPR_POSIX_SOCKET
#include <sys/un.h>
#endif
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/string_util.h>
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
typedef struct {
/** base class: must be first */
grpc_resolver base;
/** refcount */
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
/** load balancing policy factory */
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
/** the address that we've 'resolved' */
struct sockaddr_storage addr;
int addr_len;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
int published;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
} sockaddr_resolver;
static void sockaddr_destroy(grpc_resolver *r);
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r);
static void sockaddr_shutdown(grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
sockaddr_next};
static void sockaddr_shutdown(grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
/* TODO(ctiller): add delayed callback */
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
}
static void sockaddr_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *sa, int len) {}
static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_config = target_config;
sockaddr_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
}
static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_subchannel *subchannel;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
cfg = grpc_client_config_create();
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)&r->addr;
args.addr_len = r->addr_len;
subchannel =
grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
lb_policy = r->lb_policy_factory(&subchannel, 1);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
}
static void sockaddr_destroy(grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r);
}
#ifdef GPR_POSIX_SOCKET
static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, int *len) {
struct sockaddr_un *un = (struct sockaddr_un *)addr;
un->sun_family = AF_UNIX;
strcpy(un->sun_path, uri->path);
*len = strlen(un->sun_path) + sizeof(un->sun_family) + 1;
return 1;
}
#endif
static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) {
const char *host_port = uri->path;
char *host;
char *port;
int port_num;
int result = 0;
struct sockaddr_in *in = (struct sockaddr_in *)addr;
if (*host_port == '/') ++host_port;
if (!gpr_split_host_port(host_port, &host, &port)) {
return 0;
}
memset(in, 0, sizeof(*in));
*len = sizeof(*in);
in->sin_family = AF_INET;
if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done;
}
if (port != NULL) {
if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
port_num > 65535) {
gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
goto done;
}
in->sin_port = htons(port_num);
} else {
gpr_log(GPR_ERROR, "no port given for ipv4 scheme");
goto done;
}
result = 1;
done:
gpr_free(host);
gpr_free(port);
return result;
}
static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) {
const char *host_port = uri->path;
char *host;
char *port;
int port_num;
int result = 0;
struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr;
if (*host_port == '/') ++host_port;
if (!gpr_split_host_port(host_port, &host, &port)) {
return 0;
}
memset(in6, 0, sizeof(*in6));
*len = sizeof(*in6);
in6->sin6_family = AF_INET6;
if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
goto done;
}
if (port != NULL) {
if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 ||
port_num > 65535) {
gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
goto done;
}
in6->sin6_port = htons(port_num);
} else {
gpr_log(GPR_ERROR, "no port given for ipv6 scheme");
goto done;
}
result = 1;
done:
gpr_free(host);
gpr_free(port);
return result;
}
static grpc_resolver *sockaddr_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) {
sockaddr_resolver *r;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
return NULL;
}
r = gpr_malloc(sizeof(sockaddr_resolver));
memset(r, 0, sizeof(*r));
if (!parse(uri, &r->addr, &r->addr_len)) {
gpr_free(r);
return NULL;
}
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
/*
* FACTORY
*/
static void sockaddr_factory_ref(grpc_resolver_factory *factory) {}
static void sockaddr_factory_unref(grpc_resolver_factory *factory) {}
#define DECL_FACTORY(name) \
static grpc_resolver *name##_factory_create_resolver( \
grpc_resolver_factory *factory, grpc_uri *uri, \
grpc_subchannel_factory *subchannel_factory) { \
return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \
subchannel_factory, parse_##name); \
} \
static const grpc_resolver_factory_vtable name##_factory_vtable = { \
sockaddr_factory_ref, sockaddr_factory_unref, \
name##_factory_create_resolver}; \
static grpc_resolver_factory name##_resolver_factory = { \
&name##_factory_vtable}; \
grpc_resolver_factory *grpc_##name##_resolver_factory_create() { \
return &name##_resolver_factory; \
}
#ifdef GPR_POSIX_SOCKET
DECL_FACTORY(unix)
#endif
DECL_FACTORY(ipv4)
DECL_FACTORY(ipv6)
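
To make the macro concrete, DECL_FACTORY(ipv4) expands to roughly the following (reformatted for readability; shown as an illustration of the expansion, not standalone code):

static grpc_resolver *ipv4_factory_create_resolver(
    grpc_resolver_factory *factory, grpc_uri *uri,
    grpc_subchannel_factory *subchannel_factory) {
  return sockaddr_create(uri, grpc_create_pick_first_lb_policy,
                         subchannel_factory, parse_ipv4);
}
static const grpc_resolver_factory_vtable ipv4_factory_vtable = {
    sockaddr_factory_ref, sockaddr_factory_unref,
    ipv4_factory_create_resolver};
static grpc_resolver_factory ipv4_resolver_factory = {&ipv4_factory_vtable};
grpc_resolver_factory *grpc_ipv4_resolver_factory_create() {
  return &ipv4_resolver_factory;
}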

src/core/client_config/resolvers/sockaddr_resolver.h

@@ -38,7 +38,13 @@
#include "src/core/client_config/resolver_factory.h"
grpc_resolver_factory *grpc_ipv4_resolver_factory_create(void);
grpc_resolver_factory *grpc_ipv6_resolver_factory_create(void);
#ifdef GPR_POSIX_SOCKET
/** Create a unix resolver factory */
grpc_resolver_factory *grpc_unix_resolver_factory_create(void);
#endif
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_UNIX_RESOLVER_H */

src/core/client_config/resolvers/unix_resolver_posix.c (deleted)

@@ -1,195 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <grpc/support/port_platform.h>
#ifdef GPR_POSIX_SOCKET
#include "src/core/client_config/resolvers/unix_resolver_posix.h"
#include <string.h>
#include <sys/un.h>
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
typedef struct {
/** base class: must be first */
grpc_resolver base;
/** refcount */
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
/** load balancing policy factory */
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels);
/** the address that we've 'resolved' */
struct sockaddr_un addr;
int addr_len;
/** mutex guarding the rest of the state */
gpr_mu mu;
/** have we published? */
int published;
/** pending next completion, or NULL */
grpc_iomgr_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
} unix_resolver;
static void unix_destroy(grpc_resolver *r);
static void unix_maybe_finish_next_locked(unix_resolver *r);
static void unix_shutdown(grpc_resolver *r);
static void unix_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void unix_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable unix_resolver_vtable = {
unix_destroy, unix_shutdown, unix_channel_saw_error, unix_next};
static void unix_shutdown(grpc_resolver *resolver) {
unix_resolver *r = (unix_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
/* TODO(ctiller): add delayed callback */
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
}
static void unix_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
int len) {}
static void unix_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
unix_resolver *r = (unix_resolver *)resolver;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_config = target_config;
unix_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
}
static void unix_maybe_finish_next_locked(unix_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_subchannel *subchannel;
grpc_subchannel_args args;
if (r->next_completion != NULL && !r->published) {
cfg = grpc_client_config_create();
memset(&args, 0, sizeof(args));
args.addr = (struct sockaddr *)&r->addr;
args.addr_len = r->addr_len;
subchannel =
grpc_subchannel_factory_create_subchannel(r->subchannel_factory, &args);
lb_policy = r->lb_policy_factory(&subchannel, 1);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "unix");
r->published = 1;
*r->target_config = cfg;
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
}
static void unix_destroy(grpc_resolver *gr) {
unix_resolver *r = (unix_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r);
}
static grpc_resolver *unix_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory) {
unix_resolver *r;
if (0 != strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "authority based uri's not supported");
return NULL;
}
r = gpr_malloc(sizeof(unix_resolver));
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &unix_resolver_vtable);
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
r->addr.sun_family = AF_UNIX;
strcpy(r->addr.sun_path, uri->path);
r->addr_len = strlen(r->addr.sun_path) + sizeof(r->addr.sun_family) + 1;
grpc_subchannel_factory_ref(subchannel_factory);
return &r->base;
}
/*
* FACTORY
*/
static void unix_factory_ref(grpc_resolver_factory *factory) {}
static void unix_factory_unref(grpc_resolver_factory *factory) {}
static grpc_resolver *unix_factory_create_resolver(
grpc_resolver_factory *factory, grpc_uri *uri,
grpc_subchannel_factory *subchannel_factory) {
return unix_create(uri, grpc_create_pick_first_lb_policy, subchannel_factory);
}
static const grpc_resolver_factory_vtable unix_factory_vtable = {
unix_factory_ref, unix_factory_unref, unix_factory_create_resolver};
static grpc_resolver_factory unix_resolver_factory = {&unix_factory_vtable};
grpc_resolver_factory *grpc_unix_resolver_factory_create() {
return &unix_resolver_factory;
}
#endif

src/core/client_config/subchannel.c

@@ -322,7 +322,8 @@ static void continue_connect(grpc_subchannel *c) {
static void start_connect(grpc_subchannel *c) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), c->backoff_delta);
c->next_attempt = gpr_time_add(
gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
continue_connect(c);
}
@@ -639,7 +640,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
if (c->connecting_result.transport != NULL) {
publish_transport(c);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
@@ -707,6 +708,12 @@ void grpc_subchannel_call_unref(
}
}
char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
return top_elem->filter->get_peer(top_elem);
}
void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
grpc_transport_stream_op *op) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);

src/core/client_config/subchannel.h

@@ -100,6 +100,9 @@ void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op *op);
/** continue querying for peer */
char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call);
struct grpc_subchannel_args {
/** Channel filters for this channel - wrapped factories will likely
want to mutate this */

src/core/iomgr/alarm.c

@@ -36,6 +36,7 @@
#include "src/core/iomgr/alarm_heap.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/time_averaged_stats.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
@@ -67,6 +68,7 @@ typedef struct {
static gpr_mu g_mu;
/* Allow only one run_some_expired_alarms at once */
static gpr_mu g_checker_mu;
static gpr_clock_type g_clock_type;
static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
@@ -85,6 +87,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
gpr_mu_init(&g_mu);
gpr_mu_init(&g_checker_mu);
g_clock_type = now.clock_type;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -102,7 +105,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown(void) {
int i;
while (run_some_expired_alarms(NULL, gpr_inf_future(GPR_CLOCK_REALTIME), NULL,
while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL,
0))
;
for (i = 0; i < NUM_SHARDS; i++) {
@@ -175,6 +178,8 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline,
gpr_timespec now) {
int is_first_alarm = 0;
shard_type *shard = &g_shards[shard_idx(alarm)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
alarm->cb = alarm_cb;
alarm->cb_arg = alarm_cb_arg;
alarm->deadline = deadline;
@@ -355,9 +360,10 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
}
int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
GPR_ASSERT(now.clock_type == g_clock_type);
return run_some_expired_alarms(
drop_mu, now, next,
gpr_time_cmp(now, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0);
drop_mu, now, next,
gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
}
gpr_timespec grpc_alarm_list_next_timeout(void) {
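
Since deadlines must now match the clock type recorded at init (monotonic throughout iomgr after this change), callers convert before arming an alarm; a sketch with a placeholder callback and argument, following the pattern used in tcp_client_posix.c below:

#include <grpc/support/time.h>
#include "src/core/iomgr/alarm.h"

/* Sketch: arm an alarm from a realtime deadline by converting it to
   the monotonic clock that the alarm list was initialized with. */
static void example_arm(grpc_alarm *alarm, gpr_timespec realtime_deadline,
                        void (*cb)(void *arg, int success), void *arg) {
  grpc_alarm_init(
      alarm, gpr_convert_clock_type(realtime_deadline, GPR_CLOCK_MONOTONIC),
      cb, arg, gpr_now(GPR_CLOCK_MONOTONIC));
}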

src/core/iomgr/endpoint.c

@@ -57,3 +57,7 @@ void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *polls
void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); }
void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); }
char *grpc_endpoint_get_peer(grpc_endpoint *ep) {
return ep->vtable->get_peer(ep);
}

src/core/iomgr/endpoint.h

@@ -74,12 +74,15 @@ struct grpc_endpoint_vtable {
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
};
/* When data is available on the connection, calls the callback with slices. */
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
void *user_data);
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
/* Write slices out to the socket.
If the connection is ready for more data after the end of the call, it
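
A sketch of the new endpoint query in use; as with grpc_call_get_peer, the result is heap-allocated (the tcp endpoints below return gpr_strdup'd strings) and is freed by the caller:

#include <grpc/support/alloc.h>
#include "src/core/iomgr/endpoint.h"

/* Sketch: ask an endpoint who it is connected to, then release it. */
void example_log_peer(grpc_endpoint *ep) {
  char *peer = grpc_endpoint_get_peer(ep); /* e.g. "ipv4:10.0.0.1:1234" */
  /* ... log or propagate the peer string ... */
  gpr_free(peer);
}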

src/core/iomgr/endpoint_pair_posix.c

@@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
p.client =
grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size);
p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
"socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
p.server =
grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size);
p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
"socketpair-client");
gpr_free(final_name);
return p;
}

src/core/iomgr/endpoint_pair_windows.c

@@ -81,8 +81,10 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read
SOCKET sv[2];
grpc_endpoint_pair p;
create_sockets(sv);
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"));
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"));
p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"),
"endpoint:server");
p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"),
"endpoint:client");
return p;
}

src/core/iomgr/iomgr.c

@@ -57,9 +57,9 @@ static grpc_iomgr_object g_root_object;
static void background_callback_executor(void *ignored) {
gpr_mu_lock(&g_mu);
while (!g_shutdown) {
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN));
if (g_cbs_head) {
grpc_iomgr_closure *closure = g_cbs_head;
g_cbs_head = closure->next;
@@ -67,7 +67,7 @@ static void background_callback_executor(void *ignored) {
gpr_mu_unlock(&g_mu);
closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_REALTIME),
} else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC),
&deadline)) {
} else {
gpr_mu_unlock(&g_mu);
@@ -90,7 +90,7 @@ void grpc_iomgr_init(void) {
gpr_thd_id id;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
grpc_alarm_list_init(gpr_now(GPR_CLOCK_REALTIME));
grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
@@ -145,7 +145,7 @@ void grpc_iomgr_shutdown(void) {
} while (g_cbs_head);
continue;
}
if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_REALTIME), NULL)) {
if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
continue;
}
if (g_root_object.next != &g_root_object) {

src/core/iomgr/pollset_posix.c

@@ -136,7 +136,7 @@ static void finish_shutdown(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
/* pollset->mu already held */
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0;
}
@@ -205,7 +205,7 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
static const int max_spin_polling_us = 10;
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
return -1;
}
if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(

src/core/iomgr/pollset_windows.c

@@ -70,7 +70,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
now = gpr_now(GPR_CLOCK_REALTIME);
now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(now, deadline) > 0) {
return 0 /* GPR_FALSE */;
}

src/core/iomgr/sockaddr_utils.c

@@ -36,12 +36,18 @@
#include <errno.h>
#include <string.h>
#include "src/core/support/string.h"
#ifdef GPR_POSIX_SOCKET
#include <sys/un.h>
#endif
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include "src/core/support/string.h"
static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0xff, 0xff};
@@ -161,6 +167,31 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
return ret;
}
char *grpc_sockaddr_to_uri(const struct sockaddr *addr) {
char *temp;
char *result;
switch (addr->sa_family) {
case AF_INET:
grpc_sockaddr_to_string(&temp, addr, 0);
gpr_asprintf(&result, "ipv4:%s", temp);
gpr_free(temp);
return result;
case AF_INET6:
grpc_sockaddr_to_string(&temp, addr, 0);
gpr_asprintf(&result, "ipv6:%s", temp);
gpr_free(temp);
return result;
#ifdef GPR_POSIX_SOCKET
case AF_UNIX:
gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path);
return result;
#endif
}
return NULL;
}
int grpc_sockaddr_get_port(const struct sockaddr *addr) {
switch (addr->sa_family) {
case AF_INET:
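
A sketch of the new helper in use, with a hypothetical loopback address; the returned URI comes from gpr_asprintf and is freed by the caller with gpr_free:

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include "src/core/iomgr/sockaddr_utils.h"

/* Sketch: render an IPv4 sockaddr as a resolver URI such as
   "ipv4:127.0.0.1:8080". */
char *example_uri(void) {
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(8080);
  inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
  return grpc_sockaddr_to_uri((struct sockaddr *)&addr);
}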

src/core/iomgr/sockaddr_utils.h

@@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port);
int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr,
int normalize);
char *grpc_sockaddr_to_uri(const struct sockaddr *addr);
#endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */

src/core/iomgr/tcp_client_posix.c

@@ -64,6 +64,7 @@ typedef struct {
int refs;
grpc_iomgr_closure write_closure;
grpc_pollset_set *interested_parties;
char *addr_str;
} async_connect;
static int prepare_socket(const struct sockaddr *addr, int fd) {
@@ -99,6 +100,7 @@ static void tc_on_alarm(void *acp, int success) {
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
gpr_free(ac);
}
}
@@ -164,7 +166,7 @@ static void on_writable(void *acp, int success) {
}
} else {
grpc_pollset_set_del_fd(ac->interested_parties, fd);
ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str);
fd = NULL;
goto finish;
}
@@ -185,6 +187,7 @@ finish:
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
gpr_free(ac);
}
cb(cb_arg, ep);
@@ -229,13 +232,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
err = connect(fd, addr, addr_len);
} while (err < 0 && errno == EINTR);
grpc_sockaddr_to_string(&addr_str, addr, 1);
addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
if (err >= 0) {
cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
goto done;
}
@@ -253,14 +256,16 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->cb_arg = arg;
ac->fd = fdobj;
ac->interested_parties = interested_parties;
ac->addr_str = addr_str;
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
gpr_mu_lock(&ac->mu);
grpc_alarm_init(&ac->alarm, deadline, tc_on_alarm, ac,
gpr_now(GPR_CLOCK_REALTIME));
grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);

src/core/iomgr/tcp_client_windows.c

@@ -58,6 +58,7 @@ typedef struct {
grpc_winsocket *socket;
gpr_timespec deadline;
grpc_alarm alarm;
char *addr_name;
int refs;
int aborted;
} async_connect;
@@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) {
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
}
}
@@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) {
gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
gpr_free(utf8_message);
} else if (!aborted) {
ep = grpc_tcp_create(ac->socket);
ep = grpc_tcp_create(ac->socket, ac->addr_name);
}
} else {
gpr_log(GPR_ERROR, "on_connect is shutting down");
@@ -213,10 +215,11 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
ac->socket = socket;
gpr_mu_init(&ac->mu);
ac->refs = 2;
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->aborted = 0;
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac,
gpr_now(GPR_CLOCK_REALTIME));
gpr_now(GPR_CLOCK_MONOTONIC));
socket->write_info.outstanding = 1;
grpc_socket_notify_on_write(socket, on_connect, ac);
return;

@ -44,15 +44,17 @@
#include <sys/socket.h>
#include <unistd.h>
#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
#include "src/core/profiling/timers.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/support/string.h"
#include "src/core/debug/trace.h"
#include "src/core/profiling/timers.h"
#ifdef GPR_HAVE_MSG_NOSIGNAL
#define SENDMSG_FLAGS MSG_NOSIGNAL
#else
@ -282,6 +284,8 @@ typedef struct {
grpc_iomgr_closure write_closure;
grpc_iomgr_closure handle_read_closure;
char *peer_string;
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan");
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
}
@ -572,13 +577,22 @@ static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pol
grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
static char *grpc_tcp_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static const grpc_endpoint_vtable vtable = {
grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset,
grpc_tcp_add_to_pollset_set, grpc_tcp_shutdown, grpc_tcp_destroy};
grpc_tcp_notify_on_read, grpc_tcp_write,
grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set,
grpc_tcp_shutdown, grpc_tcp_destroy,
grpc_tcp_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
const char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;

@ -53,6 +53,7 @@ extern int grpc_tcp_trace;
/* Create a tcp endpoint given a file descriptor and a read slice size.
Takes ownership of fd. */
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size);
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
const char *peer_string);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */

@ -332,7 +332,7 @@ static void on_read(void *arg, int success) {
grpc_set_socket_no_sigpipe_if_possible(fd);
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
fdobj = grpc_fd_create(fd, name);
@ -342,8 +342,9 @@ static void on_read(void *arg, int success) {
for (i = 0; i < sp->server->pollset_count; i++) {
grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
}
sp->server->cb(sp->server->cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
sp->server->cb(
sp->server->cb_arg,
grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
gpr_free(name);
gpr_free(addr_str);

@ -243,6 +243,10 @@ static void on_accept(void *arg, int from_iocp) {
SOCKET sock = sp->new_socket;
grpc_winsocket_callback_info *info = &sp->socket->read_info;
grpc_endpoint *ep = NULL;
struct sockaddr_storage peer_name;
char *peer_name_string;
char *fd_name;
int peer_name_len = sizeof(peer_name);
DWORD transfered_bytes;
DWORD flags;
BOOL wsa_success;
@ -277,8 +281,13 @@ static void on_accept(void *arg, int from_iocp) {
}
} else {
if (!sp->shutting_down) {
/* TODO(ctiller): add sockaddr address to label */
ep = grpc_tcp_create(grpc_winsocket_create(sock, "server"));
getpeername(sock, (struct sockaddr *)&peer_name, &peer_name_len);
peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name);
gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
peer_name_string);
gpr_free(fd_name);
gpr_free(peer_name_string);
}
}

@ -96,6 +96,8 @@ typedef struct grpc_tcp {
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
char *peer_string;
} grpc_tcp;
static void tcp_ref(grpc_tcp *tcp) {
@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) {
gpr_slice_buffer_destroy(&tcp->write_slices);
grpc_winsocket_orphan(tcp->socket);
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
}
@ -393,11 +396,16 @@ static void win_destroy(grpc_endpoint *ep) {
tcp_unref(tcp);
}
static grpc_endpoint_vtable vtable = {
win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy
};
static char *win_get_peer(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return gpr_strdup(tcp->peer_string);
}
static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write,
win_add_to_pollset, win_shutdown,
win_destroy, win_get_peer};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp));
memset(tcp, 0, sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
@ -405,6 +413,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) {
gpr_mu_init(&tcp->mu);
gpr_slice_buffer_init(&tcp->write_slices);
gpr_ref_init(&tcp->refcount, 1);
tcp->peer_string = gpr_strdup(peer_string);
return &tcp->base;
}

@ -50,7 +50,7 @@
/* Create a tcp endpoint given a winsock handle.
* Takes ownership of the handle.
*/
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket);
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string);
int grpc_tcp_prepare_socket(SOCKET sock);

@ -344,6 +344,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_auth_filter = {
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "client-auth"};
auth_start_transport_op, grpc_channel_next_op,
sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem,
grpc_call_next_get_peer, "client-auth"};

@ -337,9 +337,16 @@ static void endpoint_add_to_pollset_set(grpc_endpoint *secure_ep,
grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
}
static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
return grpc_endpoint_get_peer(ep->wrapped_ep);
}
static const grpc_endpoint_vtable vtable = {
endpoint_notify_on_read, endpoint_write, endpoint_add_to_pollset,
endpoint_add_to_pollset_set, endpoint_shutdown, endpoint_unref};
endpoint_notify_on_read, endpoint_write,
endpoint_add_to_pollset, endpoint_add_to_pollset_set,
endpoint_shutdown, endpoint_unref,
endpoint_get_peer};
grpc_endpoint *grpc_secure_endpoint_create(
struct tsi_frame_protector *protector, grpc_endpoint *transport,

@ -120,6 +120,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_server_auth_filter = {
auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "server-auth"};
auth_start_transport_op, grpc_channel_next_op,
sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem,
grpc_call_next_get_peer, "server-auth"};

@ -50,7 +50,7 @@ int gpr_join_host_port(char **out, const char *host, int port) {
}
}
void gpr_split_host_port(const char *name, char **host, char **port) {
int gpr_split_host_port(const char *name, char **host, char **port) {
const char *host_start;
size_t host_len;
const char *port_start;
@ -63,7 +63,7 @@ void gpr_split_host_port(const char *name, char **host, char **port) {
const char *rbracket = strchr(name, ']');
if (rbracket == NULL) {
/* Unmatched [ */
return;
return 0;
}
if (rbracket[1] == '\0') {
/* ]<end> */
@ -73,14 +73,14 @@ void gpr_split_host_port(const char *name, char **host, char **port) {
port_start = rbracket + 2;
} else {
/* ]<invalid> */
return;
return 0;
}
host_start = name + 1;
host_len = (size_t)(rbracket - host_start);
if (memchr(host_start, ':', host_len) == NULL) {
/* Require all bracketed hosts to contain a colon, because a hostname or
IPv4 address should never use brackets. */
return;
return 0;
}
} else {
const char *colon = strchr(name, ':');
@ -105,4 +105,6 @@ void gpr_split_host_port(const char *name, char **host, char **port) {
if (port_start != NULL) {
*port = gpr_strdup(port_start);
}
return 1;
}
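A hedged sketch of the new int-returning contract (1 on success, 0 on malformed input); freeing with gpr_free is safe here because the out-params start NULL and gpr_free(NULL) is a no-op:

#include <stdio.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>

static void split_host_port_example(void) {
  char *host = NULL;
  char *port = NULL;
  if (gpr_split_host_port("[::1]:50051", &host, &port)) {
    printf("host=%s port=%s\n", host, port); /* host=::1 port=50051 */
  } else {
    printf("unparseable host/port string\n");
  }
  gpr_free(host);
  gpr_free(port);
}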

@ -63,10 +63,11 @@ void gpr_cv_destroy(gpr_cv *cv) { GPR_ASSERT(pthread_cond_destroy(cv) == 0); }
int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int err = 0;
if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) {
err = pthread_cond_wait(cv, mu);
} else {
struct timespec abs_deadline_ts;
abs_deadline = gpr_convert_clock_type(abs_deadline, GPR_CLOCK_REALTIME);
abs_deadline_ts.tv_sec = abs_deadline.tv_sec;
abs_deadline_ts.tv_nsec = abs_deadline.tv_nsec;
err = pthread_cond_timedwait(cv, mu, &abs_deadline_ts);

@ -83,10 +83,10 @@ int gpr_cv_wait(gpr_cv *cv, gpr_mu *mu, gpr_timespec abs_deadline) {
int timeout = 0;
DWORD timeout_max_ms;
mu->locked = 0;
if (gpr_time_cmp(abs_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) == 0) {
if (gpr_time_cmp(abs_deadline, gpr_inf_future(abs_deadline.clock_type)) == 0) {
SleepConditionVariableCS(cv, &mu->cs, INFINITE);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
gpr_timespec now = gpr_now(abs_deadline.clock_type);
gpr_int64 now_ms = now.tv_sec * 1000 + now.tv_nsec / 1000000;
gpr_int64 deadline_ms =
abs_deadline.tv_sec * 1000 + abs_deadline.tv_nsec / 1000000;

@ -290,3 +290,30 @@ gpr_int32 gpr_time_to_millis(gpr_timespec t) {
double gpr_timespec_to_micros(gpr_timespec t) {
return (double)t.tv_sec * GPR_US_PER_SEC + t.tv_nsec * 1e-3;
}
gpr_timespec gpr_convert_clock_type(gpr_timespec t, gpr_clock_type clock_type) {
if (t.clock_type == clock_type) {
return t;
}
if (t.tv_nsec == 0) {
if (t.tv_sec == TYPE_MAX(time_t)) {
t.clock_type = clock_type;
return t;
}
if (t.tv_sec == TYPE_MIN(time_t)) {
t.clock_type = clock_type;
return t;
}
}
if (clock_type == GPR_TIMESPAN) {
return gpr_time_sub(t, gpr_now(t.clock_type));
}
if (t.clock_type == GPR_TIMESPAN) {
return gpr_time_add(gpr_now(clock_type), t);
}
return gpr_time_add(gpr_now(clock_type), gpr_time_sub(t, gpr_now(t.clock_type)));
}
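In words: identical clock types pass through, infinite timestamps are merely relabeled, and conversion to or from GPR_TIMESPAN subtracts or adds "now" on the appropriate clock; everything else is re-expressed via two gpr_now() reads. The alarm call sites in this patch use it like this (illustrative sketch, not patch code):

#include <grpc/support/time.h>

/* Re-express a realtime deadline on the monotonic clock before arming
   an alarm; for finite, non-timespan inputs this is equivalent to
   gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                gpr_time_sub(t, gpr_now(GPR_CLOCK_REALTIME))). */
static gpr_timespec deadline_to_monotonic(gpr_timespec deadline) {
  return gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
}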

@ -120,7 +120,7 @@ void gpr_sleep_until(gpr_timespec until) {
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
* slightly less portable. */
now = gpr_now(GPR_CLOCK_REALTIME);
now = gpr_now(until.clock_type);
if (gpr_time_cmp(until, now) <= 0) {
return;
}

@ -80,7 +80,7 @@ void gpr_sleep_until(gpr_timespec until) {
for (;;) {
/* We could simplify by using clock_nanosleep instead, but it might be
* slightly less portable. */
now = gpr_now(GPR_CLOCK_REALTIME);
now = gpr_now(until.clock_type);
if (gpr_time_cmp(until, now) <= 0) {
return;
}

@ -348,7 +348,7 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
}
grpc_call_stack_init(channel_stack, server_transport_data, initial_op_ptr,
CALL_STACK_FROM_CALL(call));
if (gpr_time_cmp(send_deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) != 0) {
set_deadline_alarm(call, send_deadline);
}
return call;
@ -1253,6 +1253,11 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
elem->filter->start_transport_stream_op(elem, op);
}
char *grpc_call_get_peer(grpc_call *call) {
grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
return elem->filter->get_peer(elem);
}
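A short caller-side sketch for the new surface API (assumed usage, not from the patch); the returned string is a copy that the caller releases with gpr_free:

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

static void log_call_peer(grpc_call *call) {
  char *peer = grpc_call_get_peer(call); /* e.g. "ipv4:127.0.0.1:50051" */
  gpr_log(GPR_INFO, "peer: %s", peer);
  gpr_free(peer);
}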
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
@ -1278,8 +1283,8 @@ static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline) {
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
grpc_alarm_init(&call->alarm, deadline, call_alarm, call,
gpr_now(GPR_CLOCK_REALTIME));
grpc_alarm_init(&call->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata

@ -36,12 +36,14 @@
#include <stdlib.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include "src/core/iomgr/iomgr.h"
#include "src/core/support/string.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
/** Cache grpc-status: X mdelems for X = 0..NUM_CACHED_STATUS_ELEMS.
* Avoids needing to take a metadata context lock for sending status
@ -73,6 +75,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
grpc_iomgr_closure destroy_closure;
char *target;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@ -85,13 +88,14 @@ struct grpc_channel {
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
const char *target, const grpc_channel_filter **filters, size_t num_filters,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client) {
size_t i;
size_t size =
sizeof(grpc_channel) + grpc_channel_stack_size(filters, num_filters);
grpc_channel *channel = gpr_malloc(size);
memset(channel, 0, sizeof(*channel));
channel->target = gpr_strdup(target);
GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
channel->is_client = is_client;
/* decremented by grpc_channel_destroy */
@ -137,6 +141,10 @@ grpc_channel *grpc_channel_create_from_filters(
return channel;
}
char *grpc_channel_get_target(grpc_channel *channel) {
return gpr_strdup(channel->target);
}
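As with grpc_call_get_peer, the target comes back as a fresh copy owned by the caller; a hedged sketch:

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

static void log_channel_target(grpc_channel *channel) {
  char *target = grpc_channel_get_target(channel);
  gpr_log(GPR_INFO, "channel target: %s", target);
  gpr_free(target);
}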
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
@ -222,6 +230,7 @@ static void destroy_channel(void *p, int ok) {
}
grpc_mdctx_unref(channel->metadata_context);
gpr_mu_destroy(&channel->registered_call_mu);
gpr_free(channel->target);
gpr_free(channel);
}

@ -38,7 +38,7 @@
#include "src/core/client_config/subchannel_factory.h"
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t count,
const char *target, const grpc_channel_filter **filters, size_t count,
const grpc_channel_args *args, grpc_mdctx *mdctx, int is_client);
/** Get a (borrowed) pointer to this channel's underlying channel stack */

@ -172,8 +172,9 @@ void grpc_channel_watch_connectivity_state(
w->tag = tag;
w->channel = channel;
grpc_alarm_init(&w->alarm, deadline, timeout_complete, w,
gpr_now(GPR_CLOCK_REALTIME));
grpc_alarm_init(
&w->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,

@ -171,7 +171,8 @@ grpc_channel *grpc_channel_create(const char *target,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
channel =
grpc_channel_create_from_filters(target, filters, n, args, mdctx, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;

@ -148,6 +148,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
grpc_event ret;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "next");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@ -188,6 +190,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
grpc_cq_completion *c;
grpc_cq_completion *prev;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
GRPC_CQ_INTERNAL_REF(cc, "pluck");
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {

@ -39,6 +39,7 @@
#include "src/core/channel/channel_stack.h"
#include "src/core/client_config/resolver_registry.h"
#include "src/core/client_config/resolvers/dns_resolver.h"
#include "src/core/client_config/resolvers/sockaddr_resolver.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/profiling/timers.h"
@ -48,10 +49,6 @@
#include "src/core/transport/chttp2_transport.h"
#include "src/core/transport/connectivity_state.h"
#ifdef GPR_POSIX_SOCKET
#include "src/core/client_config/resolvers/unix_resolver_posix.h"
#endif
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
@ -69,6 +66,8 @@ void grpc_init(void) {
gpr_time_init();
grpc_resolver_registry_init("dns:///");
grpc_register_resolver_type("dns", grpc_dns_resolver_factory_create());
grpc_register_resolver_type("ipv4", grpc_ipv4_resolver_factory_create());
grpc_register_resolver_type("ipv6", grpc_ipv6_resolver_factory_create());
#ifdef GPR_POSIX_SOCKET
grpc_register_resolver_type("unix", grpc_unix_resolver_factory_create());
#endif
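With the sockaddr resolvers registered, literal-address targets resolve without touching DNS; a hedged sketch using the insecure channel creator from this tree (channel args omitted, target strings illustrative):

#include <grpc/grpc.h>

static void literal_target_examples(void) {
  grpc_channel *c4 = grpc_channel_create("ipv4:127.0.0.1:50051", NULL);
  grpc_channel *c6 = grpc_channel_create("ipv6:[::1]:50051", NULL);
  grpc_channel_destroy(c4);
  grpc_channel_destroy(c6);
#ifdef GPR_POSIX_SOCKET
  {
    grpc_channel *cu = grpc_channel_create("unix:/tmp/grpc.sock", NULL);
    grpc_channel_destroy(cu);
  }
#endif
}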

@ -47,7 +47,10 @@ typedef struct {
grpc_linked_mdelem details;
} call_data;
typedef struct { grpc_mdctx *mdctx; } channel_data;
typedef struct {
grpc_mdctx *mdctx;
grpc_channel *master;
} channel_data;
static void lame_start_transport_stream_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
@ -82,6 +85,11 @@ static void lame_start_transport_stream_op(grpc_call_element *elem,
}
}
static char *lame_get_peer(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
return grpc_channel_get_target(chand->master);
}
static void lame_start_transport_op(grpc_channel_element *elem,
grpc_transport_op *op) {
if (op->on_connectivity_state_change) {
@ -112,6 +120,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
GPR_ASSERT(is_first);
GPR_ASSERT(is_last);
chand->mdctx = mdctx;
chand->master = master;
}
static void destroy_channel_elem(grpc_channel_element *elem) {}
@ -125,11 +134,12 @@ static const grpc_channel_filter lame_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
lame_get_peer,
"lame-client",
};
grpc_channel *grpc_lame_client_channel_create(void) {
grpc_channel *grpc_lame_client_channel_create(const char *target) {
static const grpc_channel_filter *filters[] = {&lame_filter};
return grpc_channel_create_from_filters(filters, 1, NULL, grpc_mdctx_create(),
1);
return grpc_channel_create_from_filters(target, filters, 1, NULL,
grpc_mdctx_create(), 1);
}
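Because the lame channel now remembers its target, target/peer queries against it return something meaningful instead of nothing; a hedged sketch (target string hypothetical):

#include <grpc/grpc.h>
#include <grpc/support/alloc.h>

static void lame_channel_example(void) {
  grpc_channel *lame = grpc_lame_client_channel_create("dns:///unreachable");
  char *target = grpc_channel_get_target(lame); /* "dns:///unreachable" */
  gpr_free(target);
  grpc_channel_destroy(lame);
}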

@ -199,13 +199,13 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
if (grpc_find_security_connector_in_args(args) != NULL) {
gpr_log(GPR_ERROR, "Cannot set security context in channel args.");
return grpc_lame_client_channel_create();
return grpc_lame_client_channel_create(target);
}
if (grpc_credentials_create_security_connector(
creds, target, args, NULL, &connector, &new_args_from_connector) !=
GRPC_SECURITY_OK) {
return grpc_lame_client_channel_create();
return grpc_lame_client_channel_create(target);
}
mdctx = grpc_mdctx_create();
@ -221,7 +221,8 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
channel =
grpc_channel_create_from_filters(target, filters, n, args_copy, mdctx, 1);
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;

@ -722,6 +722,7 @@ static const grpc_channel_filter server_surface_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"server",
};
@ -878,8 +879,8 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
channel =
grpc_channel_create_from_filters(filters, num_filters, args, mdctx, 0);
channel = grpc_channel_create_from_filters(NULL, filters, num_filters, args,
mdctx, 0);
chand = (channel_data *)grpc_channel_stack_element(
grpc_channel_get_channel_stack(channel), 0)
->channel_data;

@ -286,6 +286,7 @@ struct grpc_chttp2_transport {
grpc_endpoint *ep;
grpc_mdctx *metadata_context;
gpr_refcount refs;
char *peer_string;
gpr_mu mu;

@ -438,7 +438,7 @@ static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_mdelem *mdelem;
grpc_chttp2_encode_timeout(
gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME)), timeout_str);
gpr_time_sub(deadline, gpr_now(deadline.clock_type)), timeout_str);
mdelem = grpc_mdelem_from_metadata_strings(
c->mdctx, GRPC_MDSTR_REF(c->timeout_key_str),
grpc_mdstr_from_string(c->mdctx, timeout_str));
@ -560,6 +560,7 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
grpc_mdctx *mdctx = compressor->mdctx;
grpc_linked_mdelem *l;
int need_unref = 0;
gpr_timespec deadline;
GPR_ASSERT(stream_id != 0);
@ -589,9 +590,9 @@ void grpc_chttp2_encode(grpc_stream_op *ops, size_t ops_count, int eof,
l->md = hpack_enc(compressor, l->md, &st);
need_unref |= l->md != NULL;
}
if (gpr_time_cmp(op->data.metadata.deadline,
gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
deadline_enc(compressor, op->data.metadata.deadline, &st);
deadline = op->data.metadata.deadline;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
deadline_enc(compressor, deadline, &st);
}
curop++;
break;

@ -170,6 +170,7 @@ static void destruct_transport(grpc_chttp2_transport *t) {
grpc_mdctx_unref(t->metadata_context);
gpr_free(t->peer_string);
gpr_free(t);
}
@ -219,6 +220,7 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_ref_init(&t->refs, 2);
gpr_mu_init(&t->mu);
grpc_mdctx_ref(mdctx);
t->peer_string = grpc_endpoint_get_peer(ep);
t->metadata_context = mdctx;
t->endpoint_reading = 1;
t->global.next_stream_id = is_client ? 1 : 2;
@ -1090,9 +1092,17 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
* INTEGRATION GLUE
*/
static const grpc_transport_vtable vtable = {
sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
perform_transport_op, destroy_stream, destroy_transport};
static char *chttp2_get_peer(grpc_transport *t) {
return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
}
static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
init_stream,
perform_stream_op,
perform_transport_op,
destroy_stream,
destroy_transport,
chttp2_get_peer};
grpc_transport *grpc_create_chttp2_transport(
const grpc_channel_args *channel_args, grpc_endpoint *ep, grpc_mdctx *mdctx,

@ -65,6 +65,10 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
transport->vtable->destroy_stream(transport, stream);
}
char *grpc_transport_get_peer(grpc_transport *transport) {
return transport->vtable->get_peer(transport);
}
void grpc_transport_stream_op_finish_with_failure(
grpc_transport_stream_op *op) {
if (op->send_ops) {

@ -184,4 +184,7 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_transport *transport);
/* Get the transports peer */
char *grpc_transport_get_peer(grpc_transport *transport);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_TRANSPORT_H */

@ -58,6 +58,9 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_transport *self);
/* implementation of grpc_transport_get_peer */
char *(*get_peer)(grpc_transport *self);
} grpc_transport_vtable;
/* an instance of a grpc transport */

@ -61,7 +61,7 @@ static void put_metadata_list(gpr_strvec *b, grpc_metadata_batch md) {
if (m != md.list.head) gpr_strvec_add(b, gpr_strdup(", "));
put_metadata(b, m->md);
}
if (gpr_time_cmp(md.deadline, gpr_inf_future(GPR_CLOCK_REALTIME)) != 0) {
if (gpr_time_cmp(md.deadline, gpr_inf_future(md.deadline.clock_type)) != 0) {
char *tmp;
gpr_asprintf(&tmp, " deadline=%d.%09d", md.deadline.tv_sec,
md.deadline.tv_nsec);

@ -51,7 +51,7 @@ std::shared_ptr<ChannelInterface> CreateChannel(
cp_args.SetString(GRPC_ARG_PRIMARY_USER_AGENT_STRING,
user_agent_prefix.str());
return creds ? creds->CreateChannel(target, cp_args)
: std::shared_ptr<ChannelInterface>(
new Channel(target, grpc_lame_client_channel_create()));
: std::shared_ptr<ChannelInterface>(new Channel(
target, grpc_lame_client_channel_create(NULL)));
}
} // namespace grpc

@ -33,6 +33,7 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
@ -99,17 +100,17 @@ namespace Grpc.Core.Tests
[Test]
public void UnaryCall()
{
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", CancellationToken.None));
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None));
}
[Test]
public void UnaryCall_ServerHandlerThrows()
{
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
try
{
Calls.BlockingUnaryCall(call, "THROW", CancellationToken.None);
Calls.BlockingUnaryCall(internalCall, "THROW", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
@ -118,11 +119,41 @@ namespace Grpc.Core.Tests
}
}
[Test]
public void UnaryCall_ServerHandlerThrowsRpcException()
{
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
try
{
Calls.BlockingUnaryCall(internalCall, "THROW_UNAUTHENTICATED", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
}
}
[Test]
public void UnaryCall_ServerHandlerSetsStatus()
{
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
try
{
Calls.BlockingUnaryCall(internalCall, "SET_UNAUTHENTICATED", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
{
Assert.AreEqual(StatusCode.Unauthenticated, e.Status.StatusCode);
}
}
[Test]
public void AsyncUnaryCall()
{
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
var result = Calls.AsyncUnaryCall(call, "ABC", CancellationToken.None).Result;
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
var result = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None).ResponseAsync.Result;
Assert.AreEqual("ABC", result);
}
@ -131,10 +162,10 @@ namespace Grpc.Core.Tests
{
Task.Run(async () =>
{
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
try
{
await Calls.AsyncUnaryCall(call, "THROW", CancellationToken.None);
await Calls.AsyncUnaryCall(internalCall, "THROW", CancellationToken.None);
Assert.Fail();
}
catch (RpcException e)
@ -149,11 +180,11 @@ namespace Grpc.Core.Tests
{
Task.Run(async () =>
{
var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
var callResult = Calls.AsyncClientStreamingCall(call, CancellationToken.None);
var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
var call = Calls.AsyncClientStreamingCall(internalCall, CancellationToken.None);
await callResult.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await callResult.Result);
await call.RequestStream.WriteAll(new string[] { "A", "B", "C" });
Assert.AreEqual("ABC", await call.ResponseAsync);
}).Wait();
}
@ -162,10 +193,10 @@ namespace Grpc.Core.Tests
{
Task.Run(async () =>
{
var call = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
var internalCall = new Call<string, string>(ServiceName, ConcatAndEchoMethod, channel, Metadata.Empty);
var cts = new CancellationTokenSource();
var callResult = Calls.AsyncClientStreamingCall(call, cts.Token);
var call = Calls.AsyncClientStreamingCall(internalCall, cts.Token);
// TODO(jtattermusch): we need this to ensure the call has been initiated before we cancel it.
await Task.Delay(1000);
@ -173,7 +204,7 @@ namespace Grpc.Core.Tests
try
{
await callResult.Result;
await call.ResponseAsync;
}
catch (RpcException e)
{
@ -182,30 +213,54 @@ namespace Grpc.Core.Tests
}).Wait();
}
[Test]
public void AsyncUnaryCall_EchoMetadata()
{
var headers = new Metadata
{
new Metadata.Entry("asciiHeader", "abcdefg"),
new Metadata.Entry("binaryHeader-bin", new byte[] { 1, 2, 3, 0, 0xff }),
};
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, headers);
var call = Calls.AsyncUnaryCall(internalCall, "ABC", CancellationToken.None);
Assert.AreEqual("ABC", call.ResponseAsync.Result);
Assert.AreEqual(StatusCode.OK, call.GetStatus().StatusCode);
var trailers = call.GetTrailers();
Assert.AreEqual(2, trailers.Count);
Assert.AreEqual(headers[0].Key, trailers[0].Key);
Assert.AreEqual(headers[0].Value, trailers[0].Value);
Assert.AreEqual(headers[1].Key, trailers[1].Key);
CollectionAssert.AreEqual(headers[1].ValueBytes, trailers[1].ValueBytes);
}
[Test]
public void UnaryCall_DisposedChannel()
{
channel.Dispose();
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(call, "ABC", CancellationToken.None));
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(internalCall, "ABC", CancellationToken.None));
}
[Test]
public void UnaryCallPerformance()
{
var call = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
BenchmarkUtil.RunBenchmark(100, 100,
() => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
() => { Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken)); });
}
[Test]
public void UnknownMethodHandler()
{
var call = new Call<string, string>(ServiceName, NonexistentMethod, channel, Metadata.Empty);
var internalCall = new Call<string, string>(ServiceName, NonexistentMethod, channel, Metadata.Empty);
try
{
Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken));
Calls.BlockingUnaryCall(internalCall, "ABC", default(CancellationToken));
Assert.Fail();
}
catch (RpcException e)
@ -214,16 +269,48 @@ namespace Grpc.Core.Tests
}
}
private static async Task<string> EchoHandler(ServerCallContext context, string request)
[Test]
public void UserAgentStringPresent()
{
var internalCall = new Call<string, string>(ServiceName, EchoMethod, channel, Metadata.Empty);
string userAgent = Calls.BlockingUnaryCall(internalCall, "RETURN-USER-AGENT", CancellationToken.None);
Assert.IsTrue(userAgent.StartsWith("grpc-csharp/"));
}
private static async Task<string> EchoHandler(string request, ServerCallContext context)
{
foreach (Metadata.Entry metadataEntry in context.RequestHeaders)
{
if (metadataEntry.Key != "user-agent")
{
context.ResponseTrailers.Add(metadataEntry);
}
}
if (request == "RETURN-USER-AGENT")
{
return context.RequestHeaders.Where(entry => entry.Key == "user-agent").Single().Value;
}
if (request == "THROW")
{
throw new Exception("This was thrown on purpose by a test");
}
if (request == "THROW_UNAUTHENTICATED")
{
throw new RpcException(new Status(StatusCode.Unauthenticated, ""));
}
if (request == "SET_UNAUTHENTICATED")
{
context.Status = new Status(StatusCode.Unauthenticated, "");
}
return request;
}
private static async Task<string> ConcatAndEchoHandler(ServerCallContext context, IAsyncStreamReader<string> requestStream)
private static async Task<string> ConcatAndEchoHandler(IAsyncStreamReader<string> requestStream, ServerCallContext context)
{
string result = "";
await requestStream.ForEach(async (request) =>

@ -59,5 +59,26 @@ namespace Grpc.Core.Internal.Tests
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
[Test]
public void ReadMetadataFromPtrUnsafe()
{
var metadata = new Metadata
{
new Metadata.Entry("host", "somehost"),
new Metadata.Entry("header2", "header value"),
};
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
var copy = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(nativeMetadata.Handle);
Assert.AreEqual(2, copy.Count);
Assert.AreEqual("host", copy[0].Key);
Assert.AreEqual("somehost", copy[0].Value);
Assert.AreEqual("header2", copy[1].Key);
Assert.AreEqual("header value", copy[1].Value);
nativeMetadata.Dispose();
}
}
}

@ -58,6 +58,19 @@ namespace Grpc.Core.Internal.Tests
Assert.AreEqual(Timespec.NativeSize, Marshal.SizeOf(typeof(Timespec)));
}
[Test]
public void ToDateTime()
{
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc),
new Timespec(IntPtr.Zero, 0).ToDateTime());
Assert.AreEqual(new DateTime(1970, 1, 1, 0, 0, 10, DateTimeKind.Utc).AddTicks(50),
new Timespec(new IntPtr(10), 5000).ToDateTime());
Assert.AreEqual(new DateTime(2015, 7, 21, 4, 21, 48, DateTimeKind.Utc),
new Timespec(new IntPtr(1437452508), 0).ToDateTime());
}
[Test]
public void Add()
{

@ -43,24 +43,28 @@ namespace Grpc.Core
public sealed class AsyncClientStreamingCall<TRequest, TResponse> : IDisposable
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
readonly Task<TResponse> responseAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction)
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.result = result;
this.responseAsync = responseAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
/// <summary>
/// Asynchronous call result.
/// </summary>
public Task<TResponse> Result
public Task<TResponse> ResponseAsync
{
get
{
return this.result;
return this.responseAsync;
}
}
@ -81,11 +85,11 @@ namespace Grpc.Core
/// <returns></returns>
public TaskAwaiter<TResponse> GetAwaiter()
{
return result.GetAwaiter();
return responseAsync.GetAwaiter();
}
/// <summary>
/// Provides means to provide after the call.
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.

@ -44,12 +44,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
@ -75,6 +79,24 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything.

@ -43,11 +43,15 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction)
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
@ -62,6 +66,24 @@ namespace Grpc.Core
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (response stream has been fully read), doesn't do anything.

@ -0,0 +1,106 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Grpc.Core
{
/// <summary>
/// Return type for single request - single response call.
/// </summary>
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> responseAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseAsync = responseAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
}
/// <summary>
/// Asynchronous call result.
/// </summary>
public Task<TResponse> ResponseAsync
{
get
{
return this.responseAsync;
}
}
/// <summary>
/// Allows awaiting this object directly.
/// </summary>
public TaskAwaiter<TResponse> GetAwaiter()
{
return responseAsync.GetAwaiter();
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
}
/// <summary>
/// Gets the call trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
}
/// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
public void Dispose()
{
disposeAction.Invoke();
}
}
}

@ -53,7 +53,7 @@ namespace Grpc.Core
return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers);
}
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
public static AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
where TRequest : class
where TResponse : class
{
@ -61,7 +61,7 @@ namespace Grpc.Core
asyncCall.Initialize(call.Channel, call.Channel.CompletionQueue, call.Name);
var asyncResult = asyncCall.UnaryCallAsync(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
return await asyncResult;
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
public static AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
@ -73,7 +73,7 @@ namespace Grpc.Core
asyncCall.StartServerStreamingCall(req, call.Headers);
RegisterCancellationCallback(asyncCall, token);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -85,7 +85,7 @@ namespace Grpc.Core
var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers);
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
@ -98,7 +98,7 @@ namespace Grpc.Core
RegisterCancellationCallback(asyncCall, token);
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token)

@ -28,11 +28,14 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
@ -44,6 +47,7 @@ namespace Grpc.Core
{
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
readonly string target;
bool disposed;
@ -57,7 +61,10 @@ namespace Grpc.Core
public Channel(string host, Credentials credentials = null, IEnumerable<ChannelOption> options = null)
{
this.environment = GrpcEnvironment.GetInstance();
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(options))
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
using (ChannelArgsSafeHandle nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
if (credentials != null)
{
@ -71,7 +78,7 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs);
}
}
this.target = GetOverridenTarget(host, options);
this.target = GetOverridenTarget(host, this.options);
}
/// <summary>
@ -141,6 +148,20 @@ namespace Grpc.Core
}
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)
{
if (!options.Any((option) => option.Name == ChannelOptions.PrimaryUserAgentString))
{
options.Add(new ChannelOption(ChannelOptions.PrimaryUserAgentString, GetUserAgentString()));
}
}
private static string GetUserAgentString()
{
// TODO(jtattermusch): it would be useful to also provide .NET/mono version.
return string.Format("grpc-csharp/{0}", VersionInfo.CurrentVersion);
}
/// <summary>
/// Look for SslTargetNameOverride option and return its value instead of originalTarget
/// if found.

@ -115,41 +115,49 @@ namespace Grpc.Core
}
}
/// <summary>
/// Defines names of supported channel options.
/// </summary>
public static class ChannelOptions
{
// Override SSL target check. Only to be used for testing.
/// <summary>Override SSL target check. Only to be used for testing.</summary>
public const string SslTargetNameOverride = "grpc.ssl_target_name_override";
// Enable census for tracing and stats collection
/// <summary>Enable census for tracing and stats collection</summary>
public const string Census = "grpc.census";
// Maximum number of concurrent incoming streams to allow on a http2 connection
/// <summary>Maximum number of concurrent incoming streams to allow on a http2 connection</summary>
public const string MaxConcurrentStreams = "grpc.max_concurrent_streams";
// Maximum message length that the channel can receive
/// <summary>Maximum message length that the channel can receive</summary>
public const string MaxMessageLength = "grpc.max_message_length";
// Initial sequence number for http2 transports
/// <summary>Initial sequence number for http2 transports</summary>
public const string Http2InitialSequenceNumber = "grpc.http2.initial_sequence_number";
/// <summary>Primary user agent: goes at the start of the user-agent metadata</summary>
public const string PrimaryUserAgentString = "grpc.primary_user_agent";
/// <summary>Secondary user agent: goes at the end of the user-agent metadata</summary>
public const string SecondaryUserAgentString = "grpc.secondary_user_agent";
/// <summary>
/// Creates native object for a collection of channel options.
/// </summary>
/// <returns>The native channel arguments.</returns>
internal static ChannelArgsSafeHandle CreateChannelArgs(IEnumerable<ChannelOption> options)
internal static ChannelArgsSafeHandle CreateChannelArgs(List<ChannelOption> options)
{
if (options == null)
if (options == null || options.Count == 0)
{
return ChannelArgsSafeHandle.CreateNull();
}
var optionList = new List<ChannelOption>(options); // It's better to do defensive copy
ChannelArgsSafeHandle nativeArgs = null;
try
{
nativeArgs = ChannelArgsSafeHandle.Create(optionList.Count);
for (int i = 0; i < optionList.Count; i++)
nativeArgs = ChannelArgsSafeHandle.Create(options.Count);
for (int i = 0; i < options.Count; i++)
{
var option = optionList[i];
var option = options[i];
if (option.Type == ChannelOption.OptionType.Integer)
{
nativeArgs.SetInteger(i, option.Name, option.IntValue);

@ -33,13 +33,12 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
<Reference Include="System.Interactive.Async">
<HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
</Reference>
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
@ -102,6 +101,8 @@
<Compile Include="Internal\CompletionRegistry.cs" />
<Compile Include="Internal\BatchContextSafeHandle.cs" />
<Compile Include="ChannelOptions.cs" />
<Compile Include="AsyncUnaryCall.cs" />
<Compile Include="VersionInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -52,8 +52,8 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
// Set after status is received. Only used for streaming response calls.
Status? finishedStatus;
// Set after status is received. Used for both unary and streaming response calls.
ClientSideStatus? finishedStatus;
bool readObserverCompleted; // True if readObserver has already been completed.
@ -248,6 +248,32 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Gets the resulting status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Status GetStatus()
{
lock (myLock)
{
Preconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished.");
return finishedStatus.Value.Status;
}
}
/// <summary>
/// Gets the trailing metadata if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
public Metadata GetTrailers()
{
lock (myLock)
{
Preconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished.");
return finishedStatus.Value.Trailers;
}
}
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@ -265,7 +291,7 @@ namespace Grpc.Core.Internal
if (shouldComplete)
{
var status = finishedStatus.Value;
var status = finishedStatus.Value.Status;
if (status.StatusCode != StatusCode.OK)
{
FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
@ -288,9 +314,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
var fullStatus = ctx.GetReceivedStatusOnClient();
lock (myLock)
{
finished = true;
finishedStatus = fullStatus;
halfclosed = true;
ReleaseResourcesIfPossible();
@ -302,7 +332,8 @@ namespace Grpc.Core.Internal
return;
}
var status = ctx.GetReceivedStatus();
var status = fullStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
@ -321,13 +352,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();
var fullStatus = ctx.GetReceivedStatusOnClient();
AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
finishedStatus = status;
finishedStatus = fullStatus;
origReadCompletionDelegate = readCompletionDelegate;

@ -101,14 +101,17 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendStatusFromServer(status, HandleHalfclosed);
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
}
halfcloseRequested = true;
readingDone = true;
sendCompletionDelegate = completionDelegate;

@ -38,7 +38,6 @@ using Grpc.Core;
namespace Grpc.Core.Internal
{
/// <summary>
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
@ -46,6 +45,9 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_initial_metadata(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
@ -58,12 +60,24 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_host(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec grpcsharp_batch_context_server_rpc_new_deadline(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_request_metadata(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
@ -87,13 +101,26 @@ namespace Grpc.Core.Internal
}
}
public Status GetReceivedStatus()
// Gets data of recv_initial_metadata completion.
public Metadata GetReceivedInitialMetadata()
{
IntPtr metadataArrayPtr = grpcsharp_batch_context_recv_initial_metadata(this);
return MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
}
// Gets data of recv_status_on_client completion.
public ClientSideStatus GetReceivedStatusOnClient()
{
// TODO: can the native method return string directly?
string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this));
return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details);
var status = new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details);
IntPtr metadataArrayPtr = grpcsharp_batch_context_recv_status_on_client_trailing_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ClientSideStatus(status, metadata);
}
// Gets data of recv_message completion.
public byte[] GetReceivedMessage()
{
IntPtr len = grpcsharp_batch_context_recv_message_length(this);
@ -106,16 +133,22 @@ namespace Grpc.Core.Internal
return data;
}
public CallSafeHandle GetServerRpcNewCall()
// Gets data of server_rpc_new completion.
public ServerRpcNew GetServerRpcNew()
{
return grpcsharp_batch_context_server_rpc_new_call(this);
}
var call = grpcsharp_batch_context_server_rpc_new_call(this);
public string GetServerRpcNewMethod()
{
return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
var method = Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this));
var host = Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_host(this));
var deadline = grpcsharp_batch_context_server_rpc_new_deadline(this);
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
return new ServerRpcNew(call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
public bool GetReceivedCloseOnServerCancelled()
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
@@ -127,4 +160,97 @@ namespace Grpc.Core.Internal
return true;
}
}
/// <summary>
/// Status + metadata received on client side when call finishes.
/// (when receive_status_on_client operation finishes).
/// </summary>
internal struct ClientSideStatus
{
readonly Status status;
readonly Metadata trailers;
public ClientSideStatus(Status status, Metadata trailers)
{
this.status = status;
this.trailers = trailers;
}
public Status Status
{
get
{
return this.status;
}
}
public Metadata Trailers
{
get
{
return this.trailers;
}
}
}
/// <summary>
/// Details of a newly received RPC.
/// </summary>
internal struct ServerRpcNew
{
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
this.call = call;
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestMetadata = requestMetadata;
}
public CallSafeHandle Call
{
get
{
return this.call;
}
}
public string Method
{
get
{
return this.method;
}
}
public string Host
{
get
{
return this.host;
}
}
public Timespec Deadline
{
get
{
return this.deadline;
}
}
public Metadata RequestMetadata
{
get
{
return this.requestMetadata;
}
}
}
}
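
The new accessors let a client-side completion callback pull headers, status, and trailers from the batch context in one place. A minimal sketch, assuming the callback has been registered with the completion registry (that plumbing is not in this hunk, and the callback name is hypothetical):

    // Hypothetical callback; ctx is the BatchContextSafeHandle for the finished batch.
    void OnCallFinished(bool success, BatchContextSafeHandle ctx)
    {
        Metadata headers = ctx.GetReceivedInitialMetadata();          // from recv_initial_metadata
        ClientSideStatus finished = ctx.GetReceivedStatusOnClient();  // status and trailers together
        Console.WriteLine("finished with status " + finished.Status.StatusCode);
    }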

@@ -81,7 +81,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -159,11 +159,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)

@@ -45,13 +45,25 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_metadata_array_add(MetadataArraySafeHandle array, string key, byte[] value, UIntPtr valueLength);
[DllImport("grpc_csharp_ext.dll")]
static extern UIntPtr grpcsharp_metadata_array_count(IntPtr metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_metadata_array_get_key(IntPtr metadataArray, UIntPtr index);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_metadata_array_get_value(IntPtr metadataArray, UIntPtr index);
[DllImport("grpc_csharp_ext.dll")]
static extern UIntPtr grpcsharp_metadata_array_get_value_length(IntPtr metadataArray, UIntPtr index);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_metadata_array_destroy_full(IntPtr array);
private MetadataArraySafeHandle()
{
}
public static MetadataArraySafeHandle Create(Metadata metadata)
{
// TODO(jtattermusch): we might wanna check that the metadata is readonly
@@ -63,6 +75,38 @@ namespace Grpc.Core.Internal
return metadataArray;
}
/// <summary>
/// Reads metadata from pointer to grpc_metadata_array
/// </summary>
public static Metadata ReadMetadataFromPtrUnsafe(IntPtr metadataArray)
{
if (metadataArray == IntPtr.Zero)
{
return null;
}
ulong count = grpcsharp_metadata_array_count(metadataArray).ToUInt64();
var metadata = new Metadata();
for (ulong i = 0; i < count; i++)
{
var index = new UIntPtr(i);
string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index));
var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()];
Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length);
metadata.Add(new Metadata.Entry(key, bytes));
}
return metadata;
}
internal IntPtr Handle
{
get
{
return handle;
}
}
protected override bool ReleaseHandle()
{
grpcsharp_metadata_array_destroy_full(handle);
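
With the read path added, metadata can now round-trip through the native array. A rough usage sketch, assuming it is valid to read back an array that was created on the managed side (the using block scopes the handle's lifetime):

    var metadata = new Metadata();
    metadata.Add(new Metadata.Entry("user-agent", System.Text.Encoding.ASCII.GetBytes("demo-client")));
    using (var array = MetadataArraySafeHandle.Create(metadata))
    {
        // Exercises the new native getters through the raw pointer.
        Metadata roundTripped = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(array.Handle);
    }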

@@ -34,6 +34,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -42,7 +43,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment);
Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -58,27 +59,28 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status = Status.DefaultSuccess;
Status status;
var context = HandlerUtils.NewContext(newRpc);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
var result = await handler(request, context);
status = context.Status;
await responseStream.WriteAsync(result);
}
catch (Exception e)
@@ -88,7 +90,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -111,28 +113,28 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status = Status.DefaultSuccess;
Status status;
var context = HandlerUtils.NewContext(newRpc);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);
await handler(request, responseStream, context);
status = context.Status;
}
catch (Exception e)
{
@@ -142,7 +144,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -165,23 +167,24 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
Status status = Status.DefaultSuccess;
Status status;
var context = HandlerUtils.NewContext(newRpc);
try
{
var result = await handler(context, requestStream);
var result = await handler(requestStream, context);
status = context.Status;
try
{
await responseStream.WriteAsync(result);
@@ -199,7 +202,7 @@ namespace Grpc.Core.Internal
try
{
await responseStream.WriteStatusAsync(status);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -222,23 +225,24 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
Status status = Status.DefaultSuccess;
Status status;
var context = HandlerUtils.NewContext(newRpc);
try
{
await handler(context, requestStream, responseStream);
await handler(requestStream, responseStream, context);
status = context.Status;
}
catch (Exception e)
{
@@ -247,7 +251,7 @@ namespace Grpc.Core.Internal
}
try
{
await responseStream.WriteStatusAsync(status);
await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -259,18 +263,19 @@ namespace Grpc.Core.Internal
internal class NoSuchMethodCallHandler : IServerCallHandler
{
public async Task HandleCall(string methodName, CallSafeHandle call, GrpcEnvironment environment)
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
(payload) => payload, (payload) => payload, environment);
asyncCall.Initialize(call);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."));
await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty);
await finishedTask;
}
}
@@ -279,8 +284,22 @@ namespace Grpc.Core.Internal
{
public static Status StatusFromException(Exception e)
{
var rpcException = e as RpcException;
if (rpcException != null)
{
// use the status thrown by handler.
return rpcException.Status;
}
// TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
public static ServerCallContext NewContext(ServerRpcNew newRpc)
{
return new ServerCallContext(
newRpc.Method, newRpc.Host, newRpc.Deadline.ToDateTime(),
newRpc.RequestMetadata, CancellationToken.None);
}
}
}
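
With the context built via HandlerUtils.NewContext, a handler sees the call details and can attach trailers; whatever it assigns to context.Status is picked up after it returns. A hypothetical byte[]-echo unary handler under the new parameter order (message types deliberately avoided):

    Task<byte[]> EchoHandler(byte[] request, ServerCallContext context)
    {
        // Call details now come from ServerRpcNew rather than a bare method string.
        Console.WriteLine("{0} on {1}, deadline {2}", context.Method, context.Host, context.Deadline);
        // Trailers added here ride along with send_status_from_server.
        context.ResponseTrailers.Add(new Metadata.Entry("served-by", System.Text.Encoding.ASCII.GetBytes("demo")));
        return Task.FromResult(request);  // context.Status stays DefaultSuccess unless the handler sets it
    }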

@@ -56,10 +56,10 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
public Task WriteStatusAsync(Status status)
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, taskSource.CompletionDelegate);
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
}
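
Callers of WriteStatusAsync now pass the trailers explicitly; the no-trailers case uses Metadata.Empty, as the unimplemented-method handler above does. Sketched:

    // Success with no trailing metadata.
    await responseStream.WriteStatusAsync(Status.DefaultSuccess, Metadata.Empty);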

@@ -43,6 +43,8 @@ namespace Grpc.Core.Internal
const int NanosPerSecond = 1000 * 1000 * 1000;
const int NanosPerTick = 100;
static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
[DllImport("grpc_csharp_ext.dll")]
static extern Timespec gprsharp_now();
@@ -52,6 +54,13 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
public Timespec(IntPtr tv_sec, int tv_nsec)
{
this.tv_sec = tv_sec;
this.tv_nsec = tv_nsec;
this.clock_type = GPRClockType.Realtime;
}
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;
@@ -76,6 +85,11 @@ namespace Grpc.Core.Internal
return gprsharp_now();
}
}
public DateTime ToDateTime()
{
return UnixEpoch.AddTicks(tv_sec.ToInt64() * (NanosPerSecond / NanosPerTick) + tv_nsec / NanosPerTick);
}
internal static int NativeSize
{
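
ToDateTime works because NanosPerSecond / NanosPerTick is exactly the 10,000,000 DateTime ticks per second, so seconds scale up to ticks and nanoseconds divide down to ticks. A quick sanity sketch using the new constructor:

    // 1 s + 500,000,000 ns past the Unix epoch.
    var ts = new Timespec(new IntPtr(1), 500000000);
    DateTime dt = ts.ToDateTime();  // 1970-01-01 00:00:01.500 UTC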

@@ -220,6 +220,11 @@ namespace Grpc.Core
return value;
}
}
public override string ToString()
{
return string.Format("[Entry: key={0}, value={1}]", Key, Value);
}
}
}
}

@@ -53,6 +53,7 @@ namespace Grpc.Core
public const int PickUnusedPort = 0;
readonly GrpcEnvironment environment;
readonly List<ChannelOption> options;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@@ -69,7 +70,8 @@ namespace Grpc.Core
public Server(IEnumerable<ChannelOption> options = null)
{
this.environment = GrpcEnvironment.GetInstance();
using (var channelArgs = ChannelOptions.CreateChannelArgs(options))
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
}
@@ -218,16 +220,16 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
private async Task InvokeCallHandler(CallSafeHandle call, string method)
private async Task HandleCallAsync(ServerRpcNew newRpc)
{
try
{
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(method, out callHandler))
if (!callHandlers.TryGetValue(newRpc.Method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
callHandler = NoSuchMethodCallHandler.Instance;
}
await callHandler.HandleCall(method, call, environment);
await callHandler.HandleCall(newRpc, environment);
}
catch (Exception e)
{
@@ -240,15 +242,15 @@ namespace Grpc.Core
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
// TODO: handle error
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
if (success)
{
Task.Run(async () => await InvokeCallHandler(call, method));
ServerRpcNew newRpc = ctx.GetServerRpcNew();
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
Task.Run(async () => await HandleCallAsync(newRpc));
}
}
AllowOneRpc();

@@ -33,6 +33,7 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core
@@ -42,14 +43,93 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): expose method to send initial metadata back to client
// TODO(jtattermusch): add deadline info
private readonly string method;
private readonly string host;
private readonly DateTime deadline;
private readonly Metadata requestHeaders;
private readonly CancellationToken cancellationToken;
private readonly Metadata responseTrailers = new Metadata();
// TODO(jtattermusch): expose initial metadata sent by client for reading
private Status status = Status.DefaultSuccess;
// TODO(jtattermusch): expose method to send initial metadata back to client
public ServerCallContext(string method, string host, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
{
this.method = method;
this.host = host;
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
}
/// <summary> Name of method called in this RPC. </summary>
public string Method
{
get
{
return this.method;
}
}
/// <summary> Name of host called in this RPC. </summary>
public string Host
{
get
{
return this.host;
}
}
/// <summary> Deadline for this RPC. </summary>
public DateTime Deadline
{
get
{
return this.deadline;
}
}
/// <summary> Initial metadata sent by client. </summary>
public Metadata RequestHeaders
{
get
{
return this.requestHeaders;
}
}
// TODO(jtattermusch): support signalling cancellation.
/// <summary> Cancellation token signals when call is cancelled. </summary>
public CancellationToken CancellationToken
{
get
{
return this.cancellationToken;
}
}
/// <summary> Trailers to send back to client after RPC finishes.</summary>
public Metadata ResponseTrailers
{
get
{
return this.responseTrailers;
}
}
/// <summary> Status to send back to client after RPC finishes.</summary>
public Status Status
{
get
{
return this.status;
}
// TODO(jtattermusch): allow setting status and trailing metadata to send after handler completes.
set
{
status = value;
}
}
}
}
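
Because the constructor now takes the call details explicitly, a context can also be built directly in tests without a live call. A sketch with placeholder values:

    var context = new ServerCallContext(
        "/math.Math/Div",               // method
        "localhost",                    // host
        DateTime.UtcNow.AddSeconds(30), // deadline
        new Metadata(),                 // request headers
        CancellationToken.None);
    context.Status = new Status(StatusCode.Unknown, "test");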

@@ -42,28 +42,28 @@ namespace Grpc.Core
/// <summary>
/// Server-side handler for unary call.
/// </summary>
public delegate Task<TResponse> UnaryServerMethod<TRequest, TResponse>(ServerCallContext context, TRequest request)
public delegate Task<TResponse> UnaryServerMethod<TRequest, TResponse>(TRequest request, ServerCallContext context)
where TRequest : class
where TResponse : class;
/// <summary>
/// Server-side handler for client streaming call.
/// </summary>
public delegate Task<TResponse> ClientStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, IAsyncStreamReader<TRequest> requestStream)
public delegate Task<TResponse> ClientStreamingServerMethod<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, ServerCallContext context)
where TRequest : class
where TResponse : class;
/// <summary>
/// Server-side handler for server streaming call.
/// </summary>
public delegate Task ServerStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, TRequest request, IServerStreamWriter<TResponse> responseStream)
public delegate Task ServerStreamingServerMethod<TRequest, TResponse>(TRequest request, IServerStreamWriter<TResponse> responseStream, ServerCallContext context)
where TRequest : class
where TResponse : class;
/// <summary>
/// Server-side handler for bidi streaming call.
/// </summary>
public delegate Task DuplexStreamingServerMethod<TRequest, TResponse>(ServerCallContext context, IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream)
public delegate Task DuplexStreamingServerMethod<TRequest, TResponse>(IAsyncStreamReader<TRequest> requestStream, IServerStreamWriter<TResponse> responseStream, ServerCallContext context)
where TRequest : class
where TResponse : class;
}
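
The context-last ordering keeps the payload (or stream) first in all four delegate shapes, so handlers bind naturally as lambdas. For example, a unary method (byte[] used here to avoid inventing message types):

    UnaryServerMethod<byte[], byte[]> echo = (request, context) => Task.FromResult(request);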

@@ -2,4 +2,4 @@ using System.Reflection;
using System.Runtime.CompilerServices;
// The current version of gRPC C#.
[assembly: AssemblyVersion("0.6.0.*")]
[assembly: AssemblyVersion(Grpc.Core.VersionInfo.CurrentVersion + ".*")]

@@ -0,0 +1,13 @@
using System.Reflection;
using System.Runtime.CompilerServices;
namespace Grpc.Core
{
public static class VersionInfo
{
/// <summary>
/// Current version of gRPC
/// </summary>
public const string CurrentVersion = "0.6.0";
}
}

@@ -144,7 +144,7 @@ namespace math.Tests
n => Num.CreateBuilder().SetNum_(n).Build());
await call.RequestStream.WriteAll(numbers);
var result = await call.Result;
var result = await call.ResponseAsync;
Assert.AreEqual(60, result.Num_);
}
}).Wait();

@@ -46,8 +46,7 @@ namespace math
public static async Task DivAsyncExample(Math.IMathClient client)
{
Task<DivReply> resultTask = client.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
DivReply result = await resultTask;
DivReply result = await client.DivAsync(new DivArgs.Builder { Dividend = 4, Divisor = 5 }.Build());
Console.WriteLine("DivAsync Result: " + result);
}
@@ -72,7 +71,7 @@ namespace math
using (var call = client.Sum())
{
await call.RequestStream.WriteAll(numbers);
Console.WriteLine("Sum Result: " + await call.Result);
Console.WriteLine("Sum Result: " + await call.ResponseAsync);
}
}
@@ -104,7 +103,7 @@ namespace math
using (var sumCall = client.Sum())
{
await sumCall.RequestStream.WriteAll(numbers);
sum = await sumCall.Result;
sum = await sumCall.ResponseAsync;
}
DivReply result = await client.DivAsync(new DivArgs.Builder { Dividend = sum.Num_, Divisor = numbers.Count }.Build());

@@ -45,7 +45,7 @@ namespace math {
public interface IMathClient
{
global::math.DivReply Div(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncDuplexStreamingCall<global::math.DivArgs, global::math.DivReply> DivMany(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncServerStreamingCall<global::math.Num> Fib(global::math.FibArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
AsyncClientStreamingCall<global::math.Num, global::math.Num> Sum(Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken));
@@ -54,10 +54,10 @@ namespace math {
// server-side interface
public interface IMath
{
Task<global::math.DivReply> Div(ServerCallContext context, global::math.DivArgs request);
Task DivMany(ServerCallContext context, IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream);
Task Fib(ServerCallContext context, global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream);
Task<global::math.Num> Sum(ServerCallContext context, IAsyncStreamReader<global::math.Num> requestStream);
Task<global::math.DivReply> Div(global::math.DivArgs request, ServerCallContext context);
Task DivMany(IAsyncStreamReader<global::math.DivArgs> requestStream, IServerStreamWriter<global::math.DivReply> responseStream, ServerCallContext context);
Task Fib(global::math.FibArgs request, IServerStreamWriter<global::math.Num> responseStream, ServerCallContext context);
Task<global::math.Num> Sum(IAsyncStreamReader<global::math.Num> requestStream, ServerCallContext context);
}
// client stub
@@ -71,7 +71,7 @@ namespace math {
var call = CreateCall(__ServiceName, __Method_Div, headers);
return Calls.BlockingUnaryCall(call, request, cancellationToken);
}
public Task<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
public AsyncUnaryCall<global::math.DivReply> DivAsync(global::math.DivArgs request, Metadata headers = null, CancellationToken cancellationToken = default(CancellationToken))
{
var call = CreateCall(__ServiceName, __Method_Div, headers);
return Calls.AsyncUnaryCall(call, request, cancellationToken);
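
On the client, DivAsync now hands back the call object instead of a bare Task. Assuming AsyncUnaryCall exposes the response the same way the client-streaming call above does, the explicit form of "await client.DivAsync(...)" would be:

    var call = client.DivAsync(new DivArgs.Builder { Dividend = 10, Divisor = 2 }.Build());
    DivReply reply = await call.ResponseAsync;  // assumed member, mirroring AsyncClientStreamingCall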

@@ -45,12 +45,12 @@ namespace math
/// </summary>
public class MathServiceImpl : Math.IMath
{
public Task<DivReply> Div(ServerCallContext context, DivArgs request)
public Task<DivReply> Div(DivArgs request, ServerCallContext context)
{
return Task.FromResult(DivInternal(request));
}
public async Task Fib(ServerCallContext context, FibArgs request, IServerStreamWriter<Num> responseStream)
public async Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream, ServerCallContext context)
{
if (request.Limit <= 0)
{
@@ -67,7 +67,7 @@ namespace math
}
}
public async Task<Num> Sum(ServerCallContext context, IAsyncStreamReader<Num> requestStream)
public async Task<Num> Sum(IAsyncStreamReader<Num> requestStream, ServerCallContext context)
{
long sum = 0;
await requestStream.ForEach(async num =>
@@ -77,7 +77,7 @@ namespace math
return Num.CreateBuilder().SetNum_(sum).Build();
}
public async Task DivMany(ServerCallContext context, IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream)
public async Task DivMany(IAsyncStreamReader<DivArgs> requestStream, IServerStreamWriter<DivReply> responseStream, ServerCallContext context)
{
await requestStream.ForEach(async divArgs =>
{
