Merge branch 'master' into update-deps-version

pull/18180/head
billfeng327 6 years ago
commit b9d784338d
  1. 5
      BUILD
  2. 1
      CMakeLists.txt
  3. 3
      Makefile
  4. 11
      bazel/generate_cc.bzl
  5. 1
      build.yaml
  6. 3
      doc/environment_variables.md
  7. 4
      doc/server_reflection_tutorial.md
  8. 1
      grpc.def
  9. 13
      include/grpc/grpc.h
  10. 50
      include/grpcpp/impl/codegen/client_callback.h
  11. 4
      setup.py
  12. 7
      src/core/ext/filters/client_channel/client_channel.cc
  13. 23
      src/core/ext/filters/client_channel/lb_policy.h
  14. 7
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  15. 4
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  16. 4
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  17. 90
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  18. 4
      src/core/ext/filters/client_channel/lb_policy_factory.h
  19. 2
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  20. 23
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  21. 3
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  22. 9
      src/core/ext/filters/client_channel/resolver_result_parsing.h
  23. 14
      src/core/ext/filters/client_channel/resolving_lb_policy.cc
  24. 15
      src/core/ext/filters/client_channel/resolving_lb_policy.h
  25. 2
      src/core/ext/filters/client_channel/subchannel.cc
  26. 2
      src/core/ext/filters/message_size/message_size_filter.cc
  27. 2
      src/core/ext/transport/chttp2/transport/hpack_parser.cc
  28. 5
      src/core/ext/transport/inproc/inproc_transport.cc
  29. 3
      src/core/lib/debug/trace.h
  30. 2
      src/core/lib/gprpp/atomic.h
  31. 49
      src/core/lib/gprpp/thd.h
  32. 44
      src/core/lib/gprpp/thd_posix.cc
  33. 54
      src/core/lib/gprpp/thd_windows.cc
  34. 108
      src/core/lib/surface/init.cc
  35. 1
      src/core/lib/surface/init.h
  36. 4
      src/core/lib/transport/service_config.cc
  37. 8
      src/core/lib/transport/service_config.h
  38. 15
      src/cpp/server/server_context.cc
  39. 24
      src/csharp/Grpc.Core.Api/ServerCallContext.cs
  40. 47
      src/csharp/Grpc.Core.Tests/Interceptors/ServerInterceptorTest.cs
  41. 2
      src/php/ext/grpc/php_grpc.c
  42. 69
      src/python/grpcio/commands.py
  43. 6
      src/python/grpcio/grpc/BUILD.bazel
  44. 2
      src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
  45. 2
      src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
  46. 2
      src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
  47. 8
      src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
  48. 2
      src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
  49. 2
      src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
  50. 2
      src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
  51. 5
      src/python/grpcio/grpc/_server.py
  52. 14
      src/python/grpcio/grpc/framework/common/BUILD.bazel
  53. 13
      src/python/grpcio/grpc/framework/foundation/BUILD.bazel
  54. 13
      src/python/grpcio/grpc/framework/interfaces/base/BUILD.bazel
  55. 6
      src/python/grpcio/grpc/framework/interfaces/face/BUILD.bazel
  56. 5
      src/python/grpcio_status/grpc_status/rpc_status.py
  57. 8
      src/python/grpcio_tests/tests/BUILD.bazel
  58. 32
      src/python/grpcio_tests/tests/bazel_namespace_package_hack.py
  59. 7
      src/python/grpcio_tests/tests/interop/BUILD.bazel
  60. 8
      src/python/grpcio_tests/tests/interop/methods.py
  61. 20
      src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py
  62. 1
      src/python/grpcio_tests/tests/status/BUILD.bazel
  63. 8
      src/python/grpcio_tests/tests/status/_grpc_status_test.py
  64. 1
      src/python/grpcio_tests/tests/unit/_abort_test.py
  65. 2
      src/python/grpcio_tests/tests/unit/thread_pool.py
  66. 2
      src/ruby/ext/grpc/rb_grpc_imports.generated.c
  67. 3
      src/ruby/ext/grpc/rb_grpc_imports.generated.h
  68. 3
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  69. 2
      test/core/end2end/fuzzers/api_fuzzer.cc
  70. 10
      test/core/end2end/fuzzers/client_fuzzer.cc
  71. 8
      test/core/end2end/fuzzers/server_fuzzer.cc
  72. 2
      test/core/handshake/readahead_handshaker_server_ssl.cc
  73. 14
      test/core/iomgr/resolve_address_test.cc
  74. 6
      test/core/json/fuzzer.cc
  75. 2
      test/core/memory_usage/client.cc
  76. 2
      test/core/memory_usage/server.cc
  77. 10
      test/core/security/alts_credentials_fuzzer.cc
  78. 10
      test/core/security/ssl_server_fuzzer.cc
  79. 33
      test/core/slice/percent_decode_fuzzer.cc
  80. 40
      test/core/slice/percent_encode_fuzzer.cc
  81. 21
      test/core/surface/init_test.cc
  82. 1
      test/core/surface/public_headers_must_be_c89.c
  83. 31
      test/core/util/memory_counters.cc
  84. 18
      test/core/util/memory_counters.h
  85. 2
      test/core/util/port.cc
  86. 3
      test/core/util/test_config.cc
  87. 25
      test/core/util/test_lb_policies.cc
  88. 1
      test/cpp/end2end/BUILD
  89. 709
      test/cpp/end2end/client_callback_end2end_test.cc
  90. 21
      test/cpp/end2end/client_lb_end2end_test.cc
  91. 15
      test/cpp/end2end/test_service_impl.cc
  92. 2
      test/cpp/naming/address_sorting_test.cc
  93. 16
      test/cpp/util/grpc_tool_test.cc
  94. 11
      third_party/py/python_configure.bzl
  95. 4
      tools/bazel.rc
  96. 2
      tools/internal_ci/linux/grpc_python_bazel_test_in_docker.sh
  97. 3
      tools/run_tests/generated/sources_and_headers.json

@ -63,6 +63,11 @@ config_setting(
values = {"cpu": "x64_windows_msvc"},
)
config_setting(
name = "python3",
values = {"python_path": "python3"},
)
config_setting(
name = "mac_x86_64",
values = {"cpu": "darwin"},

@ -12441,6 +12441,7 @@ if (gRPC_BUILD_TESTS)
add_executable(client_callback_end2end_test
test/cpp/end2end/client_callback_end2end_test.cc
test/cpp/end2end/interceptors_util.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)

@ -17464,6 +17464,7 @@ endif
CLIENT_CALLBACK_END2END_TEST_SRC = \
test/cpp/end2end/client_callback_end2end_test.cc \
test/cpp/end2end/interceptors_util.cc \
CLIENT_CALLBACK_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(CLIENT_CALLBACK_END2END_TEST_SRC))))
ifeq ($(NO_SECURE),true)
@ -17496,6 +17497,8 @@ endif
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/client_callback_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
$(OBJDIR)/$(CONFIG)/test/cpp/end2end/interceptors_util.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr.a
deps_client_callback_end2end_test: $(CLIENT_CALLBACK_END2END_TEST_OBJS:.o=.dep)
ifneq ($(NO_SECURE),true)

@ -28,7 +28,7 @@ def generate_cc_impl(ctx):
else:
outs += [proto.path[label_len:-len(".proto")] + ".pb.h" for proto in protos]
outs += [proto.path[label_len:-len(".proto")] + ".pb.cc" for proto in protos]
out_files = [ctx.new_file(out) for out in outs]
out_files = [ctx.actions.declare_file(out) for out in outs]
dir_out = str(ctx.genfiles_dir.path + proto_root)
arguments = []
@ -38,10 +38,10 @@ def generate_cc_impl(ctx):
if ctx.attr.generate_mocks:
flags.append("generate_mock_code=true")
arguments += ["--PLUGIN_out=" + ",".join(flags) + ":" + dir_out]
additional_input = [ctx.executable.plugin]
tools = [ctx.executable.plugin]
else:
arguments += ["--cpp_out=" + ",".join(ctx.attr.flags) + ":" + dir_out]
additional_input = []
tools = []
# Import protos relative to their workspace root so that protoc prints the
# right include paths.
@ -70,8 +70,9 @@ def generate_cc_impl(ctx):
arguments += ["-I{0}".format(f + "/../..")]
well_known_proto_files = [f for f in ctx.attr.well_known_protos.files]
ctx.action(
inputs = protos + includes + additional_input + well_known_proto_files,
ctx.actions.run(
inputs = protos + includes + well_known_proto_files,
tools = tools,
outputs = out_files,
executable = ctx.executable._protoc,
arguments = arguments,

@ -4468,6 +4468,7 @@ targets:
language: c++
src:
- test/cpp/end2end/client_callback_end2end_test.cc
- test/cpp/end2end/interceptors_util.cc
deps:
- grpc++_test_util
- grpc_test_util

@ -41,6 +41,9 @@ some configuration as environment variables that can be set.
- bdp_estimator - traces behavior of bdp estimation logic
- call_combiner - traces call combiner state
- call_error - traces the possible errors contributing to final call status
- cares_resolver - traces operations of the c-ares based DNS resolver
- cares_address_sorting - traces operations of the c-ares based DNS
resolver's resolved address sorter
- channel - traces operations on the C core channel stack
- client_channel - traces client channel activity, including resolver
and load balancing policy interaction

@ -15,8 +15,8 @@ server reflection, you can link this library to your server binary.
Some platforms (e.g. Ubuntu 11.10 onwards) only link in libraries that directly
contain symbols used by the application. On these platforms, LD flag
`--no-as-needed` is needed for for dynamic linking and `--whole-archive` is
needed for for static linking.
`--no-as-needed` is needed for dynamic linking and `--whole-archive` is
needed for static linking.
This [Makefile](../examples/cpp/helloworld/Makefile#L37#L45) demonstrates
enabling c++ server reflection on Linux and MacOS.

@ -16,7 +16,6 @@ EXPORTS
grpc_init
grpc_shutdown
grpc_is_initialized
grpc_shutdown_blocking
grpc_version_string
grpc_g_stands_for
grpc_completion_queue_factory_lookup

@ -73,11 +73,10 @@ GRPCAPI void grpc_init(void);
Before it's called, there should haven been a matching invocation to
grpc_init().
The last call to grpc_shutdown will initiate cleaning up of grpc library
internals, which can happen in another thread. Once the clean-up is done,
no memory is used by grpc, nor are any instructions executing within the
grpc library. Prior to calling, all application owned grpc objects must
have been destroyed. */
No memory is used by grpc after this call returns, nor are any instructions
executing within the grpc library.
Prior to calling, all application owned grpc objects must have been
destroyed. */
GRPCAPI void grpc_shutdown(void);
/** EXPERIMENTAL. Returns 1 if the grpc library has been initialized.
@ -86,10 +85,6 @@ GRPCAPI void grpc_shutdown(void);
https://github.com/grpc/grpc/issues/15334 */
GRPCAPI int grpc_is_initialized(void);
/** EXPERIMENTAL. Blocking shut down grpc library.
This is only for wrapped language to use now. */
GRPCAPI void grpc_shutdown_blocking(void);
/** Return a string representing the current version of grpc */
GRPCAPI const char* grpc_version_string(void);

@ -268,9 +268,8 @@ class ClientCallbackReaderWriterImpl
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any read backlog
// 3. Recv trailing metadata, on_completion callback
// 4. Any write backlog
// 5. See if the call can finish (if other callbacks were triggered already)
// 3. Any write backlog
// 4. Recv trailing metadata, on_completion callback
started_ = true;
start_tag_.Set(call_.call(),
@ -308,12 +307,6 @@ class ClientCallbackReaderWriterImpl
call_.PerformOps(&read_ops_);
}
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
&finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
if (write_ops_at_start_) {
call_.PerformOps(&write_ops_);
}
@ -321,7 +314,12 @@ class ClientCallbackReaderWriterImpl
if (writes_done_ops_at_start_) {
call_.PerformOps(&writes_done_ops_);
}
MaybeFinish();
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
&finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
}
void Read(Response* msg) override {
@ -414,8 +412,8 @@ class ClientCallbackReaderWriterImpl
CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
// Minimum of 3 callbacks to pre-register for StartCall, start, and finish
std::atomic_int callbacks_outstanding_{3};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
bool started_{false};
};
@ -468,7 +466,6 @@ class ClientCallbackReaderImpl
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Any backlog
// 3. Recv trailing metadata, on_completion callback
// 4. See if the call can finish (if other callbacks were triggered already)
started_ = true;
start_tag_.Set(call_.call(),
@ -500,8 +497,6 @@ class ClientCallbackReaderImpl
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
MaybeFinish();
}
void Read(Response* msg) override {
@ -545,8 +540,8 @@ class ClientCallbackReaderImpl
CallbackWithSuccessTag read_tag_;
bool read_ops_at_start_{false};
// Minimum of 3 callbacks to pre-register for StartCall, start, and finish
std::atomic_int callbacks_outstanding_{3};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
bool started_{false};
};
@ -597,9 +592,8 @@ class ClientCallbackWriterImpl
void StartCall() override {
// This call initiates two batches, plus any backlog, each with a callback
// 1. Send initial metadata (unless corked) + recv initial metadata
// 2. Recv trailing metadata, on_completion callback
// 3. Any backlog
// 4. See if the call can finish (if other callbacks were triggered already)
// 2. Any backlog
// 3. Recv trailing metadata, on_completion callback
started_ = true;
start_tag_.Set(call_.call(),
@ -626,12 +620,6 @@ class ClientCallbackWriterImpl
&write_ops_);
write_ops_.set_core_cq_tag(&write_tag_);
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
&finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
if (write_ops_at_start_) {
call_.PerformOps(&write_ops_);
}
@ -640,7 +628,11 @@ class ClientCallbackWriterImpl
call_.PerformOps(&writes_done_ops_);
}
MaybeFinish();
finish_tag_.Set(call_.call(), [this](bool ok) { MaybeFinish(); },
&finish_ops_);
finish_ops_.ClientRecvStatus(context_, &finish_status_);
finish_ops_.set_core_cq_tag(&finish_tag_);
call_.PerformOps(&finish_ops_);
}
void Write(const Request* msg, WriteOptions options) override {
@ -722,8 +714,8 @@ class ClientCallbackWriterImpl
CallbackWithSuccessTag writes_done_tag_;
bool writes_done_ops_at_start_{false};
// Minimum of 3 callbacks to pre-register for StartCall, start, and finish
std::atomic_int callbacks_outstanding_{3};
// Minimum of 2 callbacks to pre-register for start and finish
std::atomic_int callbacks_outstanding_{2};
bool started_{false};
};

@ -159,7 +159,7 @@ if EXTRA_ENV_COMPILE_ARGS is None:
elif "linux" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -std=gnu99 -fvisibility=hidden -fno-wrapv -fno-exceptions'
elif "darwin" in sys.platform:
EXTRA_ENV_COMPILE_ARGS += ' -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -stdlib=libc++ -fvisibility=hidden -fno-wrapv -fno-exceptions'
EXTRA_ENV_COMPILE_ARGS += ' -DPB_FIELD_32BIT'
if EXTRA_ENV_LINK_ARGS is None:
@ -265,7 +265,7 @@ def cython_extensions_and_necessity():
for name in CYTHON_EXTENSION_MODULE_NAMES]
config = os.environ.get('CONFIG', 'opt')
prefix = 'libs/' + config + '/'
if "darwin" in sys.platform or USE_PREBUILT_GRPC_CORE:
if USE_PREBUILT_GRPC_CORE:
extra_objects = [prefix + 'libares.a',
prefix + 'libboringssl.a',
prefix + 'libgpr.a',

@ -249,10 +249,9 @@ class ClientChannelControlHelper
// Synchronous callback from chand->resolving_lb_policy to process a resolver
// result update.
static bool process_resolver_result_locked(void* arg,
const grpc_channel_args& args,
const char** lb_policy_name,
grpc_json** lb_policy_config) {
static bool process_resolver_result_locked(
void* arg, const grpc_channel_args& args, const char** lb_policy_name,
grpc_core::RefCountedPtr<LoadBalancingPolicy::Config>* lb_policy_config) {
channel_data* chand = static_cast<channel_data*>(arg);
chand->have_service_config = true;
ProcessedResolverResult resolver_result(args, chand->enable_retries);

@ -30,6 +30,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/service_config.h"
extern grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount;
@ -214,6 +215,23 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
GRPC_ABSTRACT_BASE_CLASS
};
// Configuration for an LB policy instance.
class Config : public RefCounted<Config> {
public:
Config(const grpc_json* lb_config,
RefCountedPtr<ServiceConfig> service_config)
: json_(lb_config), service_config_(std::move(service_config)) {}
const grpc_json* json() const { return json_; }
RefCountedPtr<ServiceConfig> service_config() const {
return service_config_;
}
private:
const grpc_json* json_;
RefCountedPtr<ServiceConfig> service_config_;
};
/// Args used to instantiate an LB policy.
struct Args {
/// The combiner under which all LB policy calls will be run.
@ -243,7 +261,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
virtual void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) GRPC_ABSTRACT;
RefCountedPtr<Config> lb_config) {
std::move(lb_config); // Suppress clang-tidy complaint.
GRPC_ABSTRACT;
}
/// Tries to enter a READY connectivity state.
/// This is a no-op by default, since most LB policies never go into

@ -127,7 +127,7 @@ class GrpcLb : public LoadBalancingPolicy {
const char* name() const override { return kGrpclb; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
RefCountedPtr<Config> lb_config) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
@ -1171,7 +1171,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
GrpcLb::GrpcLb(LoadBalancingPolicy::Args args)
GrpcLb::GrpcLb(Args args)
: LoadBalancingPolicy(std::move(args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@ -1321,7 +1321,8 @@ void GrpcLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
void GrpcLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
void GrpcLb::UpdateLocked(const grpc_channel_args& args,
RefCountedPtr<Config> lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
ProcessChannelArgsLocked(args);
// Update the existing RR policy.

@ -51,7 +51,7 @@ class PickFirst : public LoadBalancingPolicy {
const char* name() const override { return kPickFirst; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
RefCountedPtr<Config> lb_config) override;
void ExitIdleLocked() override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
@ -231,7 +231,7 @@ void PickFirst::UpdateChildRefsLocked() {
}
void PickFirst::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
RefCountedPtr<Config> lb_config) {
AutoChildRefsUpdater guard(this);
const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
if (addresses == nullptr) {

@ -62,7 +62,7 @@ class RoundRobin : public LoadBalancingPolicy {
const char* name() const override { return kRoundRobin; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
RefCountedPtr<Config> lb_config) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
channelz::ChildRefsList* ignored) override;
@ -477,7 +477,7 @@ void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
}
void RoundRobin::UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) {
RefCountedPtr<Config> lb_config) {
AutoChildRefsUpdater guard(this);
const ServerAddressList* addresses = FindServerAddressListChannelArg(&args);
if (addresses == nullptr) {

@ -122,7 +122,7 @@ class XdsLb : public LoadBalancingPolicy {
const char* name() const override { return kXds; }
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override;
RefCountedPtr<Config> lb_config) override;
void ResetBackoffLocked() override;
void FillChildRefsForChannelz(
channelz::ChildRefsList* child_subchannels,
@ -242,9 +242,9 @@ class XdsLb : public LoadBalancingPolicy {
// Parses the xds config given the JSON node of the first child of XdsConfig.
// If parsing succeeds, updates \a balancer_name, and updates \a
// child_policy_json_dump_ and \a fallback_policy_json_dump_ if they are also
// child_policy_config_ and \a fallback_policy_config_ if they are also
// found. Does nothing upon failure.
void ParseLbConfig(grpc_json* xds_config_json);
void ParseLbConfig(Config* xds_config);
// Methods for dealing with the balancer channel and call.
void StartBalancerCallLocked();
@ -303,7 +303,8 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
UniquePtr<char> fallback_policy_json_string_;
UniquePtr<char> fallback_policy_name_;
RefCountedPtr<Config> fallback_policy_config_;
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
UniquePtr<ServerAddressList> fallback_backend_addresses_;
@ -313,8 +314,9 @@ class XdsLb : public LoadBalancingPolicy {
grpc_closure lb_on_fallback_;
// The policy to use for the backends.
UniquePtr<char> child_policy_name_;
RefCountedPtr<Config> child_policy_config_;
OrphanablePtr<LoadBalancingPolicy> child_policy_;
UniquePtr<char> child_policy_json_string_;
};
//
@ -952,8 +954,7 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
// TODO(vishalpowar): Use lb_config in args to configure LB policy.
XdsLb::XdsLb(LoadBalancingPolicy::Args args)
XdsLb::XdsLb(Args args)
: LoadBalancingPolicy(std::move(args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
@ -1087,11 +1088,12 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
void XdsLb::ParseLbConfig(grpc_json* xds_config_json) {
void XdsLb::ParseLbConfig(Config* xds_config) {
const grpc_json* xds_config_json = xds_config->json();
const char* balancer_name = nullptr;
grpc_json* child_policy = nullptr;
grpc_json* fallback_policy = nullptr;
for (grpc_json* field = xds_config_json; field != nullptr;
for (const grpc_json* field = xds_config_json; field != nullptr;
field = field->next) {
if (field->key == nullptr) return;
if (strcmp(field->key, "balancerName") == 0) {
@ -1108,19 +1110,22 @@ void XdsLb::ParseLbConfig(grpc_json* xds_config_json) {
}
if (balancer_name == nullptr) return; // Required field.
if (child_policy != nullptr) {
child_policy_json_string_ =
UniquePtr<char>(grpc_json_dump_to_string(child_policy, 0 /* indent */));
child_policy_name_ = UniquePtr<char>(gpr_strdup(child_policy->key));
child_policy_config_ = MakeRefCounted<Config>(child_policy->child,
xds_config->service_config());
}
if (fallback_policy != nullptr) {
fallback_policy_json_string_ = UniquePtr<char>(
grpc_json_dump_to_string(fallback_policy, 0 /* indent */));
fallback_policy_name_ = UniquePtr<char>(gpr_strdup(fallback_policy->key));
fallback_policy_config_ = MakeRefCounted<Config>(
fallback_policy->child, xds_config->service_config());
}
balancer_name_ = UniquePtr<char>(gpr_strdup(balancer_name));
}
void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
void XdsLb::UpdateLocked(const grpc_channel_args& args,
RefCountedPtr<Config> lb_config) {
const bool is_initial_update = lb_channel_ == nullptr;
ParseLbConfig(lb_config);
ParseLbConfig(lb_config.get());
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if (balancer_name_ == nullptr) {
@ -1285,30 +1290,13 @@ void XdsLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
// code for interacting with the child policy
//
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
GPR_ASSERT(child_policy_ == nullptr);
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(args));
if (GPR_UNLIKELY(child_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return;
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
}
grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
bool is_backend_from_grpclb_load_balancer = false;
// This should never be invoked if we do not have serverlist_, as fallback
// mode is disabled for xDS plugin.
GPR_ASSERT(serverlist_ != nullptr);
GPR_ASSERT(serverlist_->num_servers > 0);
UniquePtr<ServerAddressList> addresses = ProcessServerlist(serverlist_);
GPR_ASSERT(addresses != nullptr);
is_backend_from_grpclb_load_balancer = true;
// Replace the server address list in the channel args that we pass down to
// the subchannel.
static const char* keys_to_remove[] = {GRPC_ARG_SERVER_ADDRESS_LIST};
@ -1318,7 +1306,7 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
// grpclb load balancer.
grpc_channel_arg_integer_create(
const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_XDS_LOAD_BALANCER),
is_backend_from_grpclb_load_balancer),
1),
};
grpc_channel_args* args = grpc_channel_args_copy_and_add_and_remove(
args_, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add,
@ -1326,25 +1314,27 @@ grpc_channel_args* XdsLb::CreateChildPolicyArgsLocked() {
return args;
}
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
GPR_ASSERT(child_policy_ == nullptr);
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
name, std::move(args));
if (GPR_UNLIKELY(child_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return;
}
// Add the xDS's interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
// xDS LB, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
interested_parties());
}
void XdsLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
grpc_channel_args* args = CreateChildPolicyArgsLocked();
GPR_ASSERT(args != nullptr);
const char* child_policy_name = nullptr;
grpc_json* child_policy_config = nullptr;
grpc_json* child_policy_json =
grpc_json_parse_string(child_policy_json_string_.get());
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
if (child_policy_json != nullptr) {
child_policy_name = child_policy_json->key;
child_policy_config = child_policy_json->child;
} else {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] No valid child policy LB config", this);
}
child_policy_name = "round_robin";
}
// TODO(juanlishen): Switch policy according to child_policy_config->key.
if (child_policy_ == nullptr) {
LoadBalancingPolicy::Args lb_policy_args;
@ -1352,7 +1342,10 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
lb_policy_args.args = args;
lb_policy_args.channel_control_helper =
UniquePtr<ChannelControlHelper>(New<Helper>(Ref()));
CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args));
CreateChildPolicyLocked(child_policy_name_ == nullptr
? "round_robin"
: child_policy_name_.get(),
std::move(lb_policy_args));
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
child_policy_.get());
@ -1362,9 +1355,8 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
gpr_log(GPR_INFO, "[xdslb %p] Updating child policy %p", this,
child_policy_.get());
}
child_policy_->UpdateLocked(*args, child_policy_config);
child_policy_->UpdateLocked(*args, child_policy_config_);
grpc_channel_args_destroy(args);
grpc_json_destroy(child_policy_json);
}
//

@ -33,9 +33,7 @@ class LoadBalancingPolicyFactory {
virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const {
std::move(args); // Suppress clang-tidy complaint.
// The rest of this is copied from the GRPC_ABSTRACT macro.
gpr_log(GPR_ERROR, "Function marked GRPC_ABSTRACT was not implemented");
GPR_ASSERT(false);
GRPC_ABSTRACT;
}
/// Returns the LB policy name that this factory provides.

@ -489,7 +489,7 @@ void grpc_resolver_dns_ares_init() {
address_sorting_init();
grpc_error* error = grpc_ares_init();
if (error != GRPC_ERROR_NONE) {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
GRPC_LOG_IF_ERROR("grpc_ares_init() failed", error);
return;
}
if (default_resolver == nullptr) {

@ -47,9 +47,6 @@
using grpc_core::ServerAddress;
using grpc_core::ServerAddressList;
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
"cares_address_sorting");
@ -89,8 +86,6 @@ typedef struct grpc_ares_hostbyname_request {
bool is_balancer;
} grpc_ares_hostbyname_request;
static void do_basic_init(void) { gpr_mu_init(&g_init_mu); }
static void log_address_sorting_list(const ServerAddressList& addresses,
const char* input_output_str) {
for (size_t i = 0; i < addresses.size(); i++) {
@ -588,12 +583,12 @@ static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
void (*grpc_cancel_ares_request_locked)(grpc_ares_request* r) =
grpc_cancel_ares_request_locked_impl;
// ares_library_init and ares_library_cleanup are currently no-op except under
// Windows. Calling them may cause race conditions when other parts of the
// binary calls these functions concurrently.
#ifdef GPR_WINDOWS
grpc_error* grpc_ares_init(void) {
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
int status = ares_library_init(ARES_LIB_INIT_ALL);
gpr_mu_unlock(&g_init_mu);
if (status != ARES_SUCCESS) {
char* error_msg;
gpr_asprintf(&error_msg, "ares_library_init failed: %s",
@ -605,11 +600,11 @@ grpc_error* grpc_ares_init(void) {
return GRPC_ERROR_NONE;
}
void grpc_ares_cleanup(void) {
gpr_mu_lock(&g_init_mu);
ares_library_cleanup();
gpr_mu_unlock(&g_init_mu);
}
void grpc_ares_cleanup(void) { ares_library_cleanup(); }
#else
grpc_error* grpc_ares_init(void) { return GRPC_ERROR_NONE; }
void grpc_ares_cleanup(void) {}
#endif // GPR_WINDOWS
/*
* grpc_resolve_address_ares related structs and functions

@ -148,7 +148,8 @@ void ProcessedResolverResult::ParseLbConfigFromServiceConfig(
LoadBalancingPolicy::ParseLoadBalancingConfig(field);
if (policy != nullptr) {
lb_policy_name_.reset(gpr_strdup(policy->key));
lb_policy_config_ = policy->child;
lb_policy_config_ = MakeRefCounted<LoadBalancingPolicy::Config>(
policy->child, service_config_);
}
}

@ -21,6 +21,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/ref_counted.h"
@ -60,7 +61,9 @@ class ProcessedResolverResult {
return std::move(method_params_table_);
}
UniquePtr<char> lb_policy_name() { return std::move(lb_policy_name_); }
grpc_json* lb_policy_config() { return lb_policy_config_; }
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config() {
return std::move(lb_policy_config_);
}
private:
// Finds the service config; extracts LB config and (maybe) retry throttle
@ -82,10 +85,10 @@ class ProcessedResolverResult {
// Service config.
UniquePtr<char> service_config_json_;
UniquePtr<grpc_core::ServiceConfig> service_config_;
RefCountedPtr<ServiceConfig> service_config_;
// LB policy.
grpc_json* lb_policy_config_ = nullptr;
UniquePtr<char> lb_policy_name_;
RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config_;
// Retry throttle data.
char* server_name_ = nullptr;
RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;

@ -117,14 +117,13 @@ class ResolvingLoadBalancingPolicy::ResolvingControlHelper
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name, grpc_json* child_lb_config,
UniquePtr<char> child_policy_name, RefCountedPtr<Config> child_lb_config,
grpc_error** error)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
child_policy_name_(std::move(child_policy_name)),
child_lb_config_str_(grpc_json_dump_to_string(child_lb_config, 0)),
child_lb_config_(grpc_json_parse_string(child_lb_config_str_.get())) {
child_lb_config_(std::move(child_lb_config)) {
GPR_ASSERT(child_policy_name_ != nullptr);
// Don't fetch service config, since this ctor is for use in nested LB
// policies, not at the top level, and we only fetch the service
@ -170,7 +169,6 @@ grpc_error* ResolvingLoadBalancingPolicy::Init(const grpc_channel_args& args) {
ResolvingLoadBalancingPolicy::~ResolvingLoadBalancingPolicy() {
GPR_ASSERT(resolver_ == nullptr);
GPR_ASSERT(lb_policy_ == nullptr);
grpc_json_destroy(child_lb_config_);
}
void ResolvingLoadBalancingPolicy::ShutdownLocked() {
@ -218,6 +216,9 @@ void ResolvingLoadBalancingPolicy::StartResolvingLocked() {
}
GPR_ASSERT(!started_resolving_);
started_resolving_ = true;
channel_control_helper()->UpdateState(
GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
UniquePtr<SubchannelPicker>(New<QueuePicker>(Ref())));
Ref().release();
resolver_->NextLocked(&resolver_result_, &on_resolver_result_changed_);
}
@ -403,7 +404,7 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
} else {
// Parse the resolver result.
const char* lb_policy_name = nullptr;
grpc_json* lb_policy_config = nullptr;
RefCountedPtr<Config> lb_policy_config;
bool service_config_changed = false;
if (self->process_resolver_result_ != nullptr) {
service_config_changed = self->process_resolver_result_(
@ -429,7 +430,8 @@ void ResolvingLoadBalancingPolicy::OnResolverResultChangedLocked(
gpr_log(GPR_INFO, "resolving_lb=%p: updating LB policy \"%s\" (%p)", self,
lb_policy_name, self->lb_policy_.get());
}
self->lb_policy_->UpdateLocked(*self->resolver_result_, lb_policy_config);
self->lb_policy_->UpdateLocked(*self->resolver_result_,
std::move(lb_policy_config));
// Add channel trace event.
if (self->channelz_node() != nullptr) {
if (service_config_changed) {

@ -56,17 +56,17 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
ResolvingLoadBalancingPolicy(Args args, TraceFlag* tracer,
UniquePtr<char> target_uri,
UniquePtr<char> child_policy_name,
grpc_json* child_lb_config, grpc_error** error);
RefCountedPtr<Config> child_lb_config,
grpc_error** error);
// Private ctor, to be used by client_channel only!
//
// Synchronous callback that takes the resolver result and sets
// lb_policy_name and lb_policy_config to point to the right data.
// Returns true if the service config has changed since the last result.
typedef bool (*ProcessResolverResultCallback)(void* user_data,
const grpc_channel_args& args,
const char** lb_policy_name,
grpc_json** lb_policy_config);
typedef bool (*ProcessResolverResultCallback)(
void* user_data, const grpc_channel_args& args,
const char** lb_policy_name, RefCountedPtr<Config>* lb_policy_config);
// If error is set when this returns, then construction failed, and
// the caller may not use the new object.
ResolvingLoadBalancingPolicy(
@ -80,7 +80,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
// TODO(roth): Need to support updating child LB policy's config for xds
// use case.
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {}
RefCountedPtr<Config> lb_config) override {}
void ExitIdleLocked() override;
@ -116,8 +116,7 @@ class ResolvingLoadBalancingPolicy : public LoadBalancingPolicy {
ProcessResolverResultCallback process_resolver_result_ = nullptr;
void* process_resolver_result_user_data_ = nullptr;
UniquePtr<char> child_policy_name_;
UniquePtr<char> child_lb_config_str_;
grpc_json* child_lb_config_ = nullptr;
RefCountedPtr<Config> child_lb_config_;
// Resolver and associated state.
OrphanablePtr<Resolver> resolver_;

@ -590,7 +590,7 @@ Subchannel::Subchannel(SubchannelKey* key, grpc_connector* connector,
const char* service_config_json = grpc_channel_arg_get_string(
grpc_channel_args_find(args_, GRPC_ARG_SERVICE_CONFIG));
if (service_config_json != nullptr) {
UniquePtr<ServiceConfig> service_config =
RefCountedPtr<ServiceConfig> service_config =
ServiceConfig::Create(service_config_json);
if (service_config != nullptr) {
HealthCheckParams params;

@ -319,7 +319,7 @@ static grpc_error* init_channel_elem(grpc_channel_element* elem,
grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG);
const char* service_config_str = grpc_channel_arg_get_string(channel_arg);
if (service_config_str != nullptr) {
grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
grpc_core::RefCountedPtr<grpc_core::ServiceConfig> service_config =
grpc_core::ServiceConfig::Create(service_config_str);
if (service_config != nullptr) {
chand->method_limit_table = service_config->CreateMethodConfigTable(

@ -1452,7 +1452,7 @@ static grpc_error* begin_parse_string(grpc_chttp2_hpack_parser* p,
uint8_t binary,
grpc_chttp2_hpack_parser_string* str) {
if (!p->huff && binary == NOT_BINARY &&
(end - cur) >= static_cast<intptr_t>(p->strlen) &&
static_cast<uint32_t>(end - cur) >= p->strlen &&
p->current_slice_refcount != nullptr) {
GRPC_STATS_INC_HPACK_RECV_UNCOMPRESSED();
str->copied = false;

@ -1032,6 +1032,11 @@ void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
}
} else {
if (error != GRPC_ERROR_NONE) {
// Consume any send message that was sent here but that we are not pushing
// to the other side
if (op->send_message) {
op->payload->send_message.send_message.reset();
}
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
if (op->payload->recv_initial_metadata.trailing_metadata_available !=

@ -53,8 +53,7 @@ void grpc_tracer_enable_flag(grpc_core::TraceFlag* flag);
class TraceFlag {
public:
TraceFlag(bool default_enabled, const char* name);
// TraceFlag needs to be trivially destructible since it is used as global
// variable.
// This needs to be trivially destructible as it is used as global variable.
~TraceFlag() = default;
const char* name() const { return name_; }

@ -23,6 +23,8 @@
#include <atomic>
#include <grpc/support/atm.h>
namespace grpc_core {
enum class MemoryOrder {

@ -47,27 +47,6 @@ class ThreadInternalsInterface {
class Thread {
public:
class Options {
public:
Options() : joinable_(true), tracked_(true) {}
/// Set whether the thread is joinable or detached.
Options& set_joinable(bool joinable) {
joinable_ = joinable;
return *this;
}
bool joinable() const { return joinable_; }
/// Set whether the thread is tracked for fork support.
Options& set_tracked(bool tracked) {
tracked_ = tracked;
return *this;
}
bool tracked() const { return tracked_; }
private:
bool joinable_;
bool tracked_;
};
/// Default constructor only to allow use in structs that lack constructors
/// Does not produce a validly-constructed thread; must later
/// use placement new to construct a real thread. Does not init mu_ and cv_
@ -78,17 +57,14 @@ class Thread {
/// with argument \a arg once it is started.
/// The optional \a success argument indicates whether the thread
/// is successfully created.
/// The optional \a options can be used to set the thread detachable.
Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success = nullptr, const Options& options = Options());
bool* success = nullptr);
/// Move constructor for thread. After this is called, the other thread
/// no longer represents a living thread object
Thread(Thread&& other)
: state_(other.state_), impl_(other.impl_), options_(other.options_) {
Thread(Thread&& other) : state_(other.state_), impl_(other.impl_) {
other.state_ = MOVED;
other.impl_ = nullptr;
other.options_ = Options();
}
/// Move assignment operator for thread. After this is called, the other
@ -103,37 +79,27 @@ class Thread {
// assert it for the time being.
state_ = other.state_;
impl_ = other.impl_;
options_ = other.options_;
other.state_ = MOVED;
other.impl_ = nullptr;
other.options_ = Options();
}
return *this;
}
/// The destructor is strictly optional; either the thread never came to life
/// and the constructor itself killed it, or it has already been joined and
/// the Join function kills it, or it was detached (non-joinable) and it has
/// run to completion and is now killing itself. The destructor shouldn't have
/// to do anything.
~Thread() { GPR_ASSERT(!options_.joinable() || impl_ == nullptr); }
/// and the constructor itself killed it or it has already been joined and
/// the Join function kills it. The destructor shouldn't have to do anything.
~Thread() { GPR_ASSERT(impl_ == nullptr); }
void Start() {
if (impl_ != nullptr) {
GPR_ASSERT(state_ == ALIVE);
state_ = STARTED;
impl_->Start();
// If the Thread is not joinable, then the impl_ will cause the deletion
// of this Thread object when the thread function completes. Since no
// other operation is allowed to a detached thread after Start, there is
// no need to change the value of the impl_ or state_ . The next operation
// on this object will be the deletion, which will trigger the destructor.
} else {
GPR_ASSERT(state_ == FAILED);
}
}
};
// It is only legal to call Join if the Thread is created as joinable.
void Join() {
if (impl_ != nullptr) {
impl_->Join();
@ -153,13 +119,12 @@ class Thread {
/// FAKE -- just a dummy placeholder Thread created by the default constructor
/// ALIVE -- an actual thread of control exists associated with this thread
/// STARTED -- the thread of control has been started
/// DONE -- the thread of control has completed and been joined/detached
/// DONE -- the thread of control has completed and been joined
/// FAILED -- the thread of control never came alive
/// MOVED -- contents were moved out and we're no longer tracking them
enum ThreadState { FAKE, ALIVE, STARTED, DONE, FAILED, MOVED };
ThreadState state_;
internal::ThreadInternalsInterface* impl_;
Options options_;
};
} // namespace grpc_core

@ -44,14 +44,13 @@ struct thd_arg {
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
const char* name; /* name of thread. Can be nullptr. */
bool joinable;
bool tracked;
};
class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
class ThreadInternalsPosix
: public grpc_core::internal::ThreadInternalsInterface {
public:
ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
void* arg, bool* success, const Thread::Options& options)
void* arg, bool* success)
: started_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
@ -64,20 +63,11 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
info->body = thd_body;
info->arg = arg;
info->name = thd_name;
info->joinable = options.joinable();
info->tracked = options.tracked();
if (options.tracked()) {
Fork::IncThreadCount();
}
grpc_core::Fork::IncThreadCount();
GPR_ASSERT(pthread_attr_init(&attr) == 0);
if (options.joinable()) {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
0);
} else {
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
0);
}
GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
0);
*success =
(pthread_create(&pthread_id_, &attr,
@ -107,14 +97,8 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
}
gpr_mu_unlock(&arg.thread->mu_);
if (!arg.joinable) {
Delete(arg.thread);
}
(*arg.body)(arg.arg);
if (arg.tracked) {
Fork::DecThreadCount();
}
grpc_core::Fork::DecThreadCount();
return nullptr;
},
info) == 0);
@ -124,11 +108,9 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
if (!(*success)) {
/* don't use gpr_free, as this was allocated using malloc (see above) */
free(info);
if (options.tracked()) {
Fork::DecThreadCount();
}
grpc_core::Fork::DecThreadCount();
}
}
};
~ThreadInternalsPosix() override {
gpr_mu_destroy(&mu_);
@ -154,15 +136,15 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
} // namespace
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success, const Options& options)
: options_(options) {
bool* success) {
bool outcome = false;
impl_ = New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome, options);
impl_ =
grpc_core::New<ThreadInternalsPosix>(thd_name, thd_body, arg, &outcome);
if (outcome) {
state_ = ALIVE;
} else {
state_ = FAILED;
Delete(impl_);
grpc_core::Delete(impl_);
impl_ = nullptr;
}

@ -46,7 +46,6 @@ struct thd_info {
void (*body)(void* arg); /* body of a thread */
void* arg; /* argument to a thread */
HANDLE join_event; /* the join event */
bool joinable; /* whether it is joinable */
};
thread_local struct thd_info* g_thd_info;
@ -54,8 +53,7 @@ thread_local struct thd_info* g_thd_info;
class ThreadInternalsWindows
: public grpc_core::internal::ThreadInternalsInterface {
public:
ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success,
const grpc_core::Thread::Options& options)
ThreadInternalsWindows(void (*thd_body)(void* arg), void* arg, bool* success)
: started_(false) {
gpr_mu_init(&mu_);
gpr_cv_init(&ready_);
@ -65,23 +63,20 @@ class ThreadInternalsWindows
info_->thread = this;
info_->body = thd_body;
info_->arg = arg;
info_->join_event = nullptr;
info_->joinable = options.joinable();
if (info_->joinable) {
info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info_->join_event == nullptr) {
gpr_free(info_);
*success = false;
return;
}
}
handle = CreateThread(nullptr, 64 * 1024, thread_body, info_, 0, nullptr);
if (handle == nullptr) {
destroy_thread();
info_->join_event = CreateEvent(nullptr, FALSE, FALSE, nullptr);
if (info_->join_event == nullptr) {
gpr_free(info_);
*success = false;
} else {
CloseHandle(handle);
*success = true;
handle = CreateThread(nullptr, 64 * 1024, thread_body, info_, 0, nullptr);
if (handle == nullptr) {
destroy_thread();
*success = false;
} else {
CloseHandle(handle);
*success = true;
}
}
}
@ -112,24 +107,14 @@ class ThreadInternalsWindows
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(&g_thd_info->thread->mu_);
if (!g_thd_info->joinable) {
grpc_core::Delete(g_thd_info->thread);
g_thd_info->thread = nullptr;
}
g_thd_info->body(g_thd_info->arg);
if (g_thd_info->joinable) {
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
} else {
gpr_free(g_thd_info);
}
BOOL ret = SetEvent(g_thd_info->join_event);
GPR_ASSERT(ret);
return 0;
}
void destroy_thread() {
if (info_ != nullptr && info_->joinable) {
CloseHandle(info_->join_event);
}
CloseHandle(info_->join_event);
gpr_free(info_);
}
@ -144,15 +129,14 @@ class ThreadInternalsWindows
namespace grpc_core {
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success, const Options& options)
: options_(options) {
bool* success) {
bool outcome = false;
impl_ = New<ThreadInternalsWindows>(thd_body, arg, &outcome, options);
impl_ = grpc_core::New<ThreadInternalsWindows>(thd_body, arg, &outcome);
if (outcome) {
state_ = ALIVE;
} else {
state_ = FAILED;
Delete(impl_);
grpc_core::Delete(impl_);
impl_ = nullptr;
}

@ -33,7 +33,6 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/mutex_lock.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
@ -62,15 +61,10 @@ extern void grpc_register_built_in_plugins(void);
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
static gpr_cv* g_shutting_down_cv;
static bool g_shutting_down;
static void do_basic_init(void) {
gpr_log_verbosity_init();
gpr_mu_init(&g_init_mu);
g_shutting_down_cv = static_cast<gpr_cv*>(malloc(sizeof(gpr_cv)));
gpr_cv_init(g_shutting_down_cv);
g_shutting_down = false;
grpc_register_built_in_plugins();
grpc_cq_global_init();
g_initializations = 0;
@ -124,12 +118,8 @@ void grpc_init(void) {
int i;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
if (g_shutting_down) {
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
}
grpc_core::Fork::GlobalInit();
grpc_fork_handlers_auto_register();
gpr_time_init();
@ -160,88 +150,50 @@ void grpc_init(void) {
grpc_channel_init_finalize();
grpc_iomgr_start();
}
gpr_mu_unlock(&g_init_mu);
GRPC_API_TRACE("grpc_init(void)", 0, ());
}
void grpc_shutdown_internal_locked(void) {
void grpc_shutdown(void) {
int i;
{
grpc_core::ExecCtx exec_ctx(0);
grpc_iomgr_shutdown_background_closure();
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
{
grpc_timer_manager_set_threading(false); // shutdown timer_manager thread
grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy();
grpc_core::ExecCtx exec_ctx(0);
grpc_iomgr_shutdown_background_closure();
{
grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread
grpc_core::Executor::ShutdownAll();
for (i = g_number_of_plugins; i >= 0; i--) {
if (g_all_of_the_plugins[i].destroy != nullptr) {
g_all_of_the_plugins[i].destroy();
}
}
}
grpc_iomgr_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_mdctx_global_shutdown();
grpc_core::HandshakerRegistry::Shutdown();
grpc_slice_intern_shutdown();
grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
grpc_iomgr_shutdown();
gpr_timers_global_destroy();
grpc_tracer_shutdown();
grpc_mdctx_global_shutdown();
grpc_core::HandshakerRegistry::Shutdown();
grpc_slice_intern_shutdown();
grpc_core::channelz::ChannelzRegistry::Shutdown();
grpc_stats_shutdown();
grpc_core::Fork::GlobalShutdown();
}
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
g_shutting_down = false;
gpr_cv_broadcast(g_shutting_down_cv);
}
void grpc_shutdown_internal(void* ignored) {
GRPC_API_TRACE("grpc_shutdown_internal", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
// We have released lock from the shutdown thread and it is possible that
// another grpc_init has been called, and do nothing if that is the case.
if (--g_initializations != 0) {
return;
}
grpc_shutdown_internal_locked();
}
void grpc_shutdown(void) {
GRPC_API_TRACE("grpc_shutdown(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_initializations++;
g_shutting_down = true;
// spawn a detached thread to do the actual clean up in case we are
// currently in an executor thread.
grpc_core::Thread cleanup_thread(
"grpc_shutdown", grpc_shutdown_internal, nullptr, nullptr,
grpc_core::Thread::Options().set_joinable(false).set_tracked(false));
cleanup_thread.Start();
}
}
void grpc_shutdown_blocking(void) {
GRPC_API_TRACE("grpc_shutdown_blocking(void)", 0, ());
grpc_core::MutexLock lock(&g_init_mu);
if (--g_initializations == 0) {
g_shutting_down = true;
grpc_shutdown_internal_locked();
grpc_core::ExecCtx::GlobalShutdown();
grpc_core::ApplicationCallbackExecCtx::GlobalShutdown();
}
gpr_mu_unlock(&g_init_mu);
}
int grpc_is_initialized(void) {
int r;
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
gpr_mu_lock(&g_init_mu);
r = g_initializations > 0;
gpr_mu_unlock(&g_init_mu);
return r;
}
void grpc_maybe_wait_for_async_shutdown(void) {
gpr_once_init(&g_basic_init, do_basic_init);
grpc_core::MutexLock lock(&g_init_mu);
while (g_shutting_down) {
gpr_cv_wait(g_shutting_down_cv, &g_init_mu,
gpr_inf_future(GPR_CLOCK_REALTIME));
}
}

@ -22,6 +22,5 @@
void grpc_register_security_filters(void);
void grpc_security_pre_init(void);
void grpc_security_init(void);
void grpc_maybe_wait_for_async_shutdown(void);
#endif /* GRPC_CORE_LIB_SURFACE_INIT_H */

@ -33,14 +33,14 @@
namespace grpc_core {
UniquePtr<ServiceConfig> ServiceConfig::Create(const char* json) {
RefCountedPtr<ServiceConfig> ServiceConfig::Create(const char* json) {
UniquePtr<char> json_string(gpr_strdup(json));
grpc_json* json_tree = grpc_json_parse_string(json_string.get());
if (json_tree == nullptr) {
gpr_log(GPR_INFO, "failed to parse JSON for service config");
return nullptr;
}
return MakeUnique<ServiceConfig>(std::move(json_string), json_tree);
return MakeRefCounted<ServiceConfig>(std::move(json_string), json_tree);
}
ServiceConfig::ServiceConfig(UniquePtr<char> json_string, grpc_json* json_tree)

@ -23,6 +23,7 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/slice/slice_hash_table.h"
@ -41,8 +42,7 @@
// }
// ],
// // remaining fields are optional.
// // see
// https://developers.google.com/protocol-buffers/docs/proto3#json
// // see https://developers.google.com/protocol-buffers/docs/proto3#json
// // for format details.
// "waitForReady": bool,
// "timeout": "duration_string",
@ -54,11 +54,11 @@
namespace grpc_core {
class ServiceConfig {
class ServiceConfig : public RefCounted<ServiceConfig> {
public:
/// Creates a new service config from parsing \a json_string.
/// Returns null on parse error.
static UniquePtr<ServiceConfig> Create(const char* json);
static RefCountedPtr<ServiceConfig> Create(const char* json);
~ServiceConfig();

@ -138,7 +138,7 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
internal::Call call_;
internal::ServerReactor* reactor_;
internal::ServerReactor* const reactor_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
@ -200,12 +200,17 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
cancelled_ = 1;
}
if (cancelled_ && (reactor_ != nullptr)) {
// Decide whether to call the cancel callback before releasing the lock
bool call_cancel = (cancelled_ != 0);
// Release the lock since we are going to be calling a callback and
// interceptors now
lock.unlock();
if (call_cancel && (reactor_ != nullptr)) {
reactor_->OnCancel();
}
/* Release the lock since we are going to be running through interceptors now
*/
lock.unlock();
/* Add interception point and run through interceptors */
interceptor_methods_.AddInterceptionHookPoint(
experimental::InterceptionHookPoints::POST_RECV_CLOSE);

@ -17,6 +17,7 @@
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
@ -27,6 +28,8 @@ namespace Grpc.Core
/// </summary>
public abstract class ServerCallContext
{
private Dictionary<object, object> userState;
/// <summary>
/// Creates a new instance of <c>ServerCallContext</c>.
/// </summary>
@ -113,6 +116,12 @@ namespace Grpc.Core
/// </summary>
public AuthContext AuthContext => AuthContextCore;
/// <summary>
/// Gets a dictionary that can be used by the various interceptors and handlers of this
/// call to store arbitrary state.
/// </summary>
public IDictionary<object, object> UserState => UserStateCore;
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract Task WriteResponseHeadersAsyncCore(Metadata responseHeaders);
/// <summary>Provides implementation of a non-virtual public member.</summary>
@ -135,7 +144,20 @@ namespace Grpc.Core
protected abstract Status StatusCore { get; set; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract WriteOptions WriteOptionsCore { get; set; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected abstract AuthContext AuthContextCore { get; }
/// <summary>Provides implementation of a non-virtual public member.</summary>
protected virtual IDictionary<object, object> UserStateCore
{
get
{
if (userState == null)
{
userState = new Dictionary<object, object>();
}
return userState;
}
}
}
}

@ -77,6 +77,53 @@ namespace Grpc.Core.Interceptors.Tests
Assert.AreEqual("CB1B2B3A", stringBuilder.ToString());
}
[Test]
public void UserStateVisibleToAllInterceptors()
{
object key1 = new object();
object value1 = new object();
const string key2 = "Interceptor #2";
const string value2 = "Important state";
var interceptor1 = new ServerCallContextInterceptor(ctx => {
// state starts off empty
Assert.AreEqual(0, ctx.UserState.Count);
ctx.UserState.Add(key1, value1);
});
var interceptor2 = new ServerCallContextInterceptor(ctx => {
// second interceptor can see state set by the first
bool found = ctx.UserState.TryGetValue(key1, out object storedValue1);
Assert.IsTrue(found);
Assert.AreEqual(value1, storedValue1);
ctx.UserState.Add(key2, value2);
});
var helper = new MockServiceHelper(Host);
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => {
// call handler can see all the state
bool found = context.UserState.TryGetValue(key1, out object storedValue1);
Assert.IsTrue(found);
Assert.AreEqual(value1, storedValue1);
found = context.UserState.TryGetValue(key2, out object storedValue2);
Assert.IsTrue(found);
Assert.AreEqual(value2, storedValue2);
return Task.FromResult("PASS");
});
helper.ServiceDefinition = helper.ServiceDefinition
.Intercept(interceptor2)
.Intercept(interceptor1);
var server = helper.GetServer();
server.Start();
var channel = helper.GetChannel();
Assert.AreEqual("PASS", Calls.BlockingUnaryCall(helper.CreateUnaryCall(), ""));
}
[Test]
public void CheckNullInterceptorRegistrationFails()
{

@ -361,7 +361,7 @@ PHP_MSHUTDOWN_FUNCTION(grpc) {
zend_hash_destroy(&grpc_target_upper_bound_map);
grpc_shutdown_timeval(TSRMLS_C);
grpc_php_shutdown_completion_queue(TSRMLS_C);
grpc_shutdown_blocking();
grpc_shutdown();
GRPC_G(initialized) = 0;
}
return SUCCESS;

@ -212,50 +212,39 @@ class BuildExt(build_ext.build_ext):
LINK_OPTIONS = {}
def build_extensions(self):
def compiler_ok_with_extra_std():
"""Test if default compiler is okay with specifying c++ version
when invoked in C mode. GCC is okay with this, while clang is not.
"""
cc_test = subprocess.Popen(
['cc', '-x', 'c', '-std=c++11', '-'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
_, cc_err = cc_test.communicate(input='int main(){return 0;}')
return not 'invalid argument' in str(cc_err)
# This special conditioning is here due to difference of compiler
# behavior in gcc and clang. The clang doesn't take --stdc++11
# flags but gcc does. Since the setuptools of Python only support
# all C or all C++ compilation, the mix of C and C++ will crash.
# *By default*, the macOS use clang and Linux use gcc, that's why
# the special condition here is checking platform.
if "darwin" in sys.platform:
config = os.environ.get('CONFIG', 'opt')
target_path = os.path.abspath(
os.path.join(
os.path.dirname(os.path.realpath(__file__)), '..', '..',
'..', 'libs', config))
targets = [
os.path.join(target_path, 'libboringssl.a'),
os.path.join(target_path, 'libares.a'),
os.path.join(target_path, 'libgpr.a'),
os.path.join(target_path, 'libgrpc.a')
]
# Running make separately for Mac means we lose all
# Extension.define_macros configured in setup.py. Re-add the macro
# for gRPC Core's fork handlers.
# TODO(ericgribkoff) Decide what to do about the other missing core
# macros, including GRPC_ENABLE_FORK_SUPPORT, which defaults to 1
# on Linux but remains unset on Mac.
extra_defines = [
'EXTRA_DEFINES="GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK=1"'
]
# Ensure the BoringSSL are built instead of using system provided
# libraries. It prevents dependency issues while distributing to
# Mac users who use MacPorts to manage their libraries. #17002
mod_env = dict(os.environ)
mod_env['REQUIRE_CUSTOM_LIBRARIES_opt'] = '1'
make_process = subprocess.Popen(
['make'] + extra_defines + targets,
env=mod_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
make_out, make_err = make_process.communicate()
if make_out and make_process.returncode != 0:
sys.stdout.write(str(make_out) + '\n')
if make_err:
sys.stderr.write(str(make_err) + '\n')
if make_process.returncode != 0:
raise Exception("make command failed!")
# *By default*, macOS and FreBSD use clang and Linux use gcc
#
# If we are not using a permissive compiler that's OK with being
# passed wrong std flags, swap out compile function by adding a filter
# for it.
if not compiler_ok_with_extra_std():
old_compile = self.compiler._compile
def new_compile(obj, src, ext, cc_args, extra_postargs, pp_opts):
if src[-2:] == '.c':
extra_postargs = [
arg for arg in extra_postargs if not '-std=c++' in arg
]
return old_compile(obj, src, ext, cc_args, extra_postargs,
pp_opts)
self.compiler._compile = new_compile
compiler = self.compiler.compiler_type
if compiler in BuildExt.C_OPTIONS:

@ -15,9 +15,11 @@ py_library(
"//src/python/grpcio/grpc/_cython:cygrpc",
"//src/python/grpcio/grpc/experimental",
"//src/python/grpcio/grpc/framework",
requirement('enum34'),
requirement('six'),
],
] + select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
data = [
"//:grpc",
],

@ -87,7 +87,7 @@ cdef class Call:
def __dealloc__(self):
if self.c_call != NULL:
grpc_call_unref(self.c_call)
grpc_shutdown_blocking()
grpc_shutdown()
# The object *should* always be valid from Python. Used for debugging.
@property

@ -399,7 +399,7 @@ cdef _close(Channel channel, grpc_status_code code, object details,
_destroy_c_completion_queue(state.c_connectivity_completion_queue)
grpc_channel_destroy(state.c_channel)
state.c_channel = NULL
grpc_shutdown_blocking()
grpc_shutdown()
state.condition.notify_all()
else:
# Another call to close already completed in the past or is currently

@ -118,4 +118,4 @@ cdef class CompletionQueue:
self.c_completion_queue, c_deadline, NULL)
self._interpret_event(event)
grpc_completion_queue_destroy(self.c_completion_queue)
grpc_shutdown_blocking()
grpc_shutdown()

@ -61,7 +61,7 @@ cdef int _get_metadata(
cdef void _destroy(void *state) with gil:
cpython.Py_DECREF(<object>state)
grpc_shutdown_blocking()
grpc_shutdown()
cdef class MetadataPluginCallCredentials(CallCredentials):
@ -125,7 +125,7 @@ cdef class SSLSessionCacheLRU:
def __dealloc__(self):
if self._cache != NULL:
grpc_ssl_session_cache_destroy(self._cache)
grpc_shutdown_blocking()
grpc_shutdown()
cdef class SSLChannelCredentials(ChannelCredentials):
@ -191,7 +191,7 @@ cdef class ServerCertificateConfig:
def __dealloc__(self):
grpc_ssl_server_certificate_config_destroy(self.c_cert_config)
gpr_free(self.c_ssl_pem_key_cert_pairs)
grpc_shutdown_blocking()
grpc_shutdown()
cdef class ServerCredentials:
@ -207,7 +207,7 @@ cdef class ServerCredentials:
def __dealloc__(self):
if self.c_credentials != NULL:
grpc_server_credentials_release(self.c_credentials)
grpc_shutdown_blocking()
grpc_shutdown()
cdef const char* _get_c_pem_root_certs(pem_root_certs):
if pem_root_certs is None:

@ -319,7 +319,7 @@ cdef extern from "grpc/grpc.h":
grpc_op_data data
void grpc_init() nogil
void grpc_shutdown_blocking() nogil
void grpc_shutdown() nogil
int grpc_is_initialized() nogil
ctypedef struct grpc_completion_queue_factory:

@ -134,7 +134,7 @@ cdef class CallDetails:
def __dealloc__(self):
with nogil:
grpc_call_details_destroy(&self.c_details)
grpc_shutdown_blocking()
grpc_shutdown()
@property
def method(self):

@ -151,4 +151,4 @@ cdef class Server:
def __dealloc__(self):
if self.c_server == NULL:
grpc_shutdown_blocking()
grpc_shutdown()

@ -19,6 +19,7 @@ import logging
import threading
import time
from concurrent import futures
import six
import grpc
@ -565,8 +566,8 @@ def _send_message_callback_to_blocking_iterator_adapter(
def _select_thread_pool_for_behavior(behavior, default_thread_pool):
if hasattr(behavior, 'experimental_thread_pool'
) and behavior.experimental_thread_pool is not None:
if hasattr(behavior, 'experimental_thread_pool') and isinstance(
behavior.experimental_thread_pool, futures.ThreadPoolExecutor):
return behavior.experimental_thread_pool
else:
return default_thread_pool

@ -13,15 +13,17 @@ py_library(
py_library(
name = "cardinality",
srcs = ["cardinality.py"],
deps = [
requirement("enum34"),
],
deps = select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)
py_library(
name = "style",
srcs = ["style.py"],
deps = [
requirement("enum34"),
],
deps = select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)

@ -23,9 +23,11 @@ py_library(
name = "callable_util",
srcs = ["callable_util.py"],
deps = [
requirement("enum34"),
requirement("six"),
],
] + select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)
py_library(
@ -39,9 +41,10 @@ py_library(
py_library(
name = "logging_pool",
srcs = ["logging_pool.py"],
deps = [
requirement("futures"),
],
deps = select({
"//conditions:default": [requirement('futures'),],
"//:python3": [],
}),
)
py_library(

@ -15,15 +15,18 @@ py_library(
srcs = ["base.py"],
deps = [
"//src/python/grpcio/grpc/framework/foundation:abandonment",
requirement("enum34"),
requirement("six"),
],
] + select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)
py_library(
name = "utilities",
srcs = ["utilities.py"],
deps = [
requirement("enum34"),
],
deps = select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)

@ -16,9 +16,11 @@ py_library(
deps = [
"//src/python/grpcio/grpc/framework/foundation",
"//src/python/grpcio/grpc/framework/common",
requirement("enum34"),
requirement("six"),
],
] + select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
)
py_library(

@ -17,11 +17,6 @@ import collections
import grpc
# TODO(https://github.com/bazelbuild/bazel/issues/6844)
# Due to Bazel issue, the namespace packages won't resolve correctly.
# Adding this unused-import as a workaround to avoid module-not-found error
# under Bazel builds.
import google.protobuf # pylint: disable=unused-import
from google.rpc import status_pb2
_CODE_TO_GRPC_CODE_MAPPING = {x.value[0]: x for x in grpc.StatusCode}

@ -0,0 +1,8 @@
py_library(
name = "bazel_namespace_package_hack",
srcs = ["bazel_namespace_package_hack.py"],
visibility = [
"//src/python/grpcio_tests/tests/status:__subpackages__",
"//src/python/grpcio_tests/tests/interop:__subpackages__",
],
)

@ -0,0 +1,32 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import site
import sys
# TODO(https://github.com/bazelbuild/bazel/issues/6844) Bazel failed to
# interpret namespace packages correctly. This monkey patch will force the
# Python process to parse the .pth file in the sys.path to resolve namespace
# package in the right place.
# Analysis in depth: https://github.com/bazelbuild/rules_python/issues/55
def sys_path_to_site_dir_hack():
"""Add valid sys.path item to site directory to parse the .pth files."""
for item in sys.path:
if os.path.exists(item):
# The only difference between sys.path and site-directory is
# whether the .pth file will be parsed or not. A site-directory
# will always exist in sys.path, but not another way around.
site.addsitedir(item)

@ -29,17 +29,20 @@ py_library(
srcs = ["methods.py"],
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests:bazel_namespace_package_hack",
"//src/proto/grpc/testing:py_empty_proto",
"//src/proto/grpc/testing:py_messages_proto",
"//src/proto/grpc/testing:py_test_proto",
requirement('google-auth'),
requirement('requests'),
requirement('enum34'),
requirement('urllib3'),
requirement('chardet'),
requirement('certifi'),
requirement('idna'),
],
] + select({
"//conditions:default": [requirement('enum34'),],
"//:python3": [],
}),
imports=["../../",],
)

@ -13,6 +13,14 @@
# limitations under the License.
"""Implementations of interoperability test methods."""
# NOTE(lidiz) This module only exists in Bazel BUILD file, for more details
# please refer to comments in the "bazel_namespace_package_hack" module.
try:
from tests import bazel_namespace_package_hack
bazel_namespace_package_hack.sys_path_to_site_dir_hack()
except ImportError:
pass
import enum
import json
import os

@ -50,6 +50,16 @@ def _file_descriptor_to_proto(descriptor):
class ReflectionServicerTest(unittest.TestCase):
# TODO(https://github.com/grpc/grpc/issues/17844)
# Bazel + Python 3 will result in creating two different instance of
# DESCRIPTOR for each message. So, the equal comparison between protobuf
# returned by stub and manually crafted protobuf will always fail.
def _assert_sequence_of_proto_equal(self, x, y):
self.assertSequenceEqual(
tuple(proto.SerializeToString() for proto in x),
tuple(proto.SerializeToString() for proto in y),
)
def setUp(self):
self._server = test_common.test_server()
reflection.enable_server_reflection(_SERVICE_NAMES, self._server)
@ -84,7 +94,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
self._assert_sequence_of_proto_equal(expected_responses, responses)
def testFileBySymbol(self):
requests = (
@ -108,7 +118,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
self._assert_sequence_of_proto_equal(expected_responses, responses)
def testFileContainingExtension(self):
requests = (
@ -137,7 +147,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
self._assert_sequence_of_proto_equal(expected_responses, responses)
def testExtensionNumbersOfType(self):
requests = (
@ -162,7 +172,7 @@ class ReflectionServicerTest(unittest.TestCase):
error_message=grpc.StatusCode.NOT_FOUND.value[1].encode(),
)),
)
self.assertSequenceEqual(expected_responses, responses)
self._assert_sequence_of_proto_equal(expected_responses, responses)
def testListServices(self):
requests = (reflection_pb2.ServerReflectionRequest(list_services='',),)
@ -173,7 +183,7 @@ class ReflectionServicerTest(unittest.TestCase):
service=tuple(
reflection_pb2.ServiceResponse(name=name)
for name in _SERVICE_NAMES))),)
self.assertSequenceEqual(expected_responses, responses)
self._assert_sequence_of_proto_equal(expected_responses, responses)
def testReflectionServiceName(self):
self.assertEqual(reflection.SERVICE_NAME,

@ -10,6 +10,7 @@ py_test(
deps = [
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_status/grpc_status:grpc_status",
"//src/python/grpcio_tests/tests:bazel_namespace_package_hack",
"//src/python/grpcio_tests/tests/unit:test_common",
"//src/python/grpcio_tests/tests/unit/framework/common:common",
requirement('protobuf'),

@ -13,6 +13,14 @@
# limitations under the License.
"""Tests of grpc_status."""
# NOTE(lidiz) This module only exists in Bazel BUILD file, for more details
# please refer to comments in the "bazel_namespace_package_hack" module.
try:
from tests import bazel_namespace_package_hack
bazel_namespace_package_hack.sys_path_to_site_dir_hack()
except ImportError:
pass
import unittest
import logging

@ -115,6 +115,7 @@ class AbortTest(unittest.TestCase):
# on Python 3 (via the `__traceback__` attribute) holds a reference to
# all local vars. Storing the raised exception can prevent GC and stop the
# grpc_call from being unref'ed, even after server shutdown.
@unittest.skip("https://github.com/grpc/grpc/issues/17927")
def test_abort_does_not_leak_local_vars(self):
global do_not_leak_me # pylint: disable=global-statement
weak_ref = weakref.ref(do_not_leak_me)

@ -16,7 +16,7 @@ import threading
from concurrent import futures
class RecordingThreadPool(futures.Executor):
class RecordingThreadPool(futures.ThreadPoolExecutor):
"""A thread pool that records if used."""
def __init__(self, max_workers):

@ -39,7 +39,6 @@ grpc_register_plugin_type grpc_register_plugin_import;
grpc_init_type grpc_init_import;
grpc_shutdown_type grpc_shutdown_import;
grpc_is_initialized_type grpc_is_initialized_import;
grpc_shutdown_blocking_type grpc_shutdown_blocking_import;
grpc_version_string_type grpc_version_string_import;
grpc_g_stands_for_type grpc_g_stands_for_import;
grpc_completion_queue_factory_lookup_type grpc_completion_queue_factory_lookup_import;
@ -307,7 +306,6 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_init_import = (grpc_init_type) GetProcAddress(library, "grpc_init");
grpc_shutdown_import = (grpc_shutdown_type) GetProcAddress(library, "grpc_shutdown");
grpc_is_initialized_import = (grpc_is_initialized_type) GetProcAddress(library, "grpc_is_initialized");
grpc_shutdown_blocking_import = (grpc_shutdown_blocking_type) GetProcAddress(library, "grpc_shutdown_blocking");
grpc_version_string_import = (grpc_version_string_type) GetProcAddress(library, "grpc_version_string");
grpc_g_stands_for_import = (grpc_g_stands_for_type) GetProcAddress(library, "grpc_g_stands_for");
grpc_completion_queue_factory_lookup_import = (grpc_completion_queue_factory_lookup_type) GetProcAddress(library, "grpc_completion_queue_factory_lookup");

@ -92,9 +92,6 @@ extern grpc_shutdown_type grpc_shutdown_import;
typedef int(*grpc_is_initialized_type)(void);
extern grpc_is_initialized_type grpc_is_initialized_import;
#define grpc_is_initialized grpc_is_initialized_import
typedef void(*grpc_shutdown_blocking_type)(void);
extern grpc_shutdown_blocking_type grpc_shutdown_blocking_import;
#define grpc_shutdown_blocking grpc_shutdown_blocking_import
typedef const char*(*grpc_version_string_type)(void);
extern grpc_version_string_type grpc_version_string_import;
#define grpc_version_string grpc_version_string_import

@ -18,7 +18,6 @@
#include <cstring>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
@ -282,7 +281,7 @@ int main(int argc, char** argv) {
grpc_core::ExecCtx exec_ctx;
GRPC_COMBINER_UNREF(g_combiner, "test");
}
grpc_shutdown_blocking();
grpc_shutdown();
GPR_ASSERT(g_all_callbacks_invoked);
return 0;
}

@ -1200,6 +1200,6 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_resource_quota_unref(g_resource_quota);
grpc_shutdown_blocking();
grpc_shutdown();
return 0;
}

@ -40,8 +40,9 @@ static void dont_log(gpr_log_func_args* args) {}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_test_only_set_slice_hash_seed(0);
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
grpc_core::testing::LeakDetector leak_detector(leak_check);
if (leak_check) grpc_memory_counters_init();
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -158,6 +159,11 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_byte_buffer_destroy(response_payload_recv);
}
}
grpc_shutdown_blocking();
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -37,8 +37,9 @@ static void dont_log(gpr_log_func_args* args) {}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_test_only_set_slice_hash_seed(0);
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
grpc_core::testing::LeakDetector leak_detector(leak_check);
if (leak_check) grpc_memory_counters_init();
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -135,5 +136,10 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_completion_queue_destroy(cq);
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -83,6 +83,6 @@ int main(int argc, char* argv[]) {
UniquePtr<HandshakerFactory>(New<ReadAheadHandshakerFactory>()));
const char* full_alpn_list[] = {"grpc-exp", "h2"};
GPR_ASSERT(server_ssl_test(full_alpn_list, 2, "grpc-exp"));
grpc_shutdown_blocking();
grpc_shutdown();
return 0;
}

@ -323,11 +323,7 @@ static bool mock_ipv6_disabled_source_addr_factory_get_source_addr(
}
void mock_ipv6_disabled_source_addr_factory_destroy(
address_sorting_source_addr_factory* factory) {
mock_ipv6_disabled_source_addr_factory* f =
reinterpret_cast<mock_ipv6_disabled_source_addr_factory*>(factory);
gpr_free(f);
}
address_sorting_source_addr_factory* factory) {}
const address_sorting_source_addr_factory_vtable
kMockIpv6DisabledSourceAddrFactoryVtable = {
@ -394,11 +390,9 @@ int main(int argc, char** argv) {
// Run a test case in which c-ares's address sorter
// thinks that IPv4 is available and IPv6 isn't.
grpc_init();
mock_ipv6_disabled_source_addr_factory* factory =
static_cast<mock_ipv6_disabled_source_addr_factory*>(
gpr_malloc(sizeof(mock_ipv6_disabled_source_addr_factory)));
factory->base.vtable = &kMockIpv6DisabledSourceAddrFactoryVtable;
address_sorting_override_source_addr_factory_for_testing(&factory->base);
mock_ipv6_disabled_source_addr_factory factory;
factory.base.vtable = &kMockIpv6DisabledSourceAddrFactoryVtable;
address_sorting_override_source_addr_factory_for_testing(&factory.base);
test_localhost_result_has_ipv4_first_when_ipv6_isnt_available();
grpc_shutdown();
}

@ -31,7 +31,8 @@ bool leak_check = true;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
char* s;
grpc_core::testing::LeakDetector leak_detector(true);
struct grpc_memory_counters counters;
grpc_memory_counters_init();
s = static_cast<char*>(gpr_malloc(size));
memcpy(s, data, size);
grpc_json* x;
@ -39,5 +40,8 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_json_destroy(x);
}
gpr_free(s);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
return 0;
}

@ -285,7 +285,7 @@ int main(int argc, char** argv) {
grpc_slice_unref(slice);
grpc_completion_queue_destroy(cq);
grpc_shutdown_blocking();
grpc_shutdown();
gpr_log(GPR_INFO, "---------client stats--------");
gpr_log(

@ -318,7 +318,7 @@ int main(int argc, char** argv) {
grpc_server_destroy(server);
grpc_completion_queue_destroy(cq);
grpc_shutdown_blocking();
grpc_shutdown();
grpc_memory_counters_destroy();
return 0;
}

@ -66,7 +66,10 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
gpr_set_log_function(dont_log);
}
gpr_free(grpc_trace_fuzzer);
grpc_core::testing::LeakDetector leak_detector(leak_check);
struct grpc_memory_counters counters;
if (leak_check) {
grpc_memory_counters_init();
}
input_stream inp = {data, data + size};
grpc_init();
bool is_on_gcp = grpc_alts_is_running_on_gcp();
@ -108,5 +111,10 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
gpr_free(handshaker_service_url);
}
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -52,8 +52,9 @@ static void on_handshake_done(void* arg, grpc_error* error) {
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
struct grpc_memory_counters counters;
if (squelch) gpr_set_log_function(dont_log);
grpc_core::testing::LeakDetector leak_detector(leak_check);
if (leak_check) grpc_memory_counters_init();
grpc_init();
{
grpc_core::ExecCtx exec_ctx;
@ -117,6 +118,11 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
grpc_core::ExecCtx::Get()->Flush();
}
grpc_shutdown_blocking();
grpc_shutdown();
if (leak_check) {
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
GPR_ASSERT(counters.total_size_relative == 0);
}
return 0;
}

@ -31,23 +31,24 @@ bool squelch = true;
bool leak_check = true;
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
struct grpc_memory_counters counters;
grpc_init();
{
grpc_core::testing::LeakDetector leak_detector(true);
grpc_slice input = grpc_slice_from_copied_buffer((const char*)data, size);
grpc_slice output;
if (grpc_strict_percent_decode_slice(
input, grpc_url_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
}
if (grpc_strict_percent_decode_slice(
input, grpc_compatible_percent_encoding_unreserved_bytes,
&output)) {
grpc_slice_unref(output);
}
grpc_slice_unref(grpc_permissive_percent_decode_slice(input));
grpc_slice_unref(input);
grpc_memory_counters_init();
grpc_slice input = grpc_slice_from_copied_buffer((const char*)data, size);
grpc_slice output;
if (grpc_strict_percent_decode_slice(
input, grpc_url_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
}
grpc_shutdown_blocking();
if (grpc_strict_percent_decode_slice(
input, grpc_compatible_percent_encoding_unreserved_bytes, &output)) {
grpc_slice_unref(output);
}
grpc_slice_unref(grpc_permissive_percent_decode_slice(input));
grpc_slice_unref(input);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
grpc_shutdown();
GPR_ASSERT(counters.total_size_relative == 0);
return 0;
}

@ -31,26 +31,28 @@ bool squelch = true;
bool leak_check = true;
static void test(const uint8_t* data, size_t size, const uint8_t* dict) {
struct grpc_memory_counters counters;
grpc_init();
{
grpc_core::testing::LeakDetector leak_detector(true);
grpc_slice input = grpc_slice_from_copied_buffer(
reinterpret_cast<const char*>(data), size);
grpc_slice output = grpc_percent_encode_slice(input, dict);
grpc_slice decoded_output;
// encoder must always produce decodable output
GPR_ASSERT(grpc_strict_percent_decode_slice(output, dict, &decoded_output));
grpc_slice permissive_decoded_output =
grpc_permissive_percent_decode_slice(output);
// and decoded output must always match the input
GPR_ASSERT(grpc_slice_eq(input, decoded_output));
GPR_ASSERT(grpc_slice_eq(input, permissive_decoded_output));
grpc_slice_unref(input);
grpc_slice_unref(output);
grpc_slice_unref(decoded_output);
grpc_slice_unref(permissive_decoded_output);
}
grpc_shutdown_blocking();
grpc_memory_counters_init();
grpc_slice input =
grpc_slice_from_copied_buffer(reinterpret_cast<const char*>(data), size);
grpc_slice output = grpc_percent_encode_slice(input, dict);
grpc_slice decoded_output;
// encoder must always produce decodable output
GPR_ASSERT(grpc_strict_percent_decode_slice(output, dict, &decoded_output));
grpc_slice permissive_decoded_output =
grpc_permissive_percent_decode_slice(output);
// and decoded output must always match the input
GPR_ASSERT(grpc_slice_eq(input, decoded_output));
GPR_ASSERT(grpc_slice_eq(input, permissive_decoded_output));
grpc_slice_unref(input);
grpc_slice_unref(output);
grpc_slice_unref(decoded_output);
grpc_slice_unref(permissive_decoded_output);
counters = grpc_memory_counters_snapshot();
grpc_memory_counters_destroy();
grpc_shutdown();
GPR_ASSERT(counters.total_size_relative == 0);
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {

@ -18,9 +18,6 @@
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/surface/init.h"
#include "test/core/util/test_config.h"
static int g_flag;
@ -33,17 +30,6 @@ static void test(int rounds) {
for (i = 0; i < rounds; i++) {
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
}
static void test_blocking(int rounds) {
int i;
for (i = 0; i < rounds; i++) {
grpc_init();
}
for (i = 0; i < rounds; i++) {
grpc_shutdown_blocking();
}
}
static void test_mixed(void) {
@ -53,7 +39,6 @@ static void test_mixed(void) {
grpc_init();
grpc_shutdown();
grpc_shutdown();
grpc_maybe_wait_for_async_shutdown();
}
static void plugin_init(void) { g_flag = 1; }
@ -63,7 +48,7 @@ static void test_plugin() {
grpc_register_plugin(plugin_init, plugin_destroy);
grpc_init();
GPR_ASSERT(g_flag == 1);
grpc_shutdown_blocking();
grpc_shutdown();
GPR_ASSERT(g_flag == 2);
}
@ -72,7 +57,6 @@ static void test_repeatedly() {
grpc_init();
grpc_shutdown();
}
grpc_maybe_wait_for_async_shutdown();
}
int main(int argc, char** argv) {
@ -80,9 +64,6 @@ int main(int argc, char** argv) {
test(1);
test(2);
test(3);
test_blocking(1);
test_blocking(2);
test_blocking(3);
test_mixed();
test_plugin();
test_repeatedly();

@ -78,7 +78,6 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_init);
printf("%lx", (unsigned long) grpc_shutdown);
printf("%lx", (unsigned long) grpc_is_initialized);
printf("%lx", (unsigned long) grpc_shutdown_blocking);
printf("%lx", (unsigned long) grpc_version_string);
printf("%lx", (unsigned long) grpc_g_stands_for);
printf("%lx", (unsigned long) grpc_completion_queue_factory_lookup);

@ -16,18 +16,13 @@
*
*/
#include <inttypes.h>
#include <stdint.h>
#include <string.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/surface/init.h"
#include "test/core/util/memory_counters.h"
static struct grpc_memory_counters g_memory_counters;
@ -115,29 +110,3 @@ struct grpc_memory_counters grpc_memory_counters_snapshot() {
NO_BARRIER_LOAD(&g_memory_counters.total_allocs_absolute);
return counters;
}
namespace grpc_core {
namespace testing {
LeakDetector::LeakDetector(bool enable) : enabled_(enable) {
if (enabled_) {
grpc_memory_counters_init();
}
}
LeakDetector::~LeakDetector() {
// Wait for grpc_shutdown() to finish its async work.
grpc_maybe_wait_for_async_shutdown();
if (enabled_) {
struct grpc_memory_counters counters = grpc_memory_counters_snapshot();
if (counters.total_size_relative != 0) {
gpr_log(GPR_ERROR, "Leaking %" PRIuPTR " bytes",
static_cast<uintptr_t>(counters.total_size_relative));
GPR_ASSERT(0);
}
grpc_memory_counters_destroy();
}
}
} // namespace testing
} // namespace grpc_core

@ -32,22 +32,4 @@ void grpc_memory_counters_init();
void grpc_memory_counters_destroy();
struct grpc_memory_counters grpc_memory_counters_snapshot();
namespace grpc_core {
namespace testing {
// At destruction time, it will check there is no memory leak.
// The object should be created before grpc_init() is called and destroyed after
// grpc_shutdown() is returned.
class LeakDetector {
public:
explicit LeakDetector(bool enable);
~LeakDetector();
private:
const bool enabled_;
};
} // namespace testing
} // namespace grpc_core
#endif

@ -66,7 +66,7 @@ static void free_chosen_ports(void) {
for (i = 0; i < num_chosen_ports; i++) {
grpc_free_port_using_server(chosen_ports[i]);
}
grpc_shutdown_blocking();
grpc_shutdown();
gpr_free(chosen_ports);
}

@ -31,7 +31,6 @@
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/surface/init.h"
int64_t g_fixture_slowdown_factor = 1;
int64_t g_poller_slowdown_factor = 1;
@ -406,7 +405,7 @@ TestEnvironment::TestEnvironment(int argc, char** argv) {
grpc_test_init(argc, argv);
}
TestEnvironment::~TestEnvironment() { grpc_maybe_wait_for_async_shutdown(); }
TestEnvironment::~TestEnvironment() {}
} // namespace testing
} // namespace grpc

@ -65,8 +65,8 @@ class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
~ForwardingLoadBalancingPolicy() override = default;
void UpdateLocked(const grpc_channel_args& args,
grpc_json* lb_config) override {
delegate_->UpdateLocked(args, lb_config);
RefCountedPtr<Config> lb_config) override {
delegate_->UpdateLocked(args, std::move(lb_config));
}
void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
@ -102,7 +102,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
RefCountedPtr<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
this),
cb, user_data)),
std::move(args), /*delegate_lb_policy_name=*/"pick_first",
std::move(args),
/*delegate_lb_policy_name=*/"pick_first",
/*initial_refcount=*/2) {}
~InterceptRecvTrailingMetadataLoadBalancingPolicy() override = default;
@ -210,12 +211,11 @@ class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
void* user_data)
: cb_(cb), user_data_(user_data) {}
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
CreateLoadBalancingPolicy(
grpc_core::LoadBalancingPolicy::Args args) const override {
return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
grpc_core::New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
std::move(args), cb_, user_data_));
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
LoadBalancingPolicy::Args args) const override {
return OrphanablePtr<LoadBalancingPolicy>(
New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(std::move(args),
cb_, user_data_));
}
const char* name() const override {
@ -231,10 +231,9 @@ class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
InterceptRecvTrailingMetadataCallback cb, void* user_data) {
grpc_core::LoadBalancingPolicyRegistry::Builder::
RegisterLoadBalancingPolicyFactory(
grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
grpc_core::New<InterceptTrailingFactory>(cb, user_data)));
LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
UniquePtr<LoadBalancingPolicyFactory>(
New<InterceptTrailingFactory>(cb, user_data)));
}
} // namespace grpc_core

@ -150,6 +150,7 @@ grpc_cc_test(
"gtest",
],
deps = [
":interceptors_util",
":test_service_impl",
"//:gpr",
"//:grpc",

@ -16,6 +16,7 @@
*
*/
#include <algorithm>
#include <functional>
#include <mutex>
#include <sstream>
@ -35,9 +36,11 @@
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/end2end/interceptors_util.h"
#include "test/cpp/end2end/test_service_impl.h"
#include "test/cpp/util/byte_buffer_proto_helper.h"
#include "test/cpp/util/string_ref_helper.h"
#include "test/cpp/util/test_credentials_provider.h"
#include <gtest/gtest.h>
@ -60,11 +63,17 @@ enum class Protocol { INPROC, TCP };
class TestScenario {
public:
TestScenario(bool serve_callback, Protocol protocol)
: callback_server(serve_callback), protocol(protocol) {}
TestScenario(bool serve_callback, Protocol protocol, bool intercept,
const grpc::string& creds_type)
: callback_server(serve_callback),
protocol(protocol),
use_interceptors(intercept),
credentials_type(creds_type) {}
void Log() const;
bool callback_server;
Protocol protocol;
bool use_interceptors;
const grpc::string credentials_type;
};
static std::ostream& operator<<(std::ostream& out,
@ -87,15 +96,18 @@ class ClientCallbackEnd2endTest
void SetUp() override {
ServerBuilder builder;
auto server_creds = GetCredentialsProvider()->GetServerCredentials(
GetParam().credentials_type);
// TODO(vjpai): Support testing of AuthMetadataProcessor
if (GetParam().protocol == Protocol::TCP) {
if (!grpc_iomgr_run_in_background()) {
do_not_test_ = true;
return;
}
int port = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << port;
builder.AddListeningPort(server_address_.str(),
InsecureServerCredentials());
picked_port_ = grpc_pick_unused_port_or_die();
server_address_ << "localhost:" << picked_port_;
builder.AddListeningPort(server_address_.str(), server_creds);
}
if (!GetParam().callback_server) {
builder.RegisterService(&service_);
@ -103,31 +115,61 @@ class ClientCallbackEnd2endTest
builder.RegisterService(&callback_service_);
}
if (GetParam().use_interceptors) {
std::vector<
std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
creators;
// Add 20 dummy server interceptors
creators.reserve(20);
for (auto i = 0; i < 20; i++) {
creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
new DummyInterceptorFactory()));
}
builder.experimental().SetInterceptorCreators(std::move(creators));
}
server_ = builder.BuildAndStart();
is_server_started_ = true;
}
void ResetStub() {
ChannelArguments args;
auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
GetParam().credentials_type, &args);
switch (GetParam().protocol) {
case Protocol::TCP:
channel_ =
CreateChannel(server_address_.str(), InsecureChannelCredentials());
if (!GetParam().use_interceptors) {
channel_ =
CreateCustomChannel(server_address_.str(), channel_creds, args);
} else {
channel_ = CreateCustomChannelWithInterceptors(
server_address_.str(), channel_creds, args,
CreateDummyClientInterceptors());
}
break;
case Protocol::INPROC:
channel_ = server_->InProcessChannel(args);
if (!GetParam().use_interceptors) {
channel_ = server_->InProcessChannel(args);
} else {
channel_ = server_->experimental().InProcessChannelWithInterceptors(
args, CreateDummyClientInterceptors());
}
break;
default:
assert(false);
}
stub_ = grpc::testing::EchoTestService::NewStub(channel_);
generic_stub_.reset(new GenericStub(channel_));
DummyInterceptor::Reset();
}
void TearDown() override {
if (is_server_started_) {
server_->Shutdown();
}
if (picked_port_ > 0) {
grpc_recycle_unused_port(picked_port_);
}
}
void SendRpcs(int num_rpcs, bool with_binary_metadata) {
@ -283,6 +325,7 @@ class ClientCallbackEnd2endTest
}
bool do_not_test_{false};
bool is_server_started_{false};
int picked_port_{0};
std::shared_ptr<Channel> channel_;
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<grpc::GenericStub> generic_stub_;
@ -419,136 +462,569 @@ TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
while (!done) {
cv.wait(l);
}
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
context_.set_initial_metadata_corked(true);
stub->experimental_async()->RequestStream(&context_, &response_, this);
StartCall();
request_.set_message("Hello server.");
StartWrite(&request_);
EchoRequest request;
EchoResponse response;
ClientContext context;
request.set_message("hello");
context.AddMetadata(kServerTryCancelRequest,
grpc::to_string(CANCEL_BEFORE_PROCESSING));
std::mutex mu;
std::condition_variable cv;
bool done = false;
stub_->experimental_async()->Echo(
&context, &request, &response, [&done, &mu, &cv](Status s) {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
std::lock_guard<std::mutex> l(mu);
done = true;
cv.notify_one();
});
std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
struct ClientCancelInfo {
bool cancel{false};
int ops_before_cancel;
ClientCancelInfo() : cancel{false} {}
// Allow the single-op version to be non-explicit for ease of use
ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
public:
WriteClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
num_msgs_to_send_(num_msgs_to_send),
client_cancel_{client_cancel} {
grpc::string msg{"Hello server."};
for (int i = 0; i < num_msgs_to_send; i++) {
desired_ += msg;
}
void OnWriteDone(bool ok) override {
writes_left_--;
if (writes_left_ > 1) {
StartWrite(&request_);
} else if (writes_left_ == 1) {
StartWriteLast(&request_, WriteOptions());
}
if (server_try_cancel != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
void OnDone(const Status& s) override {
context_.set_initial_metadata_corked(true);
stub->experimental_async()->RequestStream(&context_, &response_, this);
StartCall();
request_.set_message(msg);
MaybeWrite();
}
void OnWriteDone(bool ok) override {
if (ok) {
num_msgs_sent_++;
MaybeWrite();
}
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
int num_to_send =
(client_cancel_.cancel)
? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
: num_msgs_to_send_;
switch (server_try_cancel_) {
case CANCEL_BEFORE_PROCESSING:
case CANCEL_DURING_PROCESSING:
// If the RPC is canceled by server before / during messages from the
// client, it means that the client most likely did not get a chance to
// send all the messages it wanted to send. i.e num_msgs_sent <=
// num_msgs_to_send
EXPECT_LE(num_msgs_sent_, num_to_send);
break;
case DO_NOT_CANCEL:
case CANCEL_AFTER_PROCESSING:
// If the RPC was not canceled or canceled after all messages were read
// by the server, the client did get a chance to send all its messages
EXPECT_EQ(num_msgs_sent_, num_to_send);
break;
default:
assert(false);
break;
}
if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(response_.message(), "Hello server.Hello server.Hello server.");
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
EXPECT_EQ(response_.message(), desired_);
} else {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int writes_left_{3};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
} test{stub_.get()};
private:
void MaybeWrite() {
if (client_cancel_.cancel &&
num_msgs_sent_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
StartWrite(&request_);
} else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
StartWriteLast(&request_, WriteOptions());
}
}
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int num_msgs_sent_{0};
const int num_msgs_to_send_;
grpc::string desired_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, RequestStream) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
test.Await();
// Make sure that the server interceptors were not notified to cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
StartCall();
StartRead(&response_);
WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, {2}};
test.Await();
// Make sure that the server interceptors got the cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel before doing reading the request
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while reading a request from the stream in parallel
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after reading all the requests but before returning to the
// client
TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
MAYBE_SKIP_TEST;
ResetStub();
WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
ReadClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
void OnReadDone(bool ok) override {
if (!ok) {
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_));
reads_complete_++;
StartRead(&response_);
}
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(),
request_.message() + grpc::to_string(reads_complete_));
reads_complete_++;
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
}
// Even if we cancel, read until failure because there might be responses
// pending
StartRead(&response_);
}
void OnDone(const Status& s) override {
EXPECT_TRUE(s.ok());
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
kServerDefaultResponseStreamsToSend) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
// Status might be ok or cancelled depending on whether server
// sent status before client cancel went through
if (!s.ok()) {
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
}
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(reads_complete_, 0);
break;
case CANCEL_DURING_PROCESSING:
case CANCEL_AFTER_PROCESSING:
// If server canceled while writing messages, client must have read
// less than or equal to the expected number of messages. Even if the
// server canceled after writing all messages, the RPC may be canceled
// before the Client got a chance to read all the messages.
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
break;
default:
assert(false);
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int reads_complete_{0};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
} test{stub_.get()};
private:
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int reads_complete_{0};
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), DO_NOT_CANCEL};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), DO_NOT_CANCEL, 2};
test.Await();
// Because cancel in this case races with server finish, we can't be sure that
// server interceptors even see cancellation
}
// Server to cancel before sending any response messages
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while writing a response to the stream in parallel
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after writing all the respones to the stream but before
// returning to the client
TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
class BidiClient
: public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
public:
BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
int num_msgs_to_send, ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
grpc::to_string(server_try_cancel));
}
request_.set_message("Hello fren ");
stub->experimental_async()->BidiStream(&context_, this);
MaybeWrite();
StartRead(&response_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
if (server_try_cancel_ == DO_NOT_CANCEL) {
if (!client_cancel_.cancel) {
EXPECT_EQ(reads_complete_, msgs_to_send_);
} else {
EXPECT_LE(reads_complete_, writes_complete_);
}
}
} else {
EXPECT_LE(reads_complete_, msgs_to_send_);
EXPECT_EQ(response_.message(), request_.message());
reads_complete_++;
StartRead(&response_);
}
}
void OnWriteDone(bool ok) override {
if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_TRUE(ok);
} else if (!ok) {
return;
}
writes_complete_++;
MaybeWrite();
}
void OnDone(const Status& s) override {
gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
switch (server_try_cancel_) {
case DO_NOT_CANCEL:
if (!client_cancel_.cancel ||
client_cancel_.ops_before_cancel > msgs_to_send_) {
EXPECT_TRUE(s.ok());
EXPECT_EQ(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, writes_complete_);
} else {
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
EXPECT_LE(reads_complete_, writes_complete_);
}
break;
case CANCEL_BEFORE_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
// The RPC is canceled before the server did any work or returned any
// reads, but it's possible that some writes took place first from the
// client
EXPECT_LE(writes_complete_, msgs_to_send_);
EXPECT_EQ(reads_complete_, 0);
break;
case CANCEL_DURING_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_LE(writes_complete_, msgs_to_send_);
EXPECT_LE(reads_complete_, writes_complete_);
break;
case CANCEL_AFTER_PROCESSING:
EXPECT_FALSE(s.ok());
EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
EXPECT_EQ(writes_complete_, msgs_to_send_);
// The Server canceled after reading the last message and after writing
// the message to the client. However, the RPC cancellation might have
// taken effect before the client actually read the response.
EXPECT_LE(reads_complete_, writes_complete_);
break;
default:
assert(false);
}
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
}
void Await() {
std::unique_lock<std::mutex> l(mu_);
while (!done_) {
cv_.wait(l);
}
}
private:
void MaybeWrite() {
if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) {
context_.TryCancel();
} else if (writes_complete_ == msgs_to_send_) {
StartWritesDone();
} else {
StartWrite(&request_);
}
}
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
const ServerTryCancelRequestPhase server_try_cancel_;
int reads_complete_{0};
int writes_complete_{0};
const int msgs_to_send_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
};
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend};
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), DO_NOT_CANCEL,
kServerDefaultResponseStreamsToSend, 2};
test.Await();
// Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel before reading/writing any requests/responses on the stream
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel while reading/writing requests/responses on the stream in
// parallel
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
// Server to cancel after reading/writing all requests/responses on the stream
// but before returning to the client
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
}
}
TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
MAYBE_SKIP_TEST;
ResetStub();
class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
EchoResponse> {
public:
explicit Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello fren ");
Client(grpc::testing::EchoTestService::Stub* stub) {
request_.set_message("Hello bidi ");
stub->experimental_async()->BidiStream(&context_, this);
StartCall();
StartRead(&response_);
StartWrite(&request_);
StartCall();
}
void OnReadDone(bool ok) override {
if (!ok) {
EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(), request_.message());
reads_complete_++;
StartRead(&response_);
}
EXPECT_TRUE(ok);
EXPECT_EQ(response_.message(), request_.message());
}
void OnWriteDone(bool ok) override {
EXPECT_TRUE(ok);
if (++writes_complete_ == kServerDefaultResponseStreamsToSend) {
StartWritesDone();
} else {
StartWrite(&request_);
}
// Now send out the simultaneous Read and WritesDone
StartWritesDone();
StartRead(&response_);
}
void OnDone(const Status& s) override {
EXPECT_TRUE(s.ok());
EXPECT_EQ(response_.message(), request_.message());
std::unique_lock<std::mutex> l(mu_);
done_ = true;
cv_.notify_one();
@ -564,8 +1040,6 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
EchoRequest request_;
EchoResponse response_;
ClientContext context_;
int reads_complete_{0};
int writes_complete_{0};
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
@ -574,13 +1048,42 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
test.Await();
}
TestScenario scenarios[]{{false, Protocol::INPROC},
{false, Protocol::TCP},
{true, Protocol::INPROC},
{true, Protocol::TCP}};
std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
std::vector<TestScenario> scenarios;
std::vector<grpc::string> credentials_types{
GetCredentialsProvider()->GetSecureCredentialsTypeList()};
auto insec_ok = [] {
// Only allow insecure credentials type when it is registered with the
// provider. User may create providers that do not have insecure.
return GetCredentialsProvider()->GetChannelCredentials(
kInsecureCredentialsType, nullptr) != nullptr;
};
if (test_insecure && insec_ok()) {
credentials_types.push_back(kInsecureCredentialsType);
}
GPR_ASSERT(!credentials_types.empty());
bool barr[]{false, true};
Protocol parr[]{Protocol::INPROC, Protocol::TCP};
for (Protocol p : parr) {
for (const auto& cred : credentials_types) {
// TODO(vjpai): Test inproc with secure credentials when feasible
if (p == Protocol::INPROC &&
(cred != kInsecureCredentialsType || !insec_ok())) {
continue;
}
for (bool callback_server : barr) {
for (bool use_interceptors : barr) {
scenarios.emplace_back(callback_server, p, use_interceptors, cred);
}
}
}
}
return scenarios;
}
INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
::testing::ValuesIn(scenarios));
::testing::ValuesIn(CreateTestScenarios(true)));
} // namespace
} // namespace testing

@ -402,6 +402,27 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::shared_ptr<ChannelCredentials> creds_;
};
TEST_F(ClientLbEnd2endTest, ChannelStateConnectingWhenResolving) {
const int kNumServers = 3;
StartServers(kNumServers);
auto channel = BuildChannel("");
auto stub = BuildStub(channel);
// Initial state should be IDLE.
EXPECT_EQ(channel->GetState(false /* try_to_connect */), GRPC_CHANNEL_IDLE);
// Tell the channel to try to connect.
// Note that this call also returns IDLE, since the state change has
// not yet occurred; it just gets triggered by this call.
EXPECT_EQ(channel->GetState(true /* try_to_connect */), GRPC_CHANNEL_IDLE);
// Now that the channel is trying to connect, we should be in state
// CONNECTING.
EXPECT_EQ(channel->GetState(false /* try_to_connect */),
GRPC_CHANNEL_CONNECTING);
// Return a resolver result, which allows the connection attempt to proceed.
SetNextResolution(GetServersPorts());
// We should eventually transition into state READY.
EXPECT_TRUE(WaitForChannelReady(channel.get()));
}
TEST_F(ClientLbEnd2endTest, PickFirst) {
// Start servers and send one RPC per server.
const int kNumServers = 3;

@ -583,7 +583,10 @@ CallbackTestServiceImpl::RequestStream() {
StartRead(&request_);
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnReadDone(bool ok) override {
if (ok) {
response_->mutable_message()->append(request_.message());
@ -666,7 +669,10 @@ CallbackTestServiceImpl::ResponseStream() {
}
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnWriteDone(bool ok) override {
if (num_msgs_sent_ < server_responses_to_send_) {
NextWrite();
@ -749,7 +755,10 @@ CallbackTestServiceImpl::BidiStream() {
StartRead(&request_);
}
void OnDone() override { delete this; }
void OnCancel() override { FinishOnce(Status::CANCELLED); }
void OnCancel() override {
EXPECT_TRUE(ctx_->IsCancelled());
FinishOnce(Status::CANCELLED);
}
void OnReadDone(bool ok) override {
if (ok) {
num_msgs_read_++;

@ -197,7 +197,7 @@ void VerifyLbAddrOutputs(const grpc_core::ServerAddressList addresses,
class AddressSortingTest : public ::testing::Test {
protected:
void SetUp() override { grpc_init(); }
void TearDown() override { grpc_shutdown_blocking(); }
void TearDown() override { grpc_shutdown(); }
};
/* Tests for rule 1 */

@ -258,6 +258,14 @@ class GrpcToolTest : public ::testing::Test {
void ShutdownServer() { server_->Shutdown(); }
void ExitWhenError(int argc, const char** argv, const CliCredentials& cred,
GrpcToolOutputCallback callback) {
int result = GrpcToolMainLib(argc, argv, cred, callback);
if (result) {
exit(result);
}
}
std::unique_ptr<Server> server_;
TestServiceImpl service_;
reflection::ProtoServerReflectionPlugin plugin_;
@ -410,9 +418,11 @@ TEST_F(GrpcToolTest, TypeNotFound) {
const char* argv[] = {"grpc_cli", "type", server_address.c_str(),
"grpc.testing.DummyRequest"};
EXPECT_TRUE(1 == GrpcToolMainLib(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)));
EXPECT_DEATH(ExitWhenError(ArraySize(argv), argv, TestCliCredentials(),
std::bind(PrintStream, &output_stream,
std::placeholders::_1)),
".*Type grpc.testing.DummyRequest not found.*");
ShutdownServer();
}

@ -138,10 +138,13 @@ def _symlink_genrule_for_dir(repository_ctx,
def _get_python_bin(repository_ctx):
"""Gets the python bin path."""
python_bin = repository_ctx.os.environ.get(_PYTHON_BIN_PATH)
if python_bin != None:
return python_bin
python_bin_path = repository_ctx.which("python")
python_bin = repository_ctx.os.environ.get(_PYTHON_BIN_PATH, 'python')
if not repository_ctx.path(python_bin).exists:
# It's a command, use 'which' to find its path.
python_bin_path = repository_ctx.which(python_bin)
else:
# It's a path, use it as it is.
python_bin_path = python_bin
if python_bin_path != None:
return str(python_bin_path)
_fail("Cannot find python in PATH, please make sure " +

@ -57,3 +57,7 @@ build:basicprof --copt=-DNDEBUG
build:basicprof --copt=-O2
build:basicprof --copt=-DGRPC_BASIC_PROFILER
build:basicprof --copt=-DGRPC_TIMERS_RDTSC
build:python3 --python_path=python3
build:python3 --force_python=PY3
build:python3 --action_env=PYTHON_BIN_PATH=python3

@ -25,3 +25,5 @@ git clone /var/local/jenkins/grpc /var/local/git/grpc
${name}')
cd /var/local/git/grpc/test
bazel test --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //src/python/...
bazel clean --expunge
bazel test --config=python3 --spawn_strategy=standalone --genrule_strategy=standalone --test_output=errors //src/python/...

@ -3300,7 +3300,8 @@
"language": "c++",
"name": "client_callback_end2end_test",
"src": [
"test/cpp/end2end/client_callback_end2end_test.cc"
"test/cpp/end2end/client_callback_end2end_test.cc",
"test/cpp/end2end/interceptors_util.cc"
],
"third_party": false,
"type": "target"

Loading…
Cancel
Save