Merge remote-tracking branch 'upstream/master' into security_handshaker1

reviewable/pr8782/r1
Mark D. Roth 8 years ago
commit c560c5a7c5
  1. 8
      BUILD
  2. 3
      CMakeLists.txt
  3. 4
      Makefile
  4. 1
      binding.gyp
  5. 2
      build.yaml
  6. 1
      config.m4
  7. 3
      doc/PROTOCOL-WEB.md
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      include/grpc++/impl/codegen/completion_queue.h
  11. 3
      include/grpc++/support/channel_arguments.h
  12. 5
      include/grpc/impl/codegen/grpc_types.h
  13. 8
      include/grpc/impl/codegen/port_platform.h
  14. 2
      include/grpc/support/log.h
  15. 2
      include/grpc/support/string_util.h
  16. 2
      package.xml
  17. 2
      src/core/ext/census/census_log.h
  18. 2
      src/core/ext/census/mlog.h
  19. 4
      src/core/ext/lb_policy/grpclb/grpclb.c
  20. 6
      src/core/lib/channel/channel_args.c
  21. 8
      src/core/lib/channel/channel_args.h
  22. 2
      src/core/lib/iomgr/ev_epoll_linux.c
  23. 98
      src/core/lib/iomgr/socket_mutator.c
  24. 80
      src/core/lib/iomgr/socket_mutator.h
  25. 9
      src/core/lib/iomgr/socket_utils_common_posix.c
  26. 5
      src/core/lib/iomgr/socket_utils_posix.h
  27. 1
      src/core/lib/iomgr/tcp_client.h
  28. 16
      src/core/lib/iomgr/tcp_client_posix.c
  29. 19
      src/core/lib/iomgr/tcp_uv.c
  30. 5
      src/core/lib/iomgr/tcp_windows.c
  31. 9
      src/core/lib/security/transport/secure_endpoint.c
  32. 20
      src/cpp/common/channel_arguments.cc
  33. 95
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  34. 41
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  35. 7
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  36. 13
      src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
  37. 5
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  38. 22
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  39. 23
      src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
  40. 2
      src/csharp/Grpc.Core/Profiling/Profilers.cs
  41. 1
      src/python/grpcio/grpc_core_dependencies.py
  42. 4
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  43. 14
      test/core/channel/channel_args_test.c
  44. 67
      test/core/iomgr/socket_utils_test.c
  45. 2
      test/core/profiling/mark_timings.stp
  46. 65
      test/cpp/common/channel_arguments_test.cc
  47. 2
      tools/doxygen/Doxyfile.core.internal
  48. 3
      tools/run_tests/sources_and_headers.json
  49. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj
  50. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  51. 3
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
  52. 6
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
  53. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  54. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -203,6 +203,7 @@ cc_library(
"src/core/lib/iomgr/sockaddr_posix.h",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_posix.h",
"src/core/lib/iomgr/socket_windows.h",
@ -376,6 +377,7 @@ cc_library(
"src/core/lib/iomgr/resolve_address_windows.c",
"src/core/lib/iomgr/resource_quota.c",
"src/core/lib/iomgr/sockaddr_utils.c",
"src/core/lib/iomgr/socket_mutator.c",
"src/core/lib/iomgr/socket_utils_common_posix.c",
"src/core/lib/iomgr/socket_utils_linux.c",
"src/core/lib/iomgr/socket_utils_posix.c",
@ -637,6 +639,7 @@ cc_library(
"src/core/lib/iomgr/sockaddr_posix.h",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_posix.h",
"src/core/lib/iomgr/socket_windows.h",
@ -795,6 +798,7 @@ cc_library(
"src/core/lib/iomgr/resolve_address_windows.c",
"src/core/lib/iomgr/resource_quota.c",
"src/core/lib/iomgr/sockaddr_utils.c",
"src/core/lib/iomgr/socket_mutator.c",
"src/core/lib/iomgr/socket_utils_common_posix.c",
"src/core/lib/iomgr/socket_utils_linux.c",
"src/core/lib/iomgr/socket_utils_posix.c",
@ -1026,6 +1030,7 @@ cc_library(
"src/core/lib/iomgr/sockaddr_posix.h",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_posix.h",
"src/core/lib/iomgr/socket_windows.h",
@ -1176,6 +1181,7 @@ cc_library(
"src/core/lib/iomgr/resolve_address_windows.c",
"src/core/lib/iomgr/resource_quota.c",
"src/core/lib/iomgr/sockaddr_utils.c",
"src/core/lib/iomgr/socket_mutator.c",
"src/core/lib/iomgr/socket_utils_common_posix.c",
"src/core/lib/iomgr/socket_utils_linux.c",
"src/core/lib/iomgr/socket_utils_posix.c",
@ -2037,6 +2043,7 @@ objc_library(
"src/core/lib/iomgr/resolve_address_windows.c",
"src/core/lib/iomgr/resource_quota.c",
"src/core/lib/iomgr/sockaddr_utils.c",
"src/core/lib/iomgr/socket_mutator.c",
"src/core/lib/iomgr/socket_utils_common_posix.c",
"src/core/lib/iomgr/socket_utils_linux.c",
"src/core/lib/iomgr/socket_utils_posix.c",
@ -2277,6 +2284,7 @@ objc_library(
"src/core/lib/iomgr/sockaddr_posix.h",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_posix.h",
"src/core/lib/iomgr/socket_windows.h",

@ -331,6 +331,7 @@ add_library(grpc
src/core/lib/iomgr/resolve_address_windows.c
src/core/lib/iomgr/resource_quota.c
src/core/lib/iomgr/sockaddr_utils.c
src/core/lib/iomgr/socket_mutator.c
src/core/lib/iomgr/socket_utils_common_posix.c
src/core/lib/iomgr/socket_utils_linux.c
src/core/lib/iomgr/socket_utils_posix.c
@ -610,6 +611,7 @@ add_library(grpc_cronet
src/core/lib/iomgr/resolve_address_windows.c
src/core/lib/iomgr/resource_quota.c
src/core/lib/iomgr/sockaddr_utils.c
src/core/lib/iomgr/socket_mutator.c
src/core/lib/iomgr/socket_utils_common_posix.c
src/core/lib/iomgr/socket_utils_linux.c
src/core/lib/iomgr/socket_utils_posix.c
@ -861,6 +863,7 @@ add_library(grpc_unsecure
src/core/lib/iomgr/resolve_address_windows.c
src/core/lib/iomgr/resource_quota.c
src/core/lib/iomgr/sockaddr_utils.c
src/core/lib/iomgr/socket_mutator.c
src/core/lib/iomgr/socket_utils_common_posix.c
src/core/lib/iomgr/socket_utils_linux.c
src/core/lib/iomgr/socket_utils_posix.c

@ -2665,6 +2665,7 @@ LIBGRPC_SRC = \
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \
@ -2962,6 +2963,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \
@ -3250,6 +3252,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \
@ -3467,6 +3470,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \

@ -611,6 +611,7 @@
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/resource_quota.c',
'src/core/lib/iomgr/sockaddr_utils.c',
'src/core/lib/iomgr/socket_mutator.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',

@ -210,6 +210,7 @@ filegroups:
- src/core/lib/iomgr/sockaddr_posix.h
- src/core/lib/iomgr/sockaddr_utils.h
- src/core/lib/iomgr/sockaddr_windows.h
- src/core/lib/iomgr/socket_mutator.h
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/iomgr/socket_utils_posix.h
- src/core/lib/iomgr/socket_windows.h
@ -307,6 +308,7 @@ filegroups:
- src/core/lib/iomgr/resolve_address_windows.c
- src/core/lib/iomgr/resource_quota.c
- src/core/lib/iomgr/sockaddr_utils.c
- src/core/lib/iomgr/socket_mutator.c
- src/core/lib/iomgr/socket_utils_common_posix.c
- src/core/lib/iomgr/socket_utils_linux.c
- src/core/lib/iomgr/socket_utils_posix.c

@ -127,6 +127,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \

@ -60,8 +60,7 @@ HTTP/2 related behavior (specified in [gRPC over HTTP2](http://www.grpc.io/docs/
Message framing (vs. [http2-transport-mapping](http://www.grpc.io/docs/guides/wire.html#http2-transport-mapping))
1. Response status encoded as part of the response body
* Key-value pairs formatted as HTTP/1.1 headers block (without the empty
newline \r\n to terminate the block)
* Key-value pairs encoded in the HTTP/2 [literal header format](https://tools.ietf.org/html/rfc7541#section-6.2) as a single header block.
2. 8th (MSB) bit of the 1st gRPC frame byte
* 0: data
* 1: trailers

@ -292,6 +292,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/sockaddr_posix.h',
'src/core/lib/iomgr/sockaddr_utils.h',
'src/core/lib/iomgr/sockaddr_windows.h',
'src/core/lib/iomgr/socket_mutator.h',
'src/core/lib/iomgr/socket_utils.h',
'src/core/lib/iomgr/socket_utils_posix.h',
'src/core/lib/iomgr/socket_windows.h',
@ -469,6 +470,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/resource_quota.c',
'src/core/lib/iomgr/sockaddr_utils.c',
'src/core/lib/iomgr/socket_mutator.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',
@ -695,6 +697,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/sockaddr_posix.h',
'src/core/lib/iomgr/sockaddr_utils.h',
'src/core/lib/iomgr/sockaddr_windows.h',
'src/core/lib/iomgr/socket_mutator.h',
'src/core/lib/iomgr/socket_utils.h',
'src/core/lib/iomgr/socket_utils_posix.h',
'src/core/lib/iomgr/socket_windows.h',

@ -212,6 +212,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/sockaddr_posix.h )
s.files += %w( src/core/lib/iomgr/sockaddr_utils.h )
s.files += %w( src/core/lib/iomgr/sockaddr_windows.h )
s.files += %w( src/core/lib/iomgr/socket_mutator.h )
s.files += %w( src/core/lib/iomgr/socket_utils.h )
s.files += %w( src/core/lib/iomgr/socket_utils_posix.h )
s.files += %w( src/core/lib/iomgr/socket_windows.h )
@ -389,6 +390,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/resolve_address_windows.c )
s.files += %w( src/core/lib/iomgr/resource_quota.c )
s.files += %w( src/core/lib/iomgr/sockaddr_utils.c )
s.files += %w( src/core/lib/iomgr/socket_mutator.c )
s.files += %w( src/core/lib/iomgr/socket_utils_common_posix.c )
s.files += %w( src/core/lib/iomgr/socket_utils_linux.c )
s.files += %w( src/core/lib/iomgr/socket_utils_posix.c )

@ -240,7 +240,7 @@ class ServerCompletionQueue : public CompletionQueue {
private:
bool is_frequently_polled_;
friend class ServerBuilder;
/// \param is_frequently_polled Informs the GPRC library about whether the
/// \param is_frequently_polled Informs the GRPC library about whether the
/// server completion queue would be actively polled (by calling Next() or
/// AsyncNext()). By default all server completion queues are assumed to be
/// frequently polled.

@ -79,6 +79,9 @@ class ChannelArguments {
/// Set the compression algorithm for the channel.
void SetCompressionAlgorithm(grpc_compression_algorithm algorithm);
/// Set the socket mutator for the channel.
void SetSocketMutator(grpc_socket_mutator* mutator);
/// The given string will be sent at the front of the user agent string.
void SetUserAgentPrefix(const grpc::string& user_agent_prefix);

@ -84,6 +84,9 @@ typedef struct grpc_server grpc_server;
can have messages written to it and read from it. */
typedef struct grpc_call grpc_call;
/** The Socket Mutator interface allows changes on socket options */
typedef struct grpc_socket_mutator grpc_socket_mutator;
/** Type specifier for grpc_arg */
typedef enum {
GRPC_ARG_STRING,
@ -215,6 +218,8 @@ typedef struct {
/** Resolved addresses in a form used by the LB policy.
Not intended for external use. */
#define GRPC_ARG_LB_ADDRESSES "grpc.lb_addresses"
/** The grpc_socket_mutator instance that set the socket options. A pointer. */
#define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a

@ -368,14 +368,14 @@ typedef unsigned __int64 uint64_t;
#endif
#endif
#ifndef GPRC_PRINT_FORMAT_CHECK
#ifndef GPR_PRINT_FORMAT_CHECK
#ifdef __GNUC__
#define GPRC_PRINT_FORMAT_CHECK(FORMAT_STR, ARGS) \
#define GPR_PRINT_FORMAT_CHECK(FORMAT_STR, ARGS) \
__attribute__((format(printf, FORMAT_STR, ARGS)))
#else
#define GPRC_PRINT_FORMAT_CHECK(FORMAT_STR, ARGS)
#define GPR_PRINT_FORMAT_CHECK(FORMAT_STR, ARGS)
#endif
#endif /* GPRC_PRINT_FORMAT_CHECK */
#endif /* GPR_PRINT_FORMAT_CHECK */
#if GPR_FORBID_UNREACHABLE_CODE
#define GPR_UNREACHABLE_CODE(STATEMENT)

@ -75,7 +75,7 @@ const char *gpr_log_severity_string(gpr_log_severity severity);
/* Log a message. It's advised to use GPR_xxx above to generate the context
* for each message */
GPRAPI void gpr_log(const char *file, int line, gpr_log_severity severity,
const char *format, ...) GPRC_PRINT_FORMAT_CHECK(4, 5);
const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
GPRAPI void gpr_log_message(const char *file, int line,
gpr_log_severity severity, const char *message);

@ -55,7 +55,7 @@ GPRAPI char *gpr_strdup(const char *src);
On error, returns -1 and sets *strp to NULL. If the format string is bad,
the result is undefined. */
GPRAPI int gpr_asprintf(char **strp, const char *format, ...)
GPRC_PRINT_FORMAT_CHECK(2, 3);
GPR_PRINT_FORMAT_CHECK(2, 3);
#ifdef __cplusplus
}

@ -219,6 +219,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/sockaddr_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/sockaddr_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/sockaddr_windows.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_mutator.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_posix.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_windows.h" role="src" />
@ -396,6 +397,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/resolve_address_windows.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/resource_quota.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/sockaddr_utils.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_mutator.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_common_posix.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_linux.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_posix.c" role="src" />

@ -84,7 +84,7 @@ const void *census_log_read_next(size_t *bytes_available);
*/
size_t census_log_remaining_space(void);
/* Returns the number of times gprc_stats_log_start_write() failed due to
/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int census_log_out_of_space_count(void);

@ -88,7 +88,7 @@ const void* census_log_read_next(size_t* bytes_available);
*/
size_t census_log_remaining_space(void);
/* Returns the number of times gprc_stats_log_start_write() failed due to
/* Returns the number of times grpc_stats_log_start_write() failed due to
out-of-space. */
int64_t census_log_out_of_space_count(void);

@ -1155,8 +1155,8 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg,
}
gpr_mu_unlock(&glb_policy->mu);
} else { /* empty payload: call cancelled. */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
/* dispose of the "lb_on_response_received" weak ref taken in
* query_for_backends_locked() and reused in every reception loop */
gpr_mu_unlock(&glb_policy->mu);
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base,
"lb_on_response_received_empty_payload");

@ -298,6 +298,12 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
}
}
grpc_channel_args *grpc_channel_args_set_socket_mutator(
grpc_channel_args *a, grpc_socket_mutator *mutator) {
grpc_arg tmp = grpc_socket_mutator_to_arg(mutator);
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b) {
int c = GPR_ICMP(a->num_args, b->num_args);

@ -36,6 +36,7 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include "src/core/lib/iomgr/socket_mutator.h"
// Channel args are intentionally immutable, to avoid the need for locking.
@ -100,6 +101,13 @@ uint32_t grpc_channel_args_compression_algorithm_get_states(
int grpc_channel_args_compare(const grpc_channel_args *a,
const grpc_channel_args *b);
/** Returns a channel arg instance with socket mutator added. The socket mutator
* will perform its mutate_fd method on all file descriptors used by the
* channel.
* If \a a is non-MULL, its args are copied. */
grpc_channel_args *grpc_channel_args_set_socket_mutator(
grpc_channel_args *a, grpc_socket_mutator *mutator);
/** Returns the value of argument \a name from \a args, or NULL if not found. */
const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args,
const char *name);

@ -163,7 +163,7 @@ static void fd_global_shutdown(void);
#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
#endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */
#endif /* !defined(GRPC_PI_REF_COUNT_DEBUG) */
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {

@ -0,0 +1,98 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/lib/iomgr/socket_mutator.h"
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
const grpc_socket_mutator_vtable *vtable) {
mutator->vtable = vtable;
gpr_ref_init(&mutator->refcount, 1);
}
grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator) {
gpr_ref(&mutator->refcount);
return mutator;
}
bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd) {
return mutator->vtable->mutate_fd(fd, mutator);
}
int grpc_socket_mutator_compare(grpc_socket_mutator *a,
grpc_socket_mutator *b) {
int c = GPR_ICMP(a, b);
if (c != 0) {
grpc_socket_mutator *sma = a;
grpc_socket_mutator *smb = b;
c = GPR_ICMP(sma->vtable, smb->vtable);
if (c == 0) {
c = sma->vtable->compare(sma, smb);
}
}
return c;
}
void grpc_socket_mutator_unref(grpc_socket_mutator *mutator) {
if (gpr_unref(&mutator->refcount)) {
mutator->vtable->destory(mutator);
}
}
static void *socket_mutator_arg_copy(void *p) {
return grpc_socket_mutator_ref(p);
}
static void socket_mutator_arg_destroy(void *p) {
grpc_socket_mutator_unref(p);
}
static int socket_mutator_cmp(void *a, void *b) {
return grpc_socket_mutator_compare((grpc_socket_mutator *)a,
(grpc_socket_mutator *)b);
}
static const grpc_arg_pointer_vtable socket_mutator_arg_vtable = {
socket_mutator_arg_copy, socket_mutator_arg_destroy, socket_mutator_cmp};
grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_SOCKET_MUTATOR;
arg.value.pointer.vtable = &socket_mutator_arg_vtable;
arg.value.pointer.p = mutator;
return arg;
}

@ -0,0 +1,80 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
#define GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/sync.h>
#ifdef __cplusplus
extern "C" {
#endif
/** The virtual table of grpc_socket_mutator */
typedef struct {
/** Mutates the socket opitons of \a fd */
bool (*mutate_fd)(int fd, grpc_socket_mutator *mutator);
/** Compare socket mutator \a a and \a b */
int (*compare)(grpc_socket_mutator *a, grpc_socket_mutator *b);
/** Destroys the socket mutator instance */
void (*destory)(grpc_socket_mutator *mutator);
} grpc_socket_mutator_vtable;
/** The Socket Mutator interface allows changes on socket options */
struct grpc_socket_mutator {
const grpc_socket_mutator_vtable *vtable;
gpr_refcount refcount;
};
/** called by concrete implementations to initialize the base struct */
void grpc_socket_mutator_init(grpc_socket_mutator *mutator,
const grpc_socket_mutator_vtable *vtable);
/** Wrap \a mutator as a grpc_arg */
grpc_arg grpc_socket_mutator_to_arg(grpc_socket_mutator *mutator);
/** Perform the file descriptor mutation operation of \a mutator on \a fd */
bool grpc_socket_mutator_mutate_fd(grpc_socket_mutator *mutator, int fd);
/** Compare if \a a and \a b are the same mutator or have same settings */
int grpc_socket_mutator_compare(grpc_socket_mutator *a, grpc_socket_mutator *b);
grpc_socket_mutator *grpc_socket_mutator_ref(grpc_socket_mutator *mutator);
void grpc_socket_mutator_unref(grpc_socket_mutator *mutator);
#ifdef __cplusplus
}
#endif
#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_MUTATOR_H */

@ -209,6 +209,15 @@ grpc_error *grpc_set_socket_low_latency(int fd, int low_latency) {
return GRPC_ERROR_NONE;
}
/* set a socket using a grpc_socket_mutator */
grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator) {
GPR_ASSERT(mutator);
if (!grpc_socket_mutator_mutate_fd(mutator, fd)) {
return GRPC_ERROR_CREATE("grpc_socket_mutator failed.");
}
return GRPC_ERROR_NONE;
}
static gpr_once g_probe_ipv6_once = GPR_ONCE_INIT;
static int g_ipv6_loopback_available;

@ -39,7 +39,9 @@
#include <sys/socket.h>
#include <unistd.h>
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/socket_mutator.h"
/* a wrapper for accept or accept4 */
int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock,
@ -88,6 +90,9 @@ grpc_error *grpc_set_socket_sndbuf(int fd, int buffer_size_bytes);
/* Tries to set the socket's receive buffer to given size. */
grpc_error *grpc_set_socket_rcvbuf(int fd, int buffer_size_bytes);
/* Tries to set the socket using a grpc_socket_mutator */
grpc_error *grpc_set_socket_with_mutator(int fd, grpc_socket_mutator *mutator);
/* An enum to keep track of IPv4/IPv6 socket modes.
Currently, this information is only used when a socket is first created, but

@ -34,6 +34,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
#define GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/pollset_set.h"

@ -51,6 +51,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/timer.h"
@ -73,7 +74,8 @@ typedef struct {
grpc_channel_args *channel_args;
} async_connect;
static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd,
const grpc_channel_args *channel_args) {
grpc_error *err = GRPC_ERROR_NONE;
GPR_ASSERT(fd >= 0);
@ -88,6 +90,16 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) {
}
err = grpc_set_socket_no_sigpipe_if_possible(fd);
if (err != GRPC_ERROR_NONE) goto error;
if (channel_args) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) {
GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER);
grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p;
err = grpc_set_socket_with_mutator(fd, mutator);
if (err != GRPC_ERROR_NONE) goto error;
}
}
}
goto done;
error:
@ -287,7 +299,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy));
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) {
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
return;
}

@ -87,10 +87,12 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/*#define GRPC_TCP_REFCOUNT_DEBUG*/
#ifdef GRPC_TCP_REFCOUNT_DEBUG
#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, const char *reason, const char *file,
int line) {
#define TCP_UNREF(exec_ctx, tcp, reason) \
tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) \
tcp_ref((exec_ctx), (tcp), (reason), __FILE__, __LINE__)
static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
const char *reason, const char *file, int line) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
reason, tcp->refcount.count, tcp->refcount.count - 1);
if (gpr_unref(&tcp->refcount)) {
@ -157,7 +159,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
grpc_error_free_string(str);
for (i = 0; i < tcp->read_slices->count; i++) {
char *dump = grpc_dump_slice(tcp->read_slices->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string,
dump);
gpr_free(dump);
@ -234,7 +236,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (j = 0; j < write_slices->count; j++) {
char *data = grpc_dump_slice(write_slices->slices[j],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
@ -323,10 +325,13 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) {
static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; }
static int uv_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {
uv_endpoint_read, uv_endpoint_write, uv_get_workqueue,
uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown,
uv_destroy, uv_get_resource_user, uv_get_peer};
uv_destroy, uv_get_resource_user, uv_get_peer,
uv_get_fd};
grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle,
grpc_resource_quota *resource_quota,

@ -402,6 +402,8 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) {
return tcp->resource_user;
}
static int win_get_fd(grpc_endpoint *ep) { return -1; }
static grpc_endpoint_vtable vtable = {win_read,
win_write,
win_get_workqueue,
@ -410,7 +412,8 @@ static grpc_endpoint_vtable vtable = {win_read,
win_shutdown,
win_destroy,
win_get_resource_user,
win_get_peer};
win_get_peer,
win_get_fd};
grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
grpc_resource_quota *resource_quota,

@ -31,15 +31,20 @@
*
*/
#include "src/core/lib/security/transport/secure_endpoint.h"
/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
using that endpoint. Because of various transitive includes in uv.h,
including windows.h on Windows, uv.h must be included before other system
headers. Therefore, sockaddr.h must always be included first */
#include "src/core/lib/iomgr/sockaddr.h"
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"

@ -39,7 +39,7 @@
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/socket_mutator.h"
namespace grpc {
ChannelArguments::ChannelArguments() {
@ -88,6 +88,24 @@ void ChannelArguments::SetCompressionAlgorithm(
SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
void ChannelArguments::SetSocketMutator(grpc_socket_mutator* mutator) {
if (!mutator) {
return;
}
grpc_arg mutator_arg = grpc_socket_mutator_to_arg(mutator);
bool replaced = false;
for (auto it = args_.begin(); it != args_.end(); ++it) {
if (it->type == mutator_arg.type &&
grpc::string(it->key) == grpc::string(mutator_arg.key)) {
it->value.pointer.vtable->destroy(it->value.pointer.p);
it->value.pointer = mutator_arg.value.pointer;
}
}
if (!replaced) {
args_.push_back(mutator_arg);
}
}
// Note: a second call to this will add in front the result of the first call.
// An example is calling this on a copy of ChannelArguments which already has a
// prefix. The user can build up a prefix string by calling this multiple times,

@ -388,35 +388,29 @@ namespace Grpc.Core.Internal
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
{
var call = CreateNativeCall(cq);
var call = CreateNativeCall(cq);
details.Channel.AddCallReference(this);
InitializeInternal(call);
RegisterCancellationCallback();
}
details.Channel.AddCallReference(this);
InitializeInternal(call);
RegisterCancellationCallback();
}
private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.CreateNativeCall"))
if (injectedNativeCall != null)
{
if (injectedNativeCall != null)
{
return injectedNativeCall; // allows injecting a mock INativeCall in tests.
}
return injectedNativeCall; // allows injecting a mock INativeCall in tests.
}
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
}
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
}
}
@ -456,47 +450,44 @@ namespace Grpc.Core.Internal
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
lock (myLock)
{
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
finished = true;
lock (myLock)
if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
finished = true;
if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible();
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
responseHeadersTcs.SetResult(responseHeaders);
if (delayedStreamingWriteTcs != null)
if (isStreamingWriteCompletionDelayed)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
delayedStreamingWriteTcs = streamingWriteTcs;
streamingWriteTcs = null;
}
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
}
ReleaseResourcesIfPossible();
}
responseHeadersTcs.SetResult(responseHeaders);
unaryResponseTcs.SetResult(msg);
if (delayedStreamingWriteTcs != null)
{
delayedStreamingWriteTcs.SetException(GetRpcExceptionClientOnly());
}
var status = receivedStatus.Status;
if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
}
unaryResponseTcs.SetResult(msg);
}
/// <summary>

@ -181,19 +181,16 @@ namespace Grpc.Core.Internal
/// </summary>
protected bool ReleaseResourcesIfPossible()
{
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.ReleaseResourcesIfPossible"))
if (!disposed && call != null)
{
if (!disposed && call != null)
bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
return true;
}
ReleaseResources();
return true;
}
return false;
}
return false;
}
protected abstract bool IsClient
@ -229,28 +226,20 @@ namespace Grpc.Core.Internal
protected byte[] UnsafeSerialize(TWrite msg)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.UnsafeSerialize"))
{
return serializer(msg);
}
return serializer(msg);
}
protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCallBase.TryDeserialize"))
try
{
try
{
msg = deserializer(payload);
return null;
}
catch (Exception e)
{
msg = default(TRead);
return e;
}
msg = deserializer(payload);
return null;
}
catch (Exception e)
{
msg = default(TRead);
return e;
}
}

@ -76,11 +76,8 @@ namespace Grpc.Core.Internal
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
using (Profilers.ForCurrentThread().NewScope("CallSafeHandle.StartUnary"))
{
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)

@ -65,16 +65,13 @@ namespace Grpc.Core.Internal
public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
if (credentials != null)
{
var result = Native.grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
if (credentials != null)
{
result.SetCredentials(credentials);
}
result.Initialize(cq);
return result;
result.SetCredentials(credentials);
}
result.Initialize(cq);
return result;
}
public ChannelState CheckConnectivityState(bool tryToConnect)

@ -70,10 +70,7 @@ namespace Grpc.Core.Internal
public CompletionQueueEvent Pluck(IntPtr tag)
{
using (Profilers.ForCurrentThread().NewScope("CompletionQueueSafeHandle.Pluck"))
{
return Native.grpcsharp_completion_queue_pluck(this, tag);
}
return Native.grpcsharp_completion_queue_pluck(this, tag);
}
/// <summary>

@ -37,6 +37,7 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
@ -54,6 +55,8 @@ namespace Grpc.Core.Internal
readonly int poolSize;
readonly int completionQueueCount;
readonly List<BasicProfiler> threadProfilers = new List<BasicProfiler>(); // profilers assigned to threadpool threads
bool stopRequested;
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
@ -82,7 +85,8 @@ namespace Grpc.Core.Internal
for (int i = 0; i < poolSize; i++)
{
threads.Add(CreateAndStartThread(i));
var optionalProfiler = i < threadProfilers.Count ? threadProfilers[i] : null;
threads.Add(CreateAndStartThread(i, optionalProfiler));
}
}
}
@ -111,6 +115,11 @@ namespace Grpc.Core.Internal
{
cq.Dispose();
}
for (int i = 0; i < threadProfilers.Count; i++)
{
threadProfilers[i].Dump(string.Format("grpc_trace_thread_{0}.txt", i));
}
});
}
@ -137,12 +146,12 @@ namespace Grpc.Core.Internal
}
}
private Thread CreateAndStartThread(int threadIndex)
private Thread CreateAndStartThread(int threadIndex, IProfiler optionalProfiler)
{
var cqIndex = threadIndex % completionQueues.Count;
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq, optionalProfiler)));
thread.IsBackground = true;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
@ -153,8 +162,13 @@ namespace Grpc.Core.Internal
/// <summary>
/// Body of the polling thread.
/// </summary>
private void RunHandlerLoop(CompletionQueueSafeHandle cq)
private void RunHandlerLoop(CompletionQueueSafeHandle cq, IProfiler optionalProfiler)
{
if (optionalProfiler != null)
{
Profilers.SetForCurrentThread(optionalProfiler);
}
CompletionQueueEvent ev;
do
{

@ -48,22 +48,19 @@ namespace Grpc.Core.Internal
public static MetadataArraySafeHandle Create(Metadata metadata)
{
using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
if (metadata.Count == 0)
{
if (metadata.Count == 0)
{
return new MetadataArraySafeHandle();
}
return new MetadataArraySafeHandle();
}
// TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
{
var valueBytes = metadata[i].GetSerializedValueUnsafe();
Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
// TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
{
var valueBytes = metadata[i].GetSerializedValueUnsafe();
Native.grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
}
/// <summary>

@ -80,7 +80,7 @@ namespace Grpc.Core.Profiling
ProfilerEntry[] entries;
int count;
public BasicProfiler() : this(1024*1024)
public BasicProfiler() : this(20*1024*1024)
{
}

@ -121,6 +121,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/resource_quota.c',
'src/core/lib/iomgr/sockaddr_utils.c',
'src/core/lib/iomgr/socket_mutator.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',

@ -686,7 +686,7 @@ extern gpr_join_host_port_type gpr_join_host_port_import;
typedef int(*gpr_split_host_port_type)(const char *name, char **host, char **port);
extern gpr_split_host_port_type gpr_split_host_port_import;
#define gpr_split_host_port gpr_split_host_port_import
typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(4, 5);
typedef void(*gpr_log_type)(const char *file, int line, gpr_log_severity severity, const char *format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);
extern gpr_log_type gpr_log_import;
#define gpr_log gpr_log_import
typedef void(*gpr_log_message_type)(const char *file, int line, gpr_log_severity severity, const char *message);
@ -707,7 +707,7 @@ extern gpr_format_message_type gpr_format_message_import;
typedef char *(*gpr_strdup_type)(const char *src);
extern gpr_strdup_type gpr_strdup_import;
#define gpr_strdup gpr_strdup_import
typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPRC_PRINT_FORMAT_CHECK(2, 3);
typedef int(*gpr_asprintf_type)(char **strp, const char *format, ...) GPR_PRINT_FORMAT_CHECK(2, 3);
extern gpr_asprintf_type gpr_asprintf_import;
#define gpr_asprintf gpr_asprintf_import
typedef const char *(*gpr_subprocess_binary_extension_type)();

@ -134,12 +134,26 @@ static void test_compression_algorithm_states(void) {
grpc_channel_args_destroy(ch_args);
}
static void test_set_socket_mutator(void) {
grpc_channel_args *ch_args;
grpc_socket_mutator mutator;
grpc_socket_mutator_init(&mutator, NULL);
ch_args = grpc_channel_args_set_socket_mutator(NULL, &mutator);
GPR_ASSERT(ch_args->num_args == 1);
GPR_ASSERT(strcmp(ch_args->args[0].key, GRPC_ARG_SOCKET_MUTATOR) == 0);
GPR_ASSERT(ch_args->args[0].type == GRPC_ARG_POINTER);
grpc_channel_args_destroy(ch_args);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
test_create();
test_set_compression_algorithm();
test_compression_algorithm_states();
test_set_socket_mutator();
grpc_shutdown();
return 0;
}

@ -39,13 +39,57 @@
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include <errno.h>
#include <netinet/ip.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/socket_mutator.h"
#include "test/core/util/test_config.h"
struct test_socket_mutator {
grpc_socket_mutator base;
int option_value;
};
static bool mutate_fd(int fd, grpc_socket_mutator *mutator) {
int newval;
socklen_t intlen = sizeof(newval);
struct test_socket_mutator *m = (struct test_socket_mutator *)mutator;
if (0 != setsockopt(fd, IPPROTO_IP, IP_TOS, &m->option_value,
sizeof(m->option_value))) {
return false;
}
if (0 != getsockopt(fd, IPPROTO_IP, IP_TOS, &newval, &intlen)) {
return false;
}
if (newval != m->option_value) {
return false;
}
return true;
}
static void destroy_test_mutator(grpc_socket_mutator *mutator) {
struct test_socket_mutator *m = (struct test_socket_mutator *)mutator;
gpr_free(m);
}
static int compare_test_mutator(grpc_socket_mutator *a,
grpc_socket_mutator *b) {
struct test_socket_mutator *ma = (struct test_socket_mutator *)a;
struct test_socket_mutator *mb = (struct test_socket_mutator *)b;
return GPR_ICMP(ma->option_value, mb->option_value);
}
static const grpc_socket_mutator_vtable mutator_vtable = {
mutate_fd, compare_test_mutator, destroy_test_mutator};
int main(int argc, char **argv) {
int sock;
grpc_error *err;
grpc_test_init(argc, argv);
sock = socket(PF_INET, SOCK_STREAM, 0);
@ -68,6 +112,29 @@ int main(int argc, char **argv) {
GPR_ASSERT(GRPC_LOG_IF_ERROR("set_socket_low_latency",
grpc_set_socket_low_latency(sock, 0)));
struct test_socket_mutator mutator;
grpc_socket_mutator_init(&mutator.base, &mutator_vtable);
mutator.option_value = IPTOS_LOWDELAY;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"set_socket_with_mutator",
grpc_set_socket_with_mutator(sock, (grpc_socket_mutator *)&mutator)));
mutator.option_value = IPTOS_THROUGHPUT;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"set_socket_with_mutator",
grpc_set_socket_with_mutator(sock, (grpc_socket_mutator *)&mutator)));
mutator.option_value = IPTOS_RELIABILITY;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"set_socket_with_mutator",
grpc_set_socket_with_mutator(sock, (grpc_socket_mutator *)&mutator)));
mutator.option_value = -1;
err = grpc_set_socket_with_mutator(sock, (grpc_socket_mutator *)&mutator);
GPR_ASSERT(err != GRPC_ERROR_NONE);
GRPC_ERROR_UNREF(err);
close(sock);
return 0;

@ -2,7 +2,7 @@
* probe definition.
*
* For a statically build binary, that'd be the name of the binary itself.
* For dinamically built ones, point to the location of the libgprc.so being
* For dynamically built ones, point to the location of the libgrpc.so being
* used. */
global starts, times, times_per_tag

@ -35,11 +35,56 @@
#include <grpc++/grpc++.h>
#include <grpc/grpc.h>
#include <grpc/support/useful.h>
#include <gtest/gtest.h>
#include "src/core/lib/iomgr/socket_mutator.h"
namespace grpc {
namespace testing {
namespace {
// A simple grpc_socket_mutator to be used to test SetSocketMutator
class TestSocketMutator : public grpc_socket_mutator {
public:
TestSocketMutator();
bool MutateFd(int fd) {
// Do nothing on the fd
return true;
}
};
//
// C API for TestSocketMutator
//
bool test_mutator_mutate_fd(int fd, grpc_socket_mutator* mutator) {
TestSocketMutator* tsm = (TestSocketMutator*)mutator;
return tsm->MutateFd(fd);
}
int test_mutator_compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
return GPR_ICMP(a, b);
}
void test_mutator_destroy(grpc_socket_mutator* mutator) {
TestSocketMutator* tsm = (TestSocketMutator*)mutator;
delete tsm;
}
grpc_socket_mutator_vtable test_mutator_vtable = {
test_mutator_mutate_fd, test_mutator_compare, test_mutator_destroy};
//
// TestSocketMutator implementation
//
TestSocketMutator::TestSocketMutator() {
grpc_socket_mutator_init(this, &test_mutator_vtable);
}
}
class ChannelArgumentsTest : public ::testing::Test {
protected:
ChannelArgumentsTest()
@ -166,6 +211,26 @@ TEST_F(ChannelArgumentsTest, SetPointer) {
EXPECT_TRUE(HasArg(arg0));
}
TEST_F(ChannelArgumentsTest, SetSocketMutator) {
VerifyDefaultChannelArgs();
grpc_arg arg0, arg1;
TestSocketMutator* mutator0 = new TestSocketMutator();
TestSocketMutator* mutator1 = new TestSocketMutator();
arg0 = grpc_socket_mutator_to_arg(mutator0);
arg1 = grpc_socket_mutator_to_arg(mutator1);
channel_args_.SetSocketMutator(mutator0);
EXPECT_TRUE(HasArg(arg0));
channel_args_.SetSocketMutator(mutator1);
EXPECT_TRUE(HasArg(arg1));
// arg0 is replaced by arg1
EXPECT_FALSE(HasArg(arg0));
// arg0 is destroyed by grpc_socket_mutator_to_arg(mutator1)
arg1.value.pointer.vtable->destroy(arg1.value.pointer.p);
}
TEST_F(ChannelArgumentsTest, SetUserAgentPrefix) {
VerifyDefaultChannelArgs();
grpc::string prefix("prefix");

@ -835,6 +835,7 @@ src/core/lib/iomgr/sockaddr.h \
src/core/lib/iomgr/sockaddr_posix.h \
src/core/lib/iomgr/sockaddr_utils.h \
src/core/lib/iomgr/sockaddr_windows.h \
src/core/lib/iomgr/socket_mutator.h \
src/core/lib/iomgr/socket_utils.h \
src/core/lib/iomgr/socket_utils_posix.h \
src/core/lib/iomgr/socket_windows.h \
@ -1012,6 +1013,7 @@ src/core/lib/iomgr/resolve_address_uv.c \
src/core/lib/iomgr/resolve_address_windows.c \
src/core/lib/iomgr/resource_quota.c \
src/core/lib/iomgr/sockaddr_utils.c \
src/core/lib/iomgr/socket_mutator.c \
src/core/lib/iomgr/socket_utils_common_posix.c \
src/core/lib/iomgr/socket_utils_linux.c \
src/core/lib/iomgr/socket_utils_posix.c \

@ -6695,6 +6695,7 @@
"src/core/lib/iomgr/sockaddr_posix.h",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_posix.h",
"src/core/lib/iomgr/socket_windows.h",
@ -6852,6 +6853,8 @@
"src/core/lib/iomgr/sockaddr_utils.c",
"src/core/lib/iomgr/sockaddr_utils.h",
"src/core/lib/iomgr/sockaddr_windows.h",
"src/core/lib/iomgr/socket_mutator.c",
"src/core/lib/iomgr/socket_mutator.h",
"src/core/lib/iomgr/socket_utils.h",
"src/core/lib/iomgr/socket_utils_common_posix.c",
"src/core/lib/iomgr/socket_utils_linux.c",

@ -344,6 +344,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_windows.h" />
@ -570,6 +571,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_linux.c">

@ -142,6 +142,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -869,6 +872,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -237,6 +237,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_windows.h" />
@ -421,6 +422,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_linux.c">

@ -199,6 +199,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -665,6 +668,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

@ -334,6 +334,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_posix.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_windows.h" />
@ -538,6 +539,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_linux.c">

@ -145,6 +145,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_utils.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils_common_posix.c">
<Filter>src\core\lib\iomgr</Filter>
</ClCompile>
@ -782,6 +785,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\sockaddr_windows.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_mutator.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\socket_utils.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>

Loading…
Cancel
Save