Merge branch 'direct-calls' into buffer_pools_for_realsies

reviewable/pr8239/r2
Craig Tiller 8 years ago
commit d88c461217
  1. 12
      BUILD
  2. 5
      CMakeLists.txt
  3. 6
      Makefile
  4. 1
      binding.gyp
  5. 2
      build.yaml
  6. 1
      config.m4
  7. 2
      doc/core/pending_api_cleanups.md
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      include/grpc++/ext/reflection.grpc.pb.h
  11. 2
      include/grpc++/ext/reflection.pb.h
  12. 6
      include/grpc/impl/codegen/grpc_types.h
  13. 2
      package.xml
  14. 65
      src/core/ext/client_config/client_channel.c
  15. 10
      src/core/ext/client_config/lb_policy.c
  16. 10
      src/core/ext/client_config/lb_policy.h
  17. 9
      src/core/ext/client_config/subchannel.c
  18. 3
      src/core/ext/client_config/subchannel.h
  19. 49
      src/core/ext/client_config/subchannel_factory.c
  20. 66
      src/core/ext/client_config/subchannel_factory.h
  21. 20
      src/core/ext/lb_policy/grpclb/grpclb.c
  22. 20
      src/core/ext/lb_policy/pick_first/pick_first.c
  23. 20
      src/core/ext/lb_policy/round_robin/round_robin.c
  24. 32
      src/core/lib/channel/channel_stack.c
  25. 18
      src/core/lib/channel/channel_stack.h
  26. 302
      src/core/lib/channel/deadline_filter.c
  27. 79
      src/core/lib/channel/deadline_filter.h
  28. 4
      src/core/lib/channel/http_client_filter.c
  29. 46
      src/core/lib/channel/message_size_filter.c
  30. 58
      src/core/lib/iomgr/error.c
  31. 8
      src/core/lib/iomgr/error.h
  32. 98
      src/core/lib/iomgr/tcp_client_posix.c
  33. 143
      src/core/lib/surface/call.c
  34. 2
      src/core/lib/surface/completion_queue.h
  35. 7
      src/core/lib/surface/init.c
  36. 2
      src/core/lib/transport/transport.c
  37. 2
      src/cpp/ext/reflection.grpc.pb.cc
  38. 2
      src/cpp/ext/reflection.pb.cc
  39. 1
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  40. 3
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  41. 1
      src/python/grpcio/grpc_core_dependencies.py
  42. 6
      test/core/channel/channel_stack_test.c
  43. 3
      test/core/nanopb/fuzzer_response.c
  44. 3
      test/core/nanopb/fuzzer_serverlist.c
  45. 32
      tools/codegen/extensions/gen_reflection_proto.sh
  46. 2
      tools/doxygen/Doxyfile.c++.internal
  47. 2
      tools/doxygen/Doxyfile.core.internal
  48. 3
      tools/run_tests/sources_and_headers.json
  49. 3
      vsprojects/vcxproj/grpc++/grpc++.vcxproj
  50. 6
      vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters
  51. 3
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj
  52. 6
      vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters
  53. 3
      vsprojects/vcxproj/grpc/grpc.vcxproj
  54. 6
      vsprojects/vcxproj/grpc/grpc.vcxproj.filters
  55. 3
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
  56. 6
      vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
  57. 3
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
  58. 6
      vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters

12
BUILD

@ -168,6 +168,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -328,6 +329,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -568,6 +570,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -713,6 +716,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -923,6 +927,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -1060,6 +1065,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -1275,6 +1281,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -1391,6 +1398,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -1686,6 +1694,7 @@ cc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -1797,6 +1806,7 @@ cc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -2186,6 +2196,7 @@ objc_library(
"src/core/lib/channel/channel_stack_builder.c",
"src/core/lib/channel/compress_filter.c",
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/http_client_filter.c",
"src/core/lib/channel/http_server_filter.c",
@ -2405,6 +2416,7 @@ objc_library(
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",

@ -295,6 +295,7 @@ add_library(grpc
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@ -553,6 +554,7 @@ add_library(grpc_cronet
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@ -783,6 +785,7 @@ add_library(grpc_unsecure
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@ -1041,6 +1044,7 @@ add_library(grpc++
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c
@ -1396,6 +1400,7 @@ add_library(grpc++_unsecure
src/core/lib/channel/channel_stack_builder.c
src/core/lib/channel/compress_filter.c
src/core/lib/channel/connected_channel.c
src/core/lib/channel/deadline_filter.c
src/core/lib/channel/handshaker.c
src/core/lib/channel/http_client_filter.c
src/core/lib/channel/http_server_filter.c

@ -2536,6 +2536,7 @@ LIBGRPC_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@ -2812,6 +2813,7 @@ LIBGRPC_CRONET_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@ -3077,6 +3079,7 @@ LIBGRPC_TEST_UTIL_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@ -3269,6 +3272,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@ -3610,6 +3614,7 @@ LIBGRPC++_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \
@ -4240,6 +4245,7 @@ LIBGRPC++_UNSECURE_SRC = \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \

@ -570,6 +570,7 @@
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',

@ -172,6 +172,7 @@ filegroups:
- src/core/lib/channel/compress_filter.h
- src/core/lib/channel/connected_channel.h
- src/core/lib/channel/context.h
- src/core/lib/channel/deadline_filter.h
- src/core/lib/channel/handshaker.h
- src/core/lib/channel/http_client_filter.h
- src/core/lib/channel/http_server_filter.h
@ -255,6 +256,7 @@ filegroups:
- src/core/lib/channel/channel_stack_builder.c
- src/core/lib/channel/compress_filter.c
- src/core/lib/channel/connected_channel.c
- src/core/lib/channel/deadline_filter.c
- src/core/lib/channel/handshaker.c
- src/core/lib/channel/http_client_filter.c
- src/core/lib/channel/http_server_filter.c

@ -89,6 +89,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \

@ -13,3 +13,5 @@ number:
- remove `GRPC_ARG_MAX_MESSAGE_LENGTH` channel arg from
`include/grpc/impl/codegen/grpc_types.h` (commit `af00d8b`)
- remove `ServerBuilder::SetMaxMessageSize()` method from
`include/grpc++/server_builder.h` (commit `6980362`)

@ -255,6 +255,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/compress_filter.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/deadline_filter.h',
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',
@ -419,6 +420,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',
@ -627,6 +629,7 @@ Pod::Spec.new do |s|
'src/core/lib/channel/compress_filter.h',
'src/core/lib/channel/connected_channel.h',
'src/core/lib/channel/context.h',
'src/core/lib/channel/deadline_filter.h',
'src/core/lib/channel/handshaker.h',
'src/core/lib/channel/http_client_filter.h',
'src/core/lib/channel/http_server_filter.h',

@ -175,6 +175,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/compress_filter.h )
s.files += %w( src/core/lib/channel/connected_channel.h )
s.files += %w( src/core/lib/channel/context.h )
s.files += %w( src/core/lib/channel/deadline_filter.h )
s.files += %w( src/core/lib/channel/handshaker.h )
s.files += %w( src/core/lib/channel/http_client_filter.h )
s.files += %w( src/core/lib/channel/http_server_filter.h )
@ -339,6 +340,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/channel/channel_stack_builder.c )
s.files += %w( src/core/lib/channel/compress_filter.c )
s.files += %w( src/core/lib/channel/connected_channel.c )
s.files += %w( src/core/lib/channel/deadline_filter.c )
s.files += %w( src/core/lib/channel/handshaker.c )
s.files += %w( src/core/lib/channel/http_client_filter.c )
s.files += %w( src/core/lib/channel/http_server_filter.c )

@ -32,7 +32,7 @@
*/
// Generated by the gRPC protobuf plugin.
// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// If you make any local change, they will be lost.
// source: reflection.proto
// Original file comments:

@ -32,7 +32,7 @@
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// source: reflection.proto
#ifndef PROTOBUF_reflection_2eproto__INCLUDED

@ -148,11 +148,13 @@ typedef struct {
/** Maximum number of concurrent incoming streams to allow on a http2
connection. Int valued. */
#define GRPC_ARG_MAX_CONCURRENT_STREAMS "grpc.max_concurrent_streams"
/** Maximum message length that the channel can receive. Int valued, bytes. */
/** Maximum message length that the channel can receive. Int valued, bytes.
-1 means unlimited. */
#define GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH "grpc.max_receive_message_length"
/** \deprecated For backward compatibility. */
#define GRPC_ARG_MAX_MESSAGE_LENGTH GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
/** Maximum message length that the channel can send. Int valued, bytes. */
/** Maximum message length that the channel can send. Int valued, bytes.
-1 means unlimited. */
#define GRPC_ARG_MAX_SEND_MESSAGE_LENGTH "grpc.max_send_message_length"
/** Initial sequence number for http2 transports. Int valued. */
#define GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER \

@ -182,6 +182,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/compress_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/context.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/handshaker.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.h" role="src" />
@ -346,6 +347,7 @@
<file baseinstalldir="/" name="src/core/lib/channel/channel_stack_builder.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/compress_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/connected_channel.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/deadline_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/handshaker.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_client_filter.c" role="src" />
<file baseinstalldir="/" name="src/core/lib/channel/http_server_filter.c" role="src" />

@ -46,6 +46,7 @@
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
@ -114,7 +115,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx,
grpc_lb_policy_cancel_picks(
exec_ctx, chand->lb_policy,
/* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY,
/* check= */ 0);
/* check= */ 0, GRPC_ERROR_REF(error));
}
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error,
reason);
@ -391,6 +392,17 @@ typedef enum {
for initial metadata before trying to create a call object,
and handling cancellation gracefully. */
typedef struct client_channel_call_data {
// State for handling deadlines.
// The code in deadline_filter.c requires this to be the first field.
// TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
// and this struct both independently store a pointer to the call
// stack and each has its own mutex. If/when we have time, find a way
// to avoid this without breaking the grpc_deadline_state abstraction.
grpc_deadline_state deadline_state;
gpr_timespec deadline;
grpc_error *cancel_error;
/** either 0 for no call, 1 for cancelled, or a pointer to a
grpc_subchannel_call */
gpr_atm subchannel_call;
@ -485,7 +497,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
gpr_atm_no_barrier_store(&calld->subchannel_call, 1);
fail_locked(exec_ctx, calld, GRPC_ERROR_CREATE_REFERENCING(
"Failed to create subchannel", &error, 1));
} else if (1 == gpr_atm_acq_load(&calld->subchannel_call)) {
} else if (GET_CALL(calld) == CANCELLED_CALL) {
/* already cancelled before subchannel became ready */
fail_locked(exec_ctx, calld,
GRPC_ERROR_CREATE_REFERENCING(
@ -493,7 +505,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
} else {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *new_error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent,
exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (new_error != GRPC_ERROR_NONE) {
new_error = grpc_error_add_child(new_error, error);
@ -531,7 +543,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
grpc_closure *on_ready, grpc_error *error);
static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
@ -542,7 +554,8 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg,
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL);
} else if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->initial_metadata_flags,
cpa->connected_subchannel, cpa->on_ready)) {
cpa->connected_subchannel, cpa->on_ready,
GRPC_ERROR_NONE)) {
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL);
}
gpr_free(cpa);
@ -552,7 +565,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
grpc_closure *on_ready, grpc_error *error) {
GPR_TIMER_BEGIN("pick_subchannel", 0);
channel_data *chand = elem->channel_data;
@ -566,21 +579,24 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
if (initial_metadata == NULL) {
if (chand->lb_policy != NULL) {
grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy,
connected_subchannel);
connected_subchannel, GRPC_ERROR_REF(error));
}
for (closure = chand->waiting_for_config_closures.head; closure != NULL;
closure = closure->next_data.next) {
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
grpc_exec_ctx_sched(exec_ctx, cpa->on_ready,
GRPC_ERROR_CREATE("Pick cancelled"), NULL);
grpc_exec_ctx_sched(
exec_ctx, cpa->on_ready,
GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
}
}
gpr_mu_unlock(&chand->mu);
GPR_TIMER_END("pick_subchannel", 0);
GRPC_ERROR_UNREF(error);
return true;
}
GPR_ASSERT(error == GRPC_ERROR_NONE);
if (chand->lb_policy != NULL) {
grpc_lb_policy *lb_policy = chand->lb_policy;
int r;
@ -631,12 +647,13 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
/* try to (atomically) get the call */
grpc_subchannel_call *call = GET_CALL(calld);
GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
if (call == CANCELLED_CALL) {
grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
GRPC_ERROR_CANCELLED);
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@ -652,8 +669,8 @@ retry:
call = GET_CALL(calld);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&calld->mu);
grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
GRPC_ERROR_CANCELLED);
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@ -669,18 +686,24 @@ retry:
(gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
// Stash a copy of cancel_error in our call data, so that we can use
// it for subsequent operations. This ensures that if the call is
// cancelled before any ops are passed down (e.g., if the deadline
// is in the past when the call starts), we can return the right
// error to the caller when the first op does get passed down.
calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
switch (calld->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel,
NULL);
NULL, GRPC_ERROR_REF(op->cancel_error));
break;
}
gpr_mu_unlock(&calld->mu);
grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
GRPC_ERROR_CANCELLED);
grpc_transport_stream_op_finish_with_failure(
exec_ctx, op, GRPC_ERROR_REF(op->cancel_error));
GPR_TIMER_END("cc_start_transport_stream_op", 0);
return;
}
@ -694,7 +717,8 @@ retry:
GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel");
if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata,
op->send_initial_metadata_flags,
&calld->connected_subchannel, &calld->next_step)) {
&calld->connected_subchannel, &calld->next_step,
GRPC_ERROR_NONE)) {
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel");
}
@ -704,7 +728,7 @@ retry:
calld->connected_subchannel != NULL) {
grpc_subchannel_call *subchannel_call = NULL;
grpc_error *error = grpc_connected_subchannel_create_call(
exec_ctx, calld->connected_subchannel, calld->pollent,
exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
&subchannel_call);
if (error != GRPC_ERROR_NONE) {
subchannel_call = CANCELLED_CALL;
@ -727,6 +751,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
grpc_deadline_state_init(exec_ctx, elem, args);
calld->deadline = args->deadline;
calld->cancel_error = GRPC_ERROR_NONE;
gpr_atm_rel_store(&calld->subchannel_call, 0);
gpr_mu_init(&calld->mu);
calld->connected_subchannel = NULL;
@ -745,6 +772,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_final_info *final_info,
void *and_free_memory) {
call_data *calld = elem->call_data;
grpc_deadline_state_destroy(exec_ctx, elem);
GRPC_ERROR_UNREF(calld->cancel_error);
grpc_subchannel_call *call = GET_CALL(calld);
if (call != NULL && call != CANCELLED_CALL) {
GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");

@ -108,16 +108,18 @@ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target) {
policy->vtable->cancel_pick(exec_ctx, policy, target);
grpc_connected_subchannel **target,
grpc_error *error) {
policy->vtable->cancel_pick(exec_ctx, policy, target, error);
}
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask,
initial_metadata_flags_eq);
initial_metadata_flags_eq, error);
}
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) {

@ -77,12 +77,12 @@ struct grpc_lb_policy_vtable {
/** \see grpc_lb_policy_cancel_pick */
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
grpc_connected_subchannel **target, grpc_error *error);
/** \see grpc_lb_policy_cancel_picks */
void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
uint32_t initial_metadata_flags_eq, grpc_error *error);
/** \see grpc_lb_policy_ping_one */
void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@ -161,7 +161,8 @@ void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
The \a on_complete callback of the pending picks will be invoked with \a
*target set to NULL. */
void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
grpc_connected_subchannel **target);
grpc_connected_subchannel **target,
grpc_error *error);
/** Cancel all pending picks for which their \a initial_metadata_flags (as given
in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq
@ -169,7 +170,8 @@ void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx,
grpc_lb_policy *policy,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq);
uint32_t initial_metadata_flags_eq,
grpc_error *error);
/** Try to enter a READY connectivity state */
void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy);

@ -629,7 +629,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
c->have_alarm = 1;
grpc_connectivity_state_set(
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
"connect_failed");
gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, now);
const char *errmsg = grpc_error_string(error);
@ -698,14 +700,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
grpc_polling_entity *pollent, grpc_subchannel_call **call) {
grpc_polling_entity *pollent, gpr_timespec deadline,
grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
*call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
(*call)->connection = con; // Ref is added below.
grpc_error *error =
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
NULL, NULL, callstk);
NULL, NULL, deadline, callstk);
if (error != GRPC_ERROR_NONE) {
const char *error_string = grpc_error_string(error);
gpr_log(GPR_ERROR, "error: %s", error_string);

@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
grpc_polling_entity *pollent, gpr_timespec deadline,
grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(

@ -1,49 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/ext/client_config/subchannel_factory.h"
void grpc_subchannel_factory_ref(grpc_subchannel_factory* factory) {
factory->vtable->ref(factory);
}
void grpc_subchannel_factory_unref(grpc_exec_ctx* exec_ctx,
grpc_subchannel_factory* factory) {
factory->vtable->unref(exec_ctx, factory);
}
grpc_subchannel* grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx* exec_ctx, grpc_subchannel_factory* factory,
grpc_subchannel_args* args) {
return factory->vtable->create_subchannel(exec_ctx, factory, args);
}

@ -1,66 +0,0 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H
#include "src/core/ext/client_config/subchannel.h"
#include "src/core/lib/channel/channel_stack.h"
typedef struct grpc_subchannel_factory grpc_subchannel_factory;
typedef struct grpc_subchannel_factory_vtable grpc_subchannel_factory_vtable;
/** Constructor for new configured channels.
Creating decorators around this type is encouraged to adapt behavior. */
struct grpc_subchannel_factory {
const grpc_subchannel_factory_vtable *vtable;
};
struct grpc_subchannel_factory_vtable {
void (*ref)(grpc_subchannel_factory *factory);
void (*unref)(grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory);
grpc_subchannel *(*create_subchannel)(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *factory,
grpc_subchannel_args *args);
};
void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
void grpc_subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_factory *factory);
/** Create a new grpc_subchannel */
grpc_subchannel *grpc_subchannel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *factory,
grpc_subchannel_args *args);
#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */

@ -688,7 +688,8 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
pending_pick *pp = glb_policy->pending_picks;
@ -699,8 +700,10 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@ -708,12 +711,14 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
}
static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client);
static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
glb_lb_policy *glb_policy = (glb_lb_policy *)pol;
gpr_mu_lock(&glb_policy->mu);
if (glb_policy->lb_client != NULL) {
@ -728,8 +733,10 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(
exec_ctx, pp->pollent, glb_policy->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CANCELLED, NULL);
grpc_exec_ctx_sched(
exec_ctx, &pp->wrapped_on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = glb_policy->pending_picks;
glb_policy->pending_picks = pp;
@ -737,6 +744,7 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&glb_policy->mu);
GRPC_ERROR_UNREF(error);
}
static void query_for_backends(grpc_exec_ctx *exec_ctx,

@ -128,7 +128,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -140,8 +141,9 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -150,11 +152,13 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -166,8 +170,9 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
initial_metadata_flags_eq) {
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
grpc_exec_ctx_sched(exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE("Pick Cancelled"), NULL);
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick Cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -176,6 +181,7 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
@ -466,6 +472,8 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
}
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
* the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);

@ -308,7 +308,8 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel **target) {
grpc_connected_subchannel **target,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -320,8 +321,9 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
NULL);
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -330,11 +332,13 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
uint32_t initial_metadata_flags_mask,
uint32_t initial_metadata_flags_eq) {
uint32_t initial_metadata_flags_eq,
grpc_error *error) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@ -347,8 +351,9 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
p->base.interested_parties);
*pp->target = NULL;
grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED,
NULL);
grpc_exec_ctx_sched(
exec_ctx, pp->on_complete,
GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1), NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@ -357,6 +362,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pp = next;
}
gpr_mu_unlock(&p->mu);
GRPC_ERROR_UNREF(error);
}
static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
@ -629,6 +635,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx,
if (args->addresses->addresses[i].is_balancer) continue;
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
/* server_name will be copied as part of the subchannel creation. This makes
* the copying of args->server_name (a borrowed pointer) OK. */
sc_args.server_name = args->server_name;
sc_args.addr =
(struct sockaddr *)(&args->addresses->addresses[i].address.addr);

@ -158,13 +158,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack) {
grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
gpr_timespec deadline, grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@ -185,6 +183,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
args.context = context;
args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
@ -276,16 +275,16 @@ static void destroy_op(grpc_exec_ctx *exec_ctx, void *op, grpc_error *error) {
}
void grpc_call_element_send_cancel(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem) {
grpc_call_element *elem) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->cancel_error = GRPC_ERROR_CANCELLED;
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_call_next_op(exec_ctx, cur_elem, op);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
@ -293,5 +292,16 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_transport_stream_op_add_cancellation_with_message(op, status,
optional_message);
grpc_call_next_op(exec_ctx, cur_elem, op);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}
void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_status_code status,
gpr_slice *optional_message) {
grpc_transport_stream_op *op = gpr_malloc(sizeof(*op));
memset(op, 0, sizeof(*op));
op->on_complete = grpc_closure_create(destroy_op, op);
grpc_transport_stream_op_add_close(op, status, optional_message);
elem->filter->start_transport_stream_op(exec_ctx, elem, op);
}

@ -74,6 +74,7 @@ typedef struct {
grpc_call_stack *call_stack;
const void *server_transport_data;
grpc_call_context_element *context;
gpr_timespec deadline;
} grpc_call_element_args;
typedef struct {
@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy,
void *destroy_arg,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
grpc_error *grpc_call_stack_init(
grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
grpc_call_context_element *context, const void *transport_server_data,
gpr_timespec deadline, grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
@ -290,6 +289,11 @@ void grpc_call_element_send_cancel_with_message(grpc_exec_ctx *exec_ctx,
grpc_status_code status,
gpr_slice *optional_message);
void grpc_call_element_send_close_with_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *cur_elem,
grpc_status_code status,
gpr_slice *optional_message);
extern int grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \

@ -0,0 +1,302 @@
//
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
#include "src/core/lib/channel/deadline_filter.h"
#include <stdbool.h>
#include <string.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
//
// grpc_deadline_state
//
// Timer callback.
static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = arg;
grpc_deadline_state* deadline_state = elem->call_data;
gpr_mu_lock(&deadline_state->timer_mu);
deadline_state->timer_pending = false;
gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) {
gpr_slice msg = gpr_slice_from_static_string("Deadline Exceeded");
grpc_call_element_send_cancel_with_message(
exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &msg);
gpr_slice_unref(msg);
}
GRPC_CALL_STACK_UNREF(exec_ctx, deadline_state->call_stack, "deadline_timer");
}
// Starts the deadline timer.
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// Take a reference to the call stack, to be owned by the timer.
GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
gpr_mu_lock(&deadline_state->timer_mu);
deadline_state->timer_pending = true;
grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback,
elem, gpr_now(GPR_CLOCK_MONOTONIC));
gpr_mu_unlock(&deadline_state->timer_mu);
}
}
// Cancels the deadline timer.
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
gpr_mu_lock(&deadline_state->timer_mu);
if (deadline_state->timer_pending) {
grpc_timer_cancel(exec_ctx, &deadline_state->timer);
deadline_state->timer_pending = false;
}
gpr_mu_unlock(&deadline_state->timer_mu);
}
// Callback run when the call is complete.
static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
deadline_state->next_on_complete->cb(
exec_ctx, deadline_state->next_on_complete->cb_arg, error);
}
// Inject our own on_complete callback into op.
static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
grpc_transport_stream_op* op) {
deadline_state->next_on_complete = op->on_complete;
grpc_closure_init(&deadline_state->on_complete, on_complete, deadline_state);
op->on_complete = &deadline_state->on_complete;
}
// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
grpc_call_element* elem;
gpr_timespec deadline;
grpc_closure closure;
};
static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
struct start_timer_after_init_state* state = arg;
start_timer_if_needed(exec_ctx, state->elem, state->deadline);
gpr_free(state);
}
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_element_args* args) {
grpc_deadline_state* deadline_state = elem->call_data;
memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = args->call_stack;
gpr_mu_init(&deadline_state->timer_mu);
// Deadline will always be infinite on servers, so the timer will only be
// set on clients with a finite deadline.
const gpr_timespec deadline =
gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
// When the deadline passes, we indicate the failure by sending down
// an op with cancel_error set. However, we can't send down any ops
// until after the call stack is fully initialized. If we start the
// timer here, we have no guarantee that the timer won't pop before
// call stack initialization is finished. To avoid that problem, we
// create a closure to start the timer, and we schedule that closure
// to be run after call stack initialization is done.
struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
state->elem = elem;
state->deadline = deadline;
grpc_closure_init(&state->closure, start_timer_after_init, state);
grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
}
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
grpc_deadline_state* deadline_state = elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
gpr_mu_destroy(&deadline_state->timer_mu);
}
void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_deadline_state* deadline_state = elem->call_data;
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata != NULL) {
inject_on_complete_cb(deadline_state, op);
}
}
}
//
// filter code
//
// Constructor for channel_data. Used for both client and server filters.
static void init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
}
// Destructor for channel_data. Used for both client and server filters.
static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element* elem) {}
// Call data used for both client and server filter.
typedef struct base_call_data {
grpc_deadline_state deadline_state;
} base_call_data;
// Additional call data used only for the server filter.
typedef struct server_call_data {
base_call_data base; // Must be first.
// The closure for receiving initial metadata.
grpc_closure recv_initial_metadata_ready;
// Received initial metadata batch.
grpc_metadata_batch* recv_initial_metadata;
// The original recv_initial_metadata_ready closure, which we chain to
// after our own closure is invoked.
grpc_closure* next_recv_initial_metadata_ready;
} server_call_data;
// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
// Note: size of call data is different between client and server.
memset(elem->call_data, 0, elem->filter->sizeof_call_data);
grpc_deadline_state_init(exec_ctx, elem, args);
return GRPC_ERROR_NONE;
}
// Destructor for call_data. Used for both client and server filters.
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
void* and_free_memory) {
grpc_deadline_state_destroy(exec_ctx, elem);
}
// Method for starting a call op for client filter.
static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op);
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}
// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = arg;
server_call_data* calld = elem->call_data;
// Get deadline from metadata and start the timer if needed.
start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline);
// Invoke the next callback.
calld->next_recv_initial_metadata_ready->cb(
exec_ctx, calld->next_recv_initial_metadata_ready->cb_arg, error);
}
// Method for starting a call op for server filter.
static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_transport_stream_op* op) {
server_call_data* calld = elem->call_data;
if (op->cancel_error != GRPC_ERROR_NONE ||
op->close_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state);
} else {
// If we're receiving initial metadata, we need to get the deadline
// from the recv_initial_metadata_ready callback. So we inject our
// own callback into that hook.
if (op->recv_initial_metadata_ready != NULL) {
calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
calld->recv_initial_metadata = op->recv_initial_metadata;
grpc_closure_init(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem);
op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready;
}
// Make sure we know when the call is complete, so that we can cancel
// the timer.
// Note that we trigger this on recv_trailing_metadata, even though
// the client never sends trailing metadata, because this is the
// hook that tells us when the call is complete on the server side.
if (op->recv_trailing_metadata != NULL) {
inject_on_complete_cb(&calld->base.deadline_state, op);
}
}
// Chain to next filter.
grpc_call_next_op(exec_ctx, elem, op);
}
const grpc_channel_filter grpc_client_deadline_filter = {
client_start_transport_stream_op,
grpc_channel_next_op,
sizeof(base_call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"deadline",
};
const grpc_channel_filter grpc_server_deadline_filter = {
server_start_transport_stream_op,
grpc_channel_next_op,
sizeof(server_call_data),
init_call_elem,
grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
0, // sizeof(channel_data)
init_channel_elem,
destroy_channel_elem,
grpc_call_next_get_peer,
"deadline",
};

@ -0,0 +1,79 @@
//
// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
#ifndef GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
#define GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
// Guards access to timer_pending and timer.
gpr_mu timer_mu;
// True if the timer callback is currently pending.
bool timer_pending;
// The deadline timer.
grpc_timer timer;
// Closure to invoke when the call is complete.
// We use this to cancel the timer.
grpc_closure on_complete;
// The original on_complete closure, which we chain to after our own
// closure is invoked.
grpc_closure* next_on_complete;
} grpc_deadline_state;
// To be used in a filter's init_call_elem(), destroy_call_elem(), and
// start_transport_stream_op() methods to enforce call deadlines.
//
// REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
//
// For grpc_deadline_state_client_start_transport_stream_op(), it is the
// caller's responsibility to chain to the next filter if necessary
// after the function returns.
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_element_args* args);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem);
void grpc_deadline_state_client_start_transport_stream_op(
grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_transport_stream_op* op);
// Deadline filters for direct client channels and server channels.
// Note: Deadlines for non-direct client channels are handled by the
// client_channel filter.
extern const grpc_channel_filter grpc_client_deadline_filter;
extern const grpc_channel_filter grpc_server_deadline_filter;
#endif /* GRPC_CORE_LIB_CHANNEL_DEADLINE_FILTER_H */

@ -103,8 +103,8 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
grpc_mdstr_as_c_string(md->value));
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_cancel_with_message(a->exec_ctx, a->elem,
GRPC_STATUS_CANCELLED, &message);
grpc_call_element_send_close_with_message(a->exec_ctx, a->elem,
GRPC_STATUS_CANCELLED, &message);
return NULL;
} else if (md == GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC) {
return NULL;

@ -40,8 +40,9 @@
#include "src/core/lib/channel/channel_args.h"
#define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited.
// The protobuf library will (by default) start warning at 100 megs.
#define DEFAULT_MAX_MESSAGE_LENGTH (4 * 1024 * 1024)
#define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024)
typedef struct call_data {
// Receive closures are chained: we inject this closure as the
@ -55,8 +56,8 @@ typedef struct call_data {
} call_data;
typedef struct channel_data {
size_t max_send_size;
size_t max_recv_size;
int max_send_size;
int max_recv_size;
} channel_data;
// Callback invoked when we receive a message. Here we check the max
@ -66,15 +67,15 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data,
grpc_call_element* elem = user_data;
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
if (*calld->recv_message != NULL &&
(*calld->recv_message)->length > chand->max_recv_size) {
if (*calld->recv_message != NULL && chand->max_recv_size >= 0 &&
(*calld->recv_message)->length > (size_t)chand->max_recv_size) {
char* message_string;
gpr_asprintf(
&message_string, "Received message larger than max (%u vs. %lu)",
(*calld->recv_message)->length, (unsigned long)chand->max_recv_size);
gpr_asprintf(&message_string,
"Received message larger than max (%u vs. %d)",
(*calld->recv_message)->length, chand->max_recv_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_cancel_with_message(
grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Invoke the next callback.
@ -88,14 +89,14 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
call_data* calld = elem->call_data;
channel_data* chand = elem->channel_data;
// Check max send message size.
if (op->send_message != NULL &&
op->send_message->length > chand->max_send_size) {
if (op->send_message != NULL && chand->max_send_size >= 0 &&
op->send_message->length > (size_t)chand->max_send_size) {
char* message_string;
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %lu)",
op->send_message->length, (unsigned long)chand->max_send_size);
gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)",
op->send_message->length, chand->max_send_size);
gpr_slice message = gpr_slice_from_copied_string(message_string);
gpr_free(message_string);
grpc_call_element_send_cancel_with_message(
grpc_call_element_send_close_with_message(
exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message);
}
// Inject callback for receiving a message.
@ -130,19 +131,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx,
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
memset(chand, 0, sizeof(*chand));
chand->max_send_size = DEFAULT_MAX_MESSAGE_LENGTH;
chand->max_recv_size = DEFAULT_MAX_MESSAGE_LENGTH;
const grpc_integer_options options = {DEFAULT_MAX_MESSAGE_LENGTH, 0, INT_MAX};
chand->max_send_size = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
chand->max_recv_size = DEFAULT_MAX_RECV_MESSAGE_LENGTH;
for (size_t i = 0; i < args->channel_args->num_args; ++i) {
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_SEND_MESSAGE_LENGTH) == 0) {
chand->max_send_size = (size_t)grpc_channel_arg_get_integer(
&args->channel_args->args[i], options);
const grpc_integer_options options = {DEFAULT_MAX_SEND_MESSAGE_LENGTH, 0,
INT_MAX};
chand->max_send_size =
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
if (strcmp(args->channel_args->args[i].key,
GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH) == 0) {
chand->max_recv_size = (size_t)grpc_channel_arg_get_integer(
&args->channel_args->args[i], options);
const grpc_integer_options options = {DEFAULT_MAX_RECV_MESSAGE_LENGTH, 0,
INT_MAX};
chand->max_recv_size =
grpc_channel_arg_get_integer(&args->channel_args->args[i], options);
}
}
}

@ -324,6 +324,64 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
}
typedef struct {
grpc_error *error;
grpc_status_code code;
const char *msg;
} special_error_status_map;
static special_error_status_map error_status_map[] = {
{GRPC_ERROR_NONE, GRPC_STATUS_OK, ""},
{GRPC_ERROR_CANCELLED, GRPC_STATUS_CANCELLED, "RPC cancelled"},
{GRPC_ERROR_OOM, GRPC_STATUS_RESOURCE_EXHAUSTED, "Out of memory"},
};
static grpc_error *recursively_find_error_with_status(grpc_error *error,
intptr_t *status) {
// If the error itself has a status code, return it.
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, status)) {
return error;
}
// Otherwise, search through its children.
intptr_t key = 0;
while (true) {
grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
if (child_error == NULL) break;
grpc_error *result =
recursively_find_error_with_status(child_error, status);
if (result != NULL) return result;
}
return NULL;
}
void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
const char **msg) {
// Handle special errors via the static map.
for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); ++i) {
if (error == error_status_map[i].error) {
*code = error_status_map[i].code;
*msg = error_status_map[i].msg;
return;
}
}
// Populate code.
// Start with the parent error and recurse through the tree of children
// until we find the first one that has a status code.
intptr_t status = GRPC_STATUS_UNKNOWN; // Default in case we don't find one.
grpc_error *found_error = recursively_find_error_with_status(error, &status);
*code = (grpc_status_code)status;
// Now populate msg.
// If we found an error with a status code above, use that; otherwise,
// fall back to using the parent error.
if (found_error == NULL) found_error = error;
// If the error has a status message, use it. Otherwise, fall back to
// the error description.
*msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_GRPC_MESSAGE);
if (*msg == NULL) {
*msg = grpc_error_get_str(found_error, GRPC_ERROR_STR_DESCRIPTION);
if (*msg == NULL) *msg = "uknown error"; // Just in case.
}
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);

@ -37,6 +37,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <grpc/status.h>
#include <grpc/support/time.h>
/// Opaque representation of an error.
@ -179,6 +180,13 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
/// Returns NULL if the specified string is not set.
/// Caller does NOT own return value.
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
/// A utility function to get the status code and message to be returned
/// to the application. If not set in the top-level message, looks
/// through child errors until it finds the first one with these attributes.
void grpc_error_get_status(grpc_error *error, grpc_status_code *code,
const char **msg);
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
/// errors that contributed to them.

@ -176,62 +176,58 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
if (error == GRPC_ERROR_NONE) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
&so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
error = GRPC_OS_ERROR(errno, "getsockopt");
goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
memory in the kernel for the data structures allocated
when you connect a socket. If this happens it is very
likely that if we wait a little bit then try again the
connection will work (since other programs or this
program will close their network connections and free up
memory). This does _not_ indicate that there is anything
wrong with the server we are connecting to, this is a
local problem.
If you are looking at this code, then chances are that
your program or another program on the same computer
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
gpr_mu_unlock(&ac->mu);
grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
return;
} else {
switch (so_error) {
case ECONNREFUSED:
error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, errno);
error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
"Connection refused");
break;
default:
error = GRPC_OS_ERROR(errno, "getsockopt(SO_ERROR)");
break;
}
goto finish;
}
} else {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
*ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args,
ac->addr_str);
fd = NULL;
goto finish;
}
} else {
if (error != GRPC_ERROR_NONE) {
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, "Timeout occurred");
goto finish;
}
GPR_UNREACHABLE_CODE(return );
do {
so_error_size = sizeof(so_error);
err = getsockopt(grpc_fd_wrapped_fd(fd), SOL_SOCKET, SO_ERROR, &so_error,
&so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
error = GRPC_OS_ERROR(errno, "getsockopt");
goto finish;
}
switch (so_error) {
case 0:
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
*ep = grpc_tcp_client_create_from_fd(exec_ctx, fd, ac->channel_args,
+ac->addr_str);
fd = NULL;
break;
case ENOBUFS:
/* We will get one of these errors if we have run out of
memory in the kernel for the data structures allocated
when you connect a socket. If this happens it is very
likely that if we wait a little bit then try again the
connection will work (since other programs or this
program will close their network connections and free up
memory). This does _not_ indicate that there is anything
wrong with the server we are connecting to, this is a
local problem.
If you are looking at this code, then chances are that
your program or another program on the same computer
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
gpr_mu_unlock(&ac->mu);
grpc_fd_notify_on_write(exec_ctx, fd, &ac->write_closure);
return;
case ECONNREFUSED:
/* This error shouldn't happen for anything other than connect(). */
error = GRPC_OS_ERROR(so_error, "connect");
break;
default:
/* We don't really know which syscall triggered the problem here,
so punt by reporting getsockopt(). */
error = GRPC_OS_ERROR(so_error, "getsockopt(SO_ERROR)");
break;
}
finish:
if (fd != NULL) {

@ -126,8 +126,6 @@ struct grpc_call {
/* client or server call */
bool is_client;
/* is the alarm set */
bool have_alarm;
/** has grpc_call_destroy been called */
bool destroy_called;
/** flag indicating that cancellation is inherited */
@ -170,9 +168,6 @@ struct grpc_call {
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
/* Deadline alarm - if have_alarm is non-zero */
grpc_timer alarm;
/* for the client, extra metadata is initial metadata; for the
server, it's trailing metadata */
grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
@ -215,8 +210,6 @@ struct grpc_call {
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
gpr_timespec deadline);
static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_transport_stream_op *op);
static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
@ -264,40 +257,9 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
}
}
call->send_deadline =
gpr_timespec send_deadline =
gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC);
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_error *error = grpc_call_stack_init(
&exec_ctx, channel_stack, 1, destroy_call, call, call->context,
args->server_transport_data, CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
intptr_t status;
if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
status = GRPC_STATUS_UNKNOWN;
}
const char *error_str =
grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
close_with_status(&exec_ctx, call, (grpc_status_code)status,
error_str == NULL ? "unknown error" : error_str);
}
if (args->cq != NULL) {
GPR_ASSERT(
args->pollset_set_alternative == NULL &&
"Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
GRPC_CQ_INTERNAL_REF(args->cq, "bind");
call->pollent =
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
}
if (args->pollset_set_alternative != NULL) {
call->pollent = grpc_polling_entity_create_from_pollset_set(
args->pollset_set_alternative);
}
if (!grpc_polling_entity_is_empty(&call->pollent)) {
grpc_call_stack_set_pollset_or_pollset_set(
&exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
gpr_timespec send_deadline = args->send_deadline;
if (args->parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(args->parent_call, "child");
GPR_ASSERT(call->is_client);
@ -339,10 +301,38 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args,
gpr_mu_unlock(&args->parent_call->mu);
}
if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
0) {
set_deadline_alarm(&exec_ctx, call, send_deadline);
call->send_deadline = send_deadline;
GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
/* initial refcount dropped by grpc_call_destroy */
grpc_error *error = grpc_call_stack_init(
&exec_ctx, channel_stack, 1, destroy_call, call, call->context,
args->server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
if (error != GRPC_ERROR_NONE) {
grpc_status_code status;
const char *error_str;
grpc_error_get_status(error, &status, &error_str);
close_with_status(&exec_ctx, call, status, error_str);
GRPC_ERROR_UNREF(error);
}
if (args->cq != NULL) {
GPR_ASSERT(
args->pollset_set_alternative == NULL &&
"Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
GRPC_CQ_INTERNAL_REF(args->cq, "bind");
call->pollent =
grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
}
if (args->pollset_set_alternative != NULL) {
call->pollent = grpc_polling_entity_create_from_pollset_set(
args->pollset_set_alternative);
}
if (!grpc_polling_entity_is_empty(&call->pollent)) {
grpc_call_stack_set_pollset_or_pollset_set(
&exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
grpc_exec_ctx_finish(&exec_ctx);
GPR_TIMER_END("grpc_call_create", 0);
return error;
@ -455,20 +445,11 @@ static void set_status_details(grpc_call *call, status_source source,
static void set_status_from_error(grpc_call *call, status_source source,
grpc_error *error) {
intptr_t status;
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
set_status_code(call, source, (uint32_t)status);
} else {
set_status_code(call, source, GRPC_STATUS_INTERNAL);
}
const char *msg = grpc_error_get_str(error, GRPC_ERROR_STR_GRPC_MESSAGE);
bool free_msg = false;
if (msg == NULL) {
free_msg = true;
msg = grpc_error_string(error);
}
grpc_status_code status;
const char *msg;
grpc_error_get_status(error, &status, &msg);
set_status_code(call, source, (uint32_t)status);
set_status_details(call, source, grpc_mdstr_from_string(msg));
if (free_msg) grpc_error_free_string(msg);
}
static void set_incoming_compression_algorithm(
@ -741,9 +722,6 @@ void grpc_call_destroy(grpc_call *c) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->destroy_called);
c->destroy_called = 1;
if (c->have_alarm) {
grpc_timer_cancel(&exec_ctx, &c->alarm);
}
cancel = !c->received_final_op;
gpr_mu_unlock(&c->mu);
if (cancel) grpc_call_cancel(c, NULL);
@ -781,7 +759,6 @@ typedef struct termination_closure {
grpc_closure closure;
grpc_call *call;
grpc_error *error;
grpc_closure *op_closure;
enum { TC_CANCEL, TC_CLOSE } type;
grpc_transport_stream_op op;
} termination_closure;
@ -798,7 +775,6 @@ static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp,
break;
}
GRPC_ERROR_UNREF(tc->error);
grpc_exec_ctx_sched(exec_ctx, tc->op_closure, GRPC_ERROR_NONE, NULL);
gpr_free(tc);
}
@ -818,7 +794,6 @@ static void send_close(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) {
tc->op.close_error = tc->error;
/* reuse closure to catch completion */
grpc_closure_init(&tc->closure, done_termination, tc);
tc->op_closure = tc->op.on_complete;
tc->op.on_complete = &tc->closure;
execute_op(exec_ctx, tc->call, &tc->op);
}
@ -901,32 +876,6 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
if (error != GRPC_ERROR_CANCELLED) {
cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
"Deadline Exceeded");
}
gpr_mu_unlock(&call->mu);
GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
}
static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
gpr_timespec deadline) {
if (call->have_alarm) {
gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
assert(0);
return;
}
GRPC_CALL_INTERNAL_REF(call, "alarm");
call->have_alarm = 1;
call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
gpr_now(GPR_CLOCK_MONOTONIC));
}
/* we offset status by a small amount when storing it into transport metadata
as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
*/
@ -1280,9 +1229,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
GPR_TIMER_BEGIN("set_deadline_alarm", 0);
set_deadline_alarm(exec_ctx, call, md->deadline);
GPR_TIMER_END("set_deadline_alarm", 0);
call->send_deadline =
gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
}
}
@ -1311,9 +1259,17 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
GRPC_ERROR_REF(error);
gpr_mu_lock(&call->mu);
// If the error has an associated status code, set the call's status.
intptr_t status;
if (error != GRPC_ERROR_NONE &&
grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
set_status_from_error(call, STATUS_FROM_CORE, error);
}
if (bctl->send_initial_metadata) {
if (error != GRPC_ERROR_NONE) {
set_status_code(call, STATUS_FROM_CORE, GRPC_STATUS_UNAVAILABLE);
set_status_from_error(call, STATUS_FROM_CORE, error);
}
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
@ -1331,9 +1287,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_metadata_batch_filter(md, recv_trailing_filter, call);
call->received_final_op = true;
if (call->have_alarm) {
grpc_timer_cancel(exec_ctx, &call->alarm);
}
/* propagate cancellation to any interested children */
child_call = call->first_child;
if (child_call != NULL) {

@ -60,6 +60,8 @@ typedef struct grpc_cq_completion {
uintptr_t next;
} grpc_cq_completion;
//#define GRPC_CQ_REF_COUNT_DEBUG
#ifdef GRPC_CQ_REF_COUNT_DEBUG
void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
const char *file, int line);

@ -43,6 +43,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/deadline_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
@ -100,6 +101,12 @@ static bool maybe_add_http_filter(grpc_channel_stack_builder *builder,
}
static void register_builtin_channel_init() {
grpc_channel_init_register_stage(
GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_client_deadline_filter);
grpc_channel_init_register_stage(
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
(void *)&grpc_server_deadline_filter);
grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);

@ -226,7 +226,7 @@ void grpc_transport_stream_op_add_cancellation_with_message(
error = GRPC_ERROR_CREATE("Call cancelled");
}
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status);
add_error(op, &op->close_error, error);
add_error(op, &op->cancel_error, error);
}
void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,

@ -32,7 +32,7 @@
*/
// Generated by the gRPC protobuf plugin.
// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// If you make any local change, they will be lost.
// source: reflection.proto

@ -32,7 +32,7 @@
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
// Generated by tools/codegen/extensions/gen_reflection_proto.sh
// source: reflection.proto
#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION

@ -136,6 +136,7 @@ cdef extern from "grpc/grpc.h":
const char *GRPC_ARG_ENABLE_CENSUS
const char *GRPC_ARG_MAX_CONCURRENT_STREAMS
const char *GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
const char *GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
const char *GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
const char *GRPC_ARG_DEFAULT_AUTHORITY
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING

@ -39,7 +39,8 @@ class ConnectivityState:
class ChannelArgKey:
enable_census = GRPC_ARG_ENABLE_CENSUS
max_concurrent_streams = GRPC_ARG_MAX_CONCURRENT_STREAMS
max_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
max_receive_message_length = GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
max_send_message_length = GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
http2_initial_sequence_number = GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER
default_authority = GRPC_ARG_DEFAULT_AUTHORITY
primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING

@ -83,6 +83,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/channel/channel_stack_builder.c',
'src/core/lib/channel/compress_filter.c',
'src/core/lib/channel/connected_channel.c',
'src/core/lib/channel/deadline_filter.c',
'src/core/lib/channel/handshaker.c',
'src/core/lib/channel/http_client_filter.c',
'src/core/lib/channel/http_server_filter.c',

@ -135,9 +135,9 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
grpc_error *error =
grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack,
NULL, NULL, call_stack);
grpc_error *error = grpc_call_stack_init(
&exec_ctx, channel_stack, 1, free_call, call_stack, NULL, NULL,
gpr_inf_future(GPR_CLOCK_MONOTONIC), call_stack);
GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);

@ -41,7 +41,10 @@
bool squelch = true;
bool leak_check = true;
static void dont_log(gpr_log_func_args *args) {}
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
if (squelch) gpr_set_log_function(dont_log);
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
grpc_grpclb_initial_response *response;
if ((response = grpc_grpclb_initial_response_parse(slice))) {

@ -41,7 +41,10 @@
bool squelch = true;
bool leak_check = true;
static void dont_log(gpr_log_func_args *args) {}
int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
if (squelch) gpr_set_log_function(dont_log);
gpr_slice slice = gpr_slice_from_copied_buffer((const char *)data, size);
grpc_grpclb_serverlist *serverlist;
if ((serverlist = grpc_grpclb_response_parse_serverlist(slice))) {

@ -29,20 +29,39 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
set -e
cd $(dirname $0)/../../..
PROTO_DIR="src/proto/grpc/reflection/v1alpha"
PROTO_FILE="reflection"
HEADER_DIR="include/grpc++/ext"
SRC_DIR="src/cpp/ext"
INCLUDE_DIR="grpc++/ext"
TMP_DIR="tmp"
GRPC_PLUGIN="bins/opt/grpc_cpp_plugin"
PROTOC="bins/opt/protobuf/protoc"
set -e
if hash grpc_cpp_plugin 2>/dev/null; then
GRPC_PLUGIN=$(which grpc_cpp_plugin)
else
if [ -f bins/opt/grpc_cpp_plugin ]; then
GRPC_PLUGIN="bins/opt/grpc_cpp_plugin"
else
echo "gRPC protoc plugin not found"
exit 1
fi
fi
TMP_DIR=${TMP_DIR}_${PROTO_FILE}
if hash protoc 2>/dev/null; then
PROTOC=$(which protoc)
else
if [ -f bins/opt/protobuf/protoc ]; then
PROTOC="bins/opt/protobuf/protoc"
else
echo "protoc not found"
exit 1
fi
fi
cd $(dirname $0)/../../..
TMP_DIR=${TMP_DIR}_${PROTO_FILE}
[ ! -d $HEADER_DIR ] && mkdir -p $HEADER_DIR || :
[ ! -d $SRC_DIR ] && mkdir -p $SRC_DIR || :
@ -56,6 +75,9 @@ sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g"
sed -i "s/\"${PROTO_FILE}.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc
sed -i "s/\"${PROTO_FILE}.grpc.pb.h\"/<${INCLUDE_DIR/\//\\\/}\/${PROTO_FILE}.grpc.pb.h>/g" ${TMP_DIR}/${PROTO_FILE}.grpc.pb.cc
sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.h
sed -i "1s/.*/\/\/ Generated by tools\/codegen\/extensions\/gen_reflection_proto.sh/g" ${TMP_DIR}/*.pb.cc
/bin/cp LICENSE ${TMP_DIR}/TMP_LICENSE
sed -i -e "s/./ &/" -e "s/.*/ \*&/" ${TMP_DIR}/TMP_LICENSE
sed -i -r "\$a\ *\n *\/\n\n" ${TMP_DIR}/TMP_LICENSE

@ -877,6 +877,7 @@ src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/compress_filter.h \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/deadline_filter.h \
src/core/lib/channel/handshaker.h \
src/core/lib/channel/http_client_filter.h \
src/core/lib/channel/http_server_filter.h \
@ -993,6 +994,7 @@ src/core/lib/channel/channel_stack.c \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \

@ -792,6 +792,7 @@ src/core/lib/channel/channel_stack_builder.h \
src/core/lib/channel/compress_filter.h \
src/core/lib/channel/connected_channel.h \
src/core/lib/channel/context.h \
src/core/lib/channel/deadline_filter.h \
src/core/lib/channel/handshaker.h \
src/core/lib/channel/http_client_filter.h \
src/core/lib/channel/http_server_filter.h \
@ -956,6 +957,7 @@ src/core/lib/channel/channel_stack.c \
src/core/lib/channel/channel_stack_builder.c \
src/core/lib/channel/compress_filter.c \
src/core/lib/channel/connected_channel.c \
src/core/lib/channel/deadline_filter.c \
src/core/lib/channel/handshaker.c \
src/core/lib/channel/http_client_filter.c \
src/core/lib/channel/http_server_filter.c \

@ -5996,6 +5996,7 @@
"src/core/lib/channel/compress_filter.h",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.h",
"src/core/lib/channel/http_server_filter.h",
@ -6095,6 +6096,8 @@
"src/core/lib/channel/connected_channel.c",
"src/core/lib/channel/connected_channel.h",
"src/core/lib/channel/context.h",
"src/core/lib/channel/deadline_filter.c",
"src/core/lib/channel/deadline_filter.h",
"src/core/lib/channel/handshaker.c",
"src/core/lib/channel/handshaker.h",
"src/core/lib/channel/http_client_filter.c",

@ -377,6 +377,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@ -534,6 +535,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">

@ -118,6 +118,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@ -734,6 +737,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>

@ -373,6 +373,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@ -520,6 +521,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">

@ -103,6 +103,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@ -707,6 +710,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>

@ -301,6 +301,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@ -473,6 +474,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">

@ -19,6 +19,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@ -683,6 +686,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>

@ -193,6 +193,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@ -318,6 +319,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">

@ -70,6 +70,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@ -464,6 +467,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>

@ -291,6 +291,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\compress_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\http_server_filter.h" />
@ -441,6 +442,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\http_client_filter.c">

@ -22,6 +22,9 @@
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\connected_channel.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.c">
<Filter>src\core\lib\channel</Filter>
</ClCompile>
@ -593,6 +596,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\context.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\deadline_filter.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\channel\handshaker.h">
<Filter>src\core\lib\channel</Filter>
</ClInclude>

Loading…
Cancel
Save