Merge branch 'master' into interceptorcqavalanching

pull/17806/head
Yash Tibrewal 6 years ago
commit 786598a6ff
  1. 38
      doc/interop-test-descriptions.md
  2. 4
      gRPC-C++.podspec
  3. 4
      gRPC-Core.podspec
  4. 1
      gRPC-ProtoRPC.podspec
  5. 1
      gRPC-RxLibrary.podspec
  6. 1
      gRPC.podspec
  7. 4
      include/grpc/impl/codegen/grpc_types.h
  8. 27
      include/grpcpp/impl/codegen/interceptor.h
  9. 29
      include/grpcpp/server.h
  10. 31
      src/core/ext/filters/client_channel/lb_policy.cc
  11. 16
      src/core/ext/filters/client_channel/lb_policy.h
  12. 20
      src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
  13. 8
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  14. 8
      src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
  15. 4
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  16. 93
      src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
  17. 7
      src/core/ext/filters/client_channel/lb_policy_factory.h
  18. 4
      src/core/ext/filters/client_channel/lb_policy_registry.cc
  19. 2
      src/core/ext/filters/client_channel/lb_policy_registry.h
  20. 2
      src/core/ext/filters/client_channel/request_routing.cc
  21. 3
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  22. 4
      src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
  23. 42
      src/core/ext/filters/client_channel/resolver_result_parsing.cc
  24. 5
      src/core/ext/filters/client_channel/subchannel.cc
  25. 5
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  26. 1
      src/core/lib/iomgr/exec_ctx.cc
  27. 57
      src/core/lib/iomgr/exec_ctx.h
  28. 7
      src/core/lib/iomgr/executor.cc
  29. 7
      src/core/lib/iomgr/timer_manager.cc
  30. 6
      src/core/lib/surface/call.cc
  31. 5
      src/core/lib/surface/completion_queue.cc
  32. 23
      src/core/lib/surface/server.cc
  33. 6
      src/core/lib/transport/transport.cc
  34. 3
      src/cpp/common/alarm.cc
  35. 4
      src/cpp/server/load_reporter/load_reporter_async_service_impl.cc
  36. 225
      src/cpp/server/server_cc.cc
  37. 6
      src/csharp/Grpc.Core.Tests/CallOptionsTest.cs
  38. 25
      src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
  39. 14
      src/csharp/Grpc.Core/CallOptions.cs
  40. 59
      src/csharp/Grpc.Core/ContextPropagationOptions.cs
  41. 128
      src/csharp/Grpc.Core/ContextPropagationToken.cs
  42. 4
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  43. 34
      src/csharp/Grpc.Core/Internal/ContextPropagationFlags.cs
  44. 119
      src/csharp/Grpc.Core/Internal/ContextPropagationTokenImpl.cs
  45. 2
      src/csharp/Grpc.Core/Internal/DefaultServerCallContext.cs
  46. 1
      src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
  47. 4
      src/objective-c/BoringSSL-GRPC.podspec
  48. 8
      src/php/bin/run_tests.sh
  49. 2310
      src/php/tests/MemoryLeakTest/MemoryLeakTest.php
  50. 10
      src/php/tests/unit_tests/CallTest.php
  51. 18
      src/proto/grpc/core/BUILD
  52. 94
      src/proto/grpc/testing/BUILD
  53. 8
      src/python/grpcio/grpc/_channel.py
  54. 4
      src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
  55. 4
      src/python/grpcio/grpc/_server.py
  56. 1
      src/python/grpcio_channelz/LICENSE
  57. 3
      src/python/grpcio_channelz/channelz_commands.py
  58. 1
      src/python/grpcio_health_checking/LICENSE
  59. 3
      src/python/grpcio_health_checking/health_commands.py
  60. 1
      src/python/grpcio_reflection/LICENSE
  61. 3
      src/python/grpcio_reflection/reflection_commands.py
  62. 1
      src/python/grpcio_status/LICENSE
  63. 19
      src/python/grpcio_status/setup.py
  64. 39
      src/python/grpcio_status/status_commands.py
  65. 1
      src/python/grpcio_testing/LICENSE
  66. 16
      src/python/grpcio_testing/setup.py
  67. 39
      src/python/grpcio_testing/testing_commands.py
  68. 12
      src/python/grpcio_tests/setup.py
  69. 1
      src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
  70. 103
      src/python/grpcio_tests/tests/qps/BUILD
  71. 102
      src/python/grpcio_tests/tests/qps/README.md
  72. 45
      src/python/grpcio_tests/tests/qps/basic_benchmark_test.sh
  73. 96
      src/python/grpcio_tests/tests/qps/scenarios.json
  74. 16
      src/ruby/ext/grpc/extconf.rb
  75. 4
      templates/gRPC-C++.podspec.template
  76. 4
      templates/gRPC-Core.podspec.template
  77. 1
      templates/gRPC-ProtoRPC.podspec.template
  78. 1
      templates/gRPC-RxLibrary.podspec.template
  79. 1
      templates/gRPC.podspec.template
  80. 1
      templates/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec.template
  81. 4
      templates/src/objective-c/BoringSSL-GRPC.podspec.template
  82. 1
      templates/test/cpp/naming/resolver_component_tests_defs.include
  83. 7
      templates/tools/dockerfile/php_valgrind.include
  84. 1
      templates/tools/dockerfile/test/php7_jessie_x64/Dockerfile.template
  85. 1
      templates/tools/dockerfile/test/php_jessie_x64/Dockerfile.template
  86. 2
      test/core/client_channel/resolvers/dns_resolver_test.cc
  87. 2
      test/core/end2end/tests/filter_status_code.cc
  88. 103
      test/core/iomgr/resolve_address_test.cc
  89. 69
      test/core/surface/completion_queue_test.cc
  90. 21
      test/core/util/test_lb_policies.cc
  91. 2
      test/cpp/end2end/BUILD
  92. 6
      test/cpp/interop/client.cc
  93. 19
      test/cpp/interop/interop_client.cc
  94. 2
      test/cpp/interop/interop_client.h
  95. 2
      test/cpp/microbenchmarks/bm_chttp2_transport.cc
  96. 1
      test/cpp/naming/resolver_component_tests_runner.py
  97. 6
      test/cpp/qps/BUILD
  98. 2
      tools/distrib/yapf_code.sh
  99. 8
      tools/dockerfile/test/php7_jessie_x64/Dockerfile
  100. 8
      tools/dockerfile/test/php_jessie_x64/Dockerfile
  101. Some files were not shown because too many files have changed in this diff Show More

@ -679,6 +679,44 @@ Client asserts:
by the auth library. The client can optionally check the username matches the
email address in the key file.
### google_default_credentials
Similar to the other auth tests, this test should only be run against prod
servers. Different from some of the other auth tests however, this test
may be also run from outside of GCP.
This test verifies unary calls succeed when the client uses
GoogleDefaultCredentials. The path to a service account key file in the
GOOGLE_APPLICATION_CREDENTIALS environment variable may or may not be
provided by the test runner. For example, the test runner might set
this environment when outside of GCP but keep it unset when on GCP.
The test uses `--default_service_account` with GCE service account email.
Server features:
* [UnaryCall][]
* [Echo Authenticated Username][]
Procedure:
1. Client configures the channel to use GoogleDefaultCredentials
* Note: the term `GoogleDefaultCredentials` within the context
of this test description refers to an API which encapsulates
both "transport credentials" and "call credentials" and which
is capable of transport creds auto-selection (including ALTS).
Similar APIs involving only auto-selection of OAuth mechanisms
might work for this test but aren't the intended subjects.
2. Client calls UnaryCall with:
```
{
fill_username: true
}
```
Client asserts:
* call was successful
* received SimpleResponse.username matches the value of
`--default_service_account`
### custom_metadata

@ -24,7 +24,7 @@ Pod::Spec.new do |s|
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
# version = '1.19.0-dev'
version = '0.0.6-dev'
version = '0.0.8-dev'
s.version = version
s.summary = 'gRPC C++ library'
s.homepage = 'https://grpc.io'
@ -40,6 +40,8 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpcpp'

@ -40,6 +40,8 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpc'
@ -181,7 +183,7 @@ Pod::Spec.new do |s|
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
ss.dependency 'BoringSSL-GRPC', '0.0.2'
ss.dependency 'BoringSSL-GRPC', '0.0.3'
ss.dependency 'nanopb', '~> 0.3'
ss.compiler_flags = '-DGRPC_SHADOW_BORINGSSL_SYMBOLS'

@ -35,6 +35,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'ProtoRPC'
s.module_name = name

@ -35,6 +35,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'RxLibrary'
s.module_name = name

@ -34,6 +34,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'GRPCClient'
s.module_name = name

@ -693,6 +693,10 @@ typedef struct grpc_experimental_completion_queue_functor {
pointer to this functor and a boolean that indicates whether the
operation succeeded (non-zero) or failed (zero) */
void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int);
/** The following fields are not API. They are meant for internal use. */
int internal_success;
struct grpc_experimental_completion_queue_functor* internal_next;
} grpc_experimental_completion_queue_functor;
/* The upgrade to version 2 is currently experimental. */

@ -107,6 +107,24 @@ class InterceptorBatchMethods {
/// of the hijacking interceptor.
virtual void Hijack() = 0;
/// Send Message Methods
/// GetSerializedSendMessage and GetSendMessage/ModifySendMessage are the
/// available methods to view and modify the request payload. An interceptor
/// can access the payload in either serialized form or non-serialized form
/// but not both at the same time.
/// gRPC performs serialization in a lazy manner, which means
/// that a call to GetSerializedSendMessage will result in a serialization
/// operation if the payload stored is not in the serialized form already; the
/// non-serialized form will be lost and GetSendMessage will no longer return
/// a valid pointer, and this will remain true for later interceptors too.
/// This can change however if ModifySendMessage is used to replace the
/// current payload. Note that ModifySendMessage requires a new payload
/// message in the non-serialized form. This will overwrite the existing
/// payload irrespective of whether it had been serialized earlier. Also note
/// that gRPC Async API requires early serialization of the payload which
/// means that the payload would be available in the serialized form only
/// unless an interceptor replaces the payload with ModifySendMessage.
/// Returns a modifable ByteBuffer holding the serialized form of the message
/// that is going to be sent. Valid for PRE_SEND_MESSAGE interceptions.
/// A return value of nullptr indicates that this ByteBuffer is not valid.
@ -114,15 +132,16 @@ class InterceptorBatchMethods {
/// Returns a non-modifiable pointer to the non-serialized form of the message
/// to be sent. Valid for PRE_SEND_MESSAGE interceptions. A return value of
/// nullptr indicates that this field is not valid. Also note that this is
/// only supported for sync and callback APIs at the present moment.
/// nullptr indicates that this field is not valid.
virtual const void* GetSendMessage() = 0;
/// Overwrites the message to be sent with \a message. \a message should be in
/// the non-serialized form expected by the method. Valid for PRE_SEND_MESSAGE
/// interceptions. Note that the interceptor is responsible for maintaining
/// the life of the message for the duration on the send operation, i.e., till
/// POST_SEND_MESSAGE.
/// the life of the message till it is serialized or it receives the
/// POST_SEND_MESSAGE interception point, whichever happens earlier. The
/// modifying interceptor may itself force early serialization by calling
/// GetSerializedSendMessage.
virtual void ModifySendMessage(const void* message) = 0;
/// Checks whether the SEND MESSAGE op succeeded. Valid for POST_SEND_MESSAGE

@ -248,8 +248,22 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
/// the \a sync_server_cqs)
std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
/// Outstanding callback requests
std::vector<std::unique_ptr<CallbackRequest>> callback_reqs_;
// Outstanding callback requests. The vector is indexed by method with a list
// per method. Each element should store its own iterator in the list and
// should erase it when the request is actually bound to an RPC. Synchronize
// this list with its own mu_ (not the server mu_) since these must be active
// at Shutdown when the server mu_ is locked.
// TODO(vjpai): Merge with the core request matcher to avoid duplicate work
struct MethodReqList {
std::mutex reqs_mu;
// Maintain our own list size count since list::size is still linear
// for some libraries (supposed to be constant since C++11)
// TODO(vjpai): Remove reqs_list_sz and use list::size when possible
size_t reqs_list_sz{0};
std::list<CallbackRequest*> reqs_list;
using iterator = decltype(reqs_list)::iterator;
};
std::vector<MethodReqList*> callback_reqs_;
// Server status
std::mutex mu_;
@ -259,6 +273,17 @@ class Server : public ServerInterface, private GrpcLibraryCodegen {
std::condition_variable shutdown_cv_;
// It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
// Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
// decremented under the lock in case it is the last request and enables the
// server shutdown. The increment is performance-critical since it happens
// during periods of increasing load; the decrement happens only when memory
// is maxed out, during server shutdown, or (possibly in a future version)
// during decreasing load, so it is less performance-critical.
std::mutex callback_reqs_mu_;
std::condition_variable callback_reqs_done_cv_;
std::atomic_int callback_reqs_outstanding_{0};
std::shared_ptr<GlobalCallbacks> global_callbacks_;
std::vector<grpc::string> services_;

@ -20,6 +20,7 @@
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/iomgr/combiner.h"
grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
@ -27,11 +28,37 @@ grpc_core::DebugOnlyTraceFlag grpc_trace_lb_policy_refcount(
namespace grpc_core {
LoadBalancingPolicy::LoadBalancingPolicy(const Args& args)
grpc_json* LoadBalancingPolicy::ParseLoadBalancingConfig(
const grpc_json* lb_config_array) {
if (lb_config_array == nullptr || lb_config_array->type != GRPC_JSON_ARRAY) {
return nullptr;
}
// Find the first LB policy that this client supports.
for (const grpc_json* lb_config = lb_config_array->child;
lb_config != nullptr; lb_config = lb_config->next) {
if (lb_config->type != GRPC_JSON_OBJECT) return nullptr;
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT)
return nullptr;
if (policy != nullptr) return nullptr; // Violate "oneof" type.
policy = field;
}
if (policy == nullptr) return nullptr;
// If we support this policy, then select it.
if (LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(policy->key)) {
return policy;
}
}
return nullptr;
}
LoadBalancingPolicy::LoadBalancingPolicy(Args args)
: InternallyRefCounted(&grpc_trace_lb_policy_refcount),
combiner_(GRPC_COMBINER_REF(args.combiner, "lb_policy")),
client_channel_factory_(args.client_channel_factory),
subchannel_pool_(*args.subchannel_pool),
subchannel_pool_(std::move(args.subchannel_pool)),
interested_parties_(grpc_pollset_set_create()),
request_reresolution_(nullptr) {}

@ -55,7 +55,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
/// Used to create channels and subchannels.
grpc_client_channel_factory* client_channel_factory = nullptr;
/// Subchannel pool.
RefCountedPtr<SubchannelPoolInterface>* subchannel_pool;
RefCountedPtr<SubchannelPoolInterface> subchannel_pool;
/// Channel args from the resolver.
/// Note that the LB policy gets the set of addresses from the
/// GRPC_ARG_SERVER_ADDRESS_LIST channel arg.
@ -179,6 +179,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
GRPC_ERROR_NONE);
}
/// Returns the JSON node of policy (with both policy name and config content)
/// given the JSON node of a LoadBalancingConfig array.
static grpc_json* ParseLoadBalancingConfig(const grpc_json* lb_config_array);
/// Sets the re-resolution closure to \a request_reresolution.
void SetReresolutionClosureLocked(grpc_closure* request_reresolution) {
GPR_ASSERT(request_reresolution_ == nullptr);
@ -187,10 +191,10 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
grpc_pollset_set* interested_parties() const { return interested_parties_; }
/// Returns a pointer to the subchannel pool of type
/// RefCountedPtr<SubchannelPoolInterface>.
RefCountedPtr<SubchannelPoolInterface>* subchannel_pool() {
return &subchannel_pool_;
// Callers that need their own reference can call the returned
// object's Ref() method.
SubchannelPoolInterface* subchannel_pool() const {
return subchannel_pool_.get();
}
GRPC_ABSTRACT_BASE_CLASS
@ -198,7 +202,7 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {
protected:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
explicit LoadBalancingPolicy(const Args& args);
explicit LoadBalancingPolicy(Args args);
virtual ~LoadBalancingPolicy();
grpc_combiner* combiner() const { return combiner_; }

@ -125,7 +125,7 @@ constexpr char kGrpclb[] = "grpclb";
class GrpcLb : public LoadBalancingPolicy {
public:
explicit GrpcLb(const Args& args);
explicit GrpcLb(Args args);
const char* name() const override { return kGrpclb; }
@ -273,7 +273,7 @@ class GrpcLb : public LoadBalancingPolicy {
// Methods for dealing with the RR policy.
void CreateOrUpdateRoundRobinPolicyLocked();
grpc_channel_args* CreateRoundRobinPolicyArgsLocked();
void CreateRoundRobinPolicyLocked(const Args& args);
void CreateRoundRobinPolicyLocked(Args args);
bool PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error);
void UpdateConnectivityStateFromRoundRobinPolicyLocked(
@ -973,8 +973,8 @@ grpc_channel_args* BuildBalancerChannelArgs(
// ctor and dtor
//
GrpcLb::GrpcLb(const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
GrpcLb::GrpcLb(LoadBalancingPolicy::Args args)
: LoadBalancingPolicy(std::move(args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
BackOff::Options()
@ -1588,10 +1588,10 @@ bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
return pick_done;
}
void GrpcLb::CreateRoundRobinPolicyLocked(const Args& args) {
void GrpcLb::CreateRoundRobinPolicyLocked(Args args) {
GPR_ASSERT(rr_policy_ == nullptr);
rr_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", args);
"round_robin", std::move(args));
if (GPR_UNLIKELY(rr_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[grpclb %p] Failure creating a RoundRobin policy",
this);
@ -1693,8 +1693,8 @@ void GrpcLb::CreateOrUpdateRoundRobinPolicyLocked() {
lb_policy_args.combiner = combiner();
lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.args = args;
lb_policy_args.subchannel_pool = subchannel_pool();
CreateRoundRobinPolicyLocked(lb_policy_args);
lb_policy_args.subchannel_pool = subchannel_pool()->Ref();
CreateRoundRobinPolicyLocked(std::move(lb_policy_args));
}
grpc_channel_args_destroy(args);
}
@ -1802,7 +1802,7 @@ void GrpcLb::OnRoundRobinConnectivityChangedLocked(void* arg,
class GrpcLbFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
LoadBalancingPolicy::Args args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const ServerAddressList* addresses =
FindServerAddressListChannelArg(args.args);
@ -1815,7 +1815,7 @@ class GrpcLbFactory : public LoadBalancingPolicyFactory {
}
}
if (!found_balancer) return nullptr;
return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(args));
return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
}
const char* name() const override { return kGrpclb; }

@ -46,7 +46,7 @@ constexpr char kPickFirst[] = "pick_first";
class PickFirst : public LoadBalancingPolicy {
public:
explicit PickFirst(const Args& args);
explicit PickFirst(Args args);
const char* name() const override { return kPickFirst; }
@ -154,7 +154,7 @@ class PickFirst : public LoadBalancingPolicy {
channelz::ChildRefsList child_channels_;
};
PickFirst::PickFirst(const Args& args) : LoadBalancingPolicy(args) {
PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
@ -619,8 +619,8 @@ void PickFirst::PickFirstSubchannelData::
class PickFirstFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(args));
LoadBalancingPolicy::Args args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(std::move(args)));
}
const char* name() const override { return kPickFirst; }

@ -56,7 +56,7 @@ constexpr char kRoundRobin[] = "round_robin";
class RoundRobin : public LoadBalancingPolicy {
public:
explicit RoundRobin(const Args& args);
explicit RoundRobin(Args args);
const char* name() const override { return kRoundRobin; }
@ -210,7 +210,7 @@ class RoundRobin : public LoadBalancingPolicy {
channelz::ChildRefsList child_channels_;
};
RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) {
RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
GPR_ASSERT(args.client_channel_factory != nullptr);
gpr_mu_init(&child_refs_mu_);
grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE,
@ -697,8 +697,8 @@ void RoundRobin::UpdateLocked(const grpc_channel_args& args,
class RoundRobinFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args));
LoadBalancingPolicy::Args args) const override {
return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(std::move(args)));
}
const char* name() const override { return kRoundRobin; }

@ -514,8 +514,8 @@ SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
// policy, which does not use a SubchannelList.
GPR_ASSERT(!addresses[i].IsBalancer());
InlinedVector<grpc_arg, 4> args_to_add;
args_to_add.emplace_back(SubchannelPoolInterface::CreateChannelArg(
policy_->subchannel_pool()->get()));
args_to_add.emplace_back(
SubchannelPoolInterface::CreateChannelArg(policy_->subchannel_pool()));
const size_t subchannel_address_arg_index = args_to_add.size();
args_to_add.emplace_back(
grpc_create_subchannel_address_arg(&addresses[i].address()));

@ -100,6 +100,7 @@
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
@ -118,7 +119,7 @@ constexpr char kXds[] = "xds_experimental";
class XdsLb : public LoadBalancingPolicy {
public:
explicit XdsLb(const Args& args);
explicit XdsLb(Args args);
const char* name() const override { return kXds; }
@ -247,6 +248,12 @@ class XdsLb : public LoadBalancingPolicy {
// Helper function used in ctor and UpdateLocked().
void ProcessChannelArgsLocked(const grpc_channel_args& args);
// Parses the xds config given the JSON node of the first child of XdsConfig.
// If parsing succeeds, updates \a balancer_name, and updates \a
// child_policy_json_dump_ and \a fallback_policy_json_dump_ if they are also
// found. Does nothing upon failure.
void ParseLbConfig(grpc_json* xds_config_json);
// Methods for dealing with the balancer channel and call.
void StartPickingLocked();
void StartBalancerCallLocked();
@ -265,7 +272,7 @@ class XdsLb : public LoadBalancingPolicy {
// Methods for dealing with the child policy.
void CreateOrUpdateChildPolicyLocked();
grpc_channel_args* CreateChildPolicyArgsLocked();
void CreateChildPolicyLocked(const Args& args);
void CreateChildPolicyLocked(const char* name, Args args);
bool PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error);
void UpdateConnectivityStateFromChildPolicyLocked(
@ -278,6 +285,9 @@ class XdsLb : public LoadBalancingPolicy {
// Who the client is trying to communicate with.
const char* server_name_ = nullptr;
// Name of the balancer to connect to.
UniquePtr<char> balancer_name_;
// Current channel args from the resolver.
grpc_channel_args* args_ = nullptr;
@ -318,6 +328,7 @@ class XdsLb : public LoadBalancingPolicy {
// Timeout in milliseconds for before using fallback backend addresses.
// 0 means not using fallback.
UniquePtr<char> fallback_policy_json_string_;
int lb_fallback_timeout_ms_ = 0;
// The backend addresses from the resolver.
UniquePtr<ServerAddressList> fallback_backend_addresses_;
@ -331,6 +342,7 @@ class XdsLb : public LoadBalancingPolicy {
// The policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
UniquePtr<char> child_policy_json_string_;
grpc_connectivity_state child_connectivity_state_;
grpc_closure on_child_connectivity_changed_;
grpc_closure on_child_request_reresolution_;
@ -892,8 +904,8 @@ grpc_channel_args* BuildBalancerChannelArgs(
//
// TODO(vishalpowar): Use lb_config in args to configure LB policy.
XdsLb::XdsLb(const LoadBalancingPolicy::Args& args)
: LoadBalancingPolicy(args),
XdsLb::XdsLb(LoadBalancingPolicy::Args args)
: LoadBalancingPolicy(std::move(args)),
response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
lb_call_backoff_(
BackOff::Options()
@ -934,6 +946,8 @@ XdsLb::XdsLb(const LoadBalancingPolicy::Args& args)
arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
lb_fallback_timeout_ms_ = grpc_channel_arg_get_integer(
arg, {GRPC_XDS_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// Parse the LB config.
ParseLbConfig(args.lb_config);
// Process channel args.
ProcessChannelArgsLocked(*args.args);
}
@ -1184,8 +1198,44 @@ void XdsLb::ProcessChannelArgsLocked(const grpc_channel_args& args) {
grpc_channel_args_destroy(lb_channel_args);
}
// TODO(vishalpowar): Use lb_config to configure LB policy.
void XdsLb::ParseLbConfig(grpc_json* xds_config_json) {
const char* balancer_name = nullptr;
grpc_json* child_policy = nullptr;
grpc_json* fallback_policy = nullptr;
for (grpc_json* field = xds_config_json; field != nullptr;
field = field->next) {
if (field->key == nullptr) return;
if (strcmp(field->key, "balancerName") == 0) {
if (balancer_name != nullptr) return; // Duplicate.
if (field->type != GRPC_JSON_STRING) return;
balancer_name = field->value;
} else if (strcmp(field->key, "childPolicy") == 0) {
if (child_policy != nullptr) return; // Duplicate.
child_policy = ParseLoadBalancingConfig(field);
} else if (strcmp(field->key, "fallbackPolicy") == 0) {
if (fallback_policy != nullptr) return; // Duplicate.
fallback_policy = ParseLoadBalancingConfig(field);
}
}
if (balancer_name == nullptr) return; // Required field.
if (child_policy != nullptr) {
child_policy_json_string_ =
UniquePtr<char>(grpc_json_dump_to_string(child_policy, 0 /* indent */));
}
if (fallback_policy != nullptr) {
fallback_policy_json_string_ = UniquePtr<char>(
grpc_json_dump_to_string(fallback_policy, 0 /* indent */));
}
balancer_name_ = UniquePtr<char>(gpr_strdup(balancer_name));
}
void XdsLb::UpdateLocked(const grpc_channel_args& args, grpc_json* lb_config) {
ParseLbConfig(lb_config);
// TODO(juanlishen): Pass fallback policy config update after fallback policy
// is added.
if (balancer_name_ == nullptr) {
gpr_log(GPR_ERROR, "[xdslb %p] LB config parsing fails.", this);
}
ProcessChannelArgsLocked(args);
// Update the existing child policy.
// Note: We have disabled fallback mode in the code, so this child policy must
@ -1436,10 +1486,10 @@ bool XdsLb::PickFromChildPolicyLocked(bool force_async, PendingPick* pp,
return pick_done;
}
void XdsLb::CreateChildPolicyLocked(const Args& args) {
void XdsLb::CreateChildPolicyLocked(const char* name, Args args) {
GPR_ASSERT(child_policy_ == nullptr);
child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
"round_robin", args);
name, std::move(args));
if (GPR_UNLIKELY(child_policy_ == nullptr)) {
gpr_log(GPR_ERROR, "[xdslb %p] Failure creating a child policy", this);
return;
@ -1512,26 +1562,43 @@ void XdsLb::CreateOrUpdateChildPolicyLocked() {
if (shutting_down_) return;
grpc_channel_args* args = CreateChildPolicyArgsLocked();
GPR_ASSERT(args != nullptr);
const char* child_policy_name = nullptr;
grpc_json* child_policy_config = nullptr;
grpc_json* child_policy_json =
grpc_json_parse_string(child_policy_json_string_.get());
// TODO(juanlishen): If the child policy is not configured via service config,
// use whatever algorithm is specified by the balancer.
if (child_policy_json != nullptr) {
child_policy_name = child_policy_json->key;
child_policy_config = child_policy_json->child;
} else {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] No valid child policy LB config", this);
}
child_policy_name = "round_robin";
}
// TODO(juanlishen): Switch policy according to child_policy_config->key.
if (child_policy_ != nullptr) {
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Updating the child policy %p", this,
child_policy_.get());
}
// TODO(vishalpowar): Pass the correct LB config.
child_policy_->UpdateLocked(*args, nullptr);
child_policy_->UpdateLocked(*args, child_policy_config);
} else {
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner();
lb_policy_args.client_channel_factory = client_channel_factory();
lb_policy_args.subchannel_pool = subchannel_pool();
lb_policy_args.subchannel_pool = subchannel_pool()->Ref();
lb_policy_args.args = args;
CreateChildPolicyLocked(lb_policy_args);
lb_policy_args.lb_config = child_policy_config;
CreateChildPolicyLocked(child_policy_name, std::move(lb_policy_args));
if (grpc_lb_xds_trace.enabled()) {
gpr_log(GPR_INFO, "[xdslb %p] Created a new child policy %p", this,
child_policy_.get());
}
}
grpc_channel_args_destroy(args);
grpc_json_destroy(child_policy_json);
}
void XdsLb::OnChildPolicyRequestReresolutionLocked(void* arg,
@ -1637,7 +1704,7 @@ void XdsLb::OnChildPolicyConnectivityChangedLocked(void* arg,
class XdsFactory : public LoadBalancingPolicyFactory {
public:
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const override {
LoadBalancingPolicy::Args args) const override {
/* Count the number of gRPC-LB addresses. There must be at least one. */
const ServerAddressList* addresses =
FindServerAddressListChannelArg(args.args);
@ -1650,7 +1717,7 @@ class XdsFactory : public LoadBalancingPolicyFactory {
}
}
if (!found_balancer_address) return nullptr;
return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(args));
return OrphanablePtr<LoadBalancingPolicy>(New<XdsLb>(std::move(args)));
}
const char* name() const override { return kXds; }

@ -31,7 +31,12 @@ class LoadBalancingPolicyFactory {
public:
/// Returns a new LB policy instance.
virtual OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const LoadBalancingPolicy::Args& args) const GRPC_ABSTRACT;
LoadBalancingPolicy::Args args) const {
std::move(args); // Suppress clang-tidy complaint.
// The rest of this is copied from the GRPC_ABSTRACT macro.
gpr_log(GPR_ERROR, "Function marked GRPC_ABSTRACT was not implemented");
GPR_ASSERT(false);
}
/// Returns the LB policy name that this factory provides.
/// Caller does NOT take ownership of result.

@ -84,14 +84,14 @@ void LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
OrphanablePtr<LoadBalancingPolicy>
LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
const char* name, const LoadBalancingPolicy::Args& args) {
const char* name, LoadBalancingPolicy::Args args) {
GPR_ASSERT(g_state != nullptr);
// Find factory.
LoadBalancingPolicyFactory* factory =
g_state->GetLoadBalancingPolicyFactory(name);
if (factory == nullptr) return nullptr; // Specified name not found.
// Create policy via factory.
return factory->CreateLoadBalancingPolicy(args);
return factory->CreateLoadBalancingPolicy(std::move(args));
}
bool LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(const char* name) {

@ -46,7 +46,7 @@ class LoadBalancingPolicyRegistry {
/// Creates an LB policy of the type specified by \a name.
static OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
const char* name, const LoadBalancingPolicy::Args& args);
const char* name, LoadBalancingPolicy::Args args);
/// Returns true if the LB policy factory specified by \a name exists in this
/// registry.

@ -676,7 +676,7 @@ void RequestRouter::CreateNewLbPolicyLocked(
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.combiner = combiner_;
lb_policy_args.client_channel_factory = client_channel_factory_;
lb_policy_args.subchannel_pool = &subchannel_pool_;
lb_policy_args.subchannel_pool = subchannel_pool_;
lb_policy_args.args = resolver_result_;
lb_policy_args.lb_config = lb_config;
OrphanablePtr<LoadBalancingPolicy> new_lb_policy =

@ -478,8 +478,7 @@ static grpc_address_resolver_vtable ares_resolver = {
grpc_resolve_address_ares, blocking_resolve_address_ares};
static bool should_use_ares(const char* resolver_env) {
return resolver_env == nullptr || strlen(resolver_env) == 0 ||
gpr_stricmp(resolver_env, "ares") == 0;
return resolver_env != nullptr && gpr_stricmp(resolver_env, "ares") == 0;
}
void grpc_resolver_dns_ares_init() {

@ -548,13 +548,13 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
r, name, default_port);
// Early out if the target is an ipv4 or ipv6 literal.
if (resolve_as_ip_literal_locked(name, default_port, addrs)) {
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_ares_complete_request_locked(r);
return r;
}
// Early out if the target is localhost and we're on Windows.
if (grpc_ares_maybe_resolve_localhost_manually_locked(name, default_port,
addrs)) {
GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_NONE);
grpc_ares_complete_request_locked(r);
return r;
}
// Don't query for SRV and TXT records if the target is "localhost", so

@ -141,42 +141,14 @@ void ProcessedResolverResult::ParseServiceConfig(
void ProcessedResolverResult::ParseLbConfigFromServiceConfig(
const grpc_json* field) {
if (lb_policy_config_ != nullptr) return; // Already found.
// Find the LB config global parameter.
if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0 ||
field->type != GRPC_JSON_ARRAY) {
return; // Not valid lb config array.
if (field->key == nullptr || strcmp(field->key, "loadBalancingConfig") != 0) {
return; // Not the LB config global parameter.
}
// Find the first LB policy that this client supports.
for (grpc_json* lb_config = field->child; lb_config != nullptr;
lb_config = lb_config->next) {
if (lb_config->type != GRPC_JSON_OBJECT) return;
// Find the policy object.
grpc_json* policy = nullptr;
for (grpc_json* field = lb_config->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || strcmp(field->key, "policy") != 0 ||
field->type != GRPC_JSON_OBJECT) {
return;
}
if (policy != nullptr) return; // Duplicate.
policy = field;
}
// Find the specific policy content since the policy object is of type
// "oneof".
grpc_json* policy_content = nullptr;
for (grpc_json* field = policy->child; field != nullptr;
field = field->next) {
if (field->key == nullptr || field->type != GRPC_JSON_OBJECT) return;
if (policy_content != nullptr) return; // Violate "oneof" type.
policy_content = field;
}
// If we support this policy, then select it.
if (grpc_core::LoadBalancingPolicyRegistry::LoadBalancingPolicyExists(
policy_content->key)) {
lb_policy_name_.reset(gpr_strdup(policy_content->key));
lb_policy_config_ = policy_content->child;
return;
}
const grpc_json* policy =
LoadBalancingPolicy::ParseLoadBalancingConfig(field);
if (policy != nullptr) {
lb_policy_name_.reset(gpr_strdup(policy->key));
lb_policy_config_ = policy->child;
}
}

@ -236,6 +236,7 @@ class ConnectedSubchannelStateWatcher
GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
}
// Must be called while holding subchannel_->mu.
void Orphan() override { health_check_client_.reset(); }
private:
@ -302,12 +303,12 @@ class ConnectedSubchannelStateWatcher
static void OnHealthChanged(void* arg, grpc_error* error) {
auto* self = static_cast<ConnectedSubchannelStateWatcher*>(arg);
grpc_subchannel* c = self->subchannel_;
MutexLock lock(&c->mu);
if (self->health_state_ == GRPC_CHANNEL_SHUTDOWN) {
self->Unref();
return;
}
grpc_subchannel* c = self->subchannel_;
MutexLock lock(&c->mu);
if (self->last_connectivity_state_ == GRPC_CHANNEL_READY) {
grpc_connectivity_state_set(&c->state_and_health_tracker,
self->health_state_, GRPC_ERROR_REF(error),

@ -43,6 +43,7 @@
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@ -963,6 +964,10 @@ void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
static grpc_closure_scheduler* write_scheduler(grpc_chttp2_transport* t,
bool early_results_scheduled,
bool partial_write) {
// If we're already in a background poller, don't offload this to an executor
if (grpc_iomgr_is_any_background_poller_thread()) {
return grpc_schedule_on_exec_ctx;
}
/* if it's not the first write in a batch, always offload to the executor:
we'll probably end up queuing against the kernel anyway, so we'll likely
get better latency overall if we switch writing work elsewhere and continue

@ -115,6 +115,7 @@ grpc_closure_scheduler* grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
namespace grpc_core {
GPR_TLS_CLASS_DEF(ExecCtx::exec_ctx_);
GPR_TLS_CLASS_DEF(ApplicationCallbackExecCtx::callback_exec_ctx_);
// WARNING: for testing purposes only!
void ExecCtx::TestOnlyGlobalInit(gpr_timespec new_val) {

@ -21,12 +21,14 @@
#include <grpc/support/port_platform.h>
#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/closure.h"
typedef int64_t grpc_millis;
@ -34,9 +36,8 @@ typedef int64_t grpc_millis;
#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN
/** A workqueue represents a list of work to be executed asynchronously.
Forward declared here to avoid a circular dependency with workqueue.h. */
typedef struct grpc_workqueue grpc_workqueue;
/** A combiner represents a list of work to be executed later.
Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;
/* This exec_ctx is ready to return: either pre-populated, or cached as soon as
@ -226,6 +227,56 @@ class ExecCtx {
GPR_TLS_CLASS_DECL(exec_ctx_);
ExecCtx* last_exec_ctx_ = Get();
};
class ApplicationCallbackExecCtx {
public:
ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
grpc_core::Fork::IncExecCtxCount();
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(this));
}
}
~ApplicationCallbackExecCtx() {
if (reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_)) == this) {
while (head_ != nullptr) {
auto* f = head_;
head_ = f->internal_next;
if (f->internal_next == nullptr) {
tail_ = nullptr;
}
(*f->functor_run)(f, f->internal_success);
}
gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
grpc_core::Fork::DecExecCtxCount();
} else {
GPR_DEBUG_ASSERT(head_ == nullptr);
GPR_DEBUG_ASSERT(tail_ == nullptr);
}
}
static void Enqueue(grpc_experimental_completion_queue_functor* functor,
int is_success) {
functor->internal_success = is_success;
functor->internal_next = nullptr;
auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
gpr_tls_get(&callback_exec_ctx_));
if (ctx->head_ == nullptr) {
ctx->head_ = functor;
}
if (ctx->tail_ != nullptr) {
ctx->tail_->internal_next = functor;
}
ctx->tail_ = functor;
}
private:
grpc_experimental_completion_queue_functor* head_{nullptr};
grpc_experimental_completion_queue_functor* tail_{nullptr};
GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};
} // namespace grpc_core
#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */

@ -111,6 +111,13 @@ size_t Executor::RunClosures(const char* executor_name,
grpc_closure_list list) {
size_t n = 0;
// In the executor, the ExecCtx for the thread is declared in the executor
// thread itself, but this is the point where we could start seeing
// application-level callbacks. No need to create a new ExecCtx, though,
// since there already is one and it is flushed (but not destructed) in this
// function itself.
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_closure* c = list.head;
while (c != nullptr) {
grpc_closure* next = c->next_data.next;

@ -105,6 +105,13 @@ void grpc_timer_manager_tick() {
}
static void run_some_timers() {
// In the case of timers, the ExecCtx for the thread is declared
// in the timer thread itself, but this is the point where we
// could start seeing application-level callbacks. No need to
// create a new ExecCtx, though, since there already is one and it is
// flushed (but not destructed) in this function itself
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
// if there's something to execute...
gpr_mu_lock(&g_mu);
// remove a waiter from the pool, and start another thread if necessary

@ -556,6 +556,7 @@ void grpc_call_unref(grpc_call* c) {
GPR_TIMER_SCOPE("grpc_call_unref", 0);
child_call* cc = c->child;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
@ -597,6 +598,7 @@ void grpc_call_unref(grpc_call* c) {
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
GPR_ASSERT(!reserved);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
cancel_with_error(call, GRPC_ERROR_CANCELLED);
return GRPC_CALL_OK;
@ -646,6 +648,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
grpc_status_code status,
const char* description,
void* reserved) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE(
"grpc_call_cancel_with_status("
@ -1894,7 +1897,6 @@ done_with_error:
grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
size_t nops, void* tag, void* reserved) {
grpc_core::ExecCtx exec_ctx;
grpc_call_error err;
GRPC_API_TRACE(
@ -1905,6 +1907,8 @@ grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
if (reserved != nullptr) {
err = GRPC_CALL_ERROR;
} else {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
err = call_start_batch(call, ops, nops, tag, 0);
}

@ -868,7 +868,7 @@ static void cq_end_op_for_callback(
GRPC_ERROR_UNREF(error);
auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
(*functor->functor_run)(functor, is_success);
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, is_success);
}
void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
@ -1352,7 +1352,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
GPR_ASSERT(cqd->shutdown_called);
cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
(*callback->functor_run)(callback, true);
grpc_core::ApplicationCallbackExecCtx::Enqueue(callback, true);
}
static void cq_shutdown_callback(grpc_completion_queue* cq) {
@ -1385,6 +1385,7 @@ static void cq_shutdown_callback(grpc_completion_queue* cq) {
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
cq->vtable->shutdown(cq);

@ -1302,6 +1302,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
listener* l;
shutdown_tag* sdt;
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
@ -1369,6 +1370,7 @@ void grpc_server_shutdown_and_notify(grpc_server* server,
void grpc_server_cancel_all_calls(grpc_server* server) {
channel_broadcaster broadcaster;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
@ -1384,6 +1386,7 @@ void grpc_server_cancel_all_calls(grpc_server* server) {
void grpc_server_destroy(grpc_server* server) {
listener* l;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
@ -1469,6 +1472,7 @@ grpc_call_error grpc_server_request_call(
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
@ -1515,11 +1519,11 @@ grpc_call_error grpc_server_request_registered_call(
grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
grpc_completion_queue* cq_bound_to_call,
grpc_completion_queue* cq_for_notification, void* tag) {
grpc_call_error error;
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
registered_method* rm = static_cast<registered_method*>(rmp);
GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
GRPC_API_TRACE(
"grpc_server_request_registered_call("
"server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
@ -1537,19 +1541,17 @@ grpc_call_error grpc_server_request_registered_call(
}
if (cq_idx == server->cq_count) {
gpr_free(rc);
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
}
if ((optional_payload == nullptr) !=
(rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
gpr_free(rc);
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
}
if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
gpr_free(rc);
error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
goto done;
return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
}
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
@ -1561,10 +1563,7 @@ grpc_call_error grpc_server_request_registered_call(
rc->data.registered.deadline = deadline;
rc->initial_metadata = initial_metadata;
rc->data.registered.optional_payload = optional_payload;
error = queue_call_request(server, cq_idx, rc);
done:
return error;
return queue_call_request(server, cq_idx, rc);
}
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,

@ -30,6 +30,7 @@
#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/transport_impl.h"
@ -63,8 +64,9 @@ void grpc_stream_unref(grpc_stream_refcount* refcount, const char* reason) {
void grpc_stream_unref(grpc_stream_refcount* refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
if (grpc_core::ExecCtx::Get()->flags() &
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP) {
if (!grpc_iomgr_is_any_background_poller_thread() &&
(grpc_core::ExecCtx::Get()->flags() &
GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP)) {
/* Ick.
The thread we're running on MAY be owned (indirectly) by a call-stack.
If that's the case, destroying the call-stack MAY try to destroy the

@ -52,6 +52,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
return true;
}
void Set(::grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
@ -72,6 +73,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
// Don't use any CQ at all. Instead just use the timer to fire the function
callback_ = std::move(f);
@ -87,6 +89,7 @@ class AlarmImpl : public ::grpc::internal::CompletionQueueTag {
&on_alarm_);
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
}

@ -211,8 +211,8 @@ void LoadReporterAsyncServiceImpl::ReportLoadHandler::OnReadDone(
load_key_);
const auto& load_report_interval = initial_request.load_report_interval();
load_report_interval_ms_ =
static_cast<uint64_t>(load_report_interval.seconds() * 1000 +
load_report_interval.nanos() / 1000);
static_cast<unsigned long>(load_report_interval.seconds() * 1000 +
load_report_interval.nanos() / 1000);
gpr_log(
GPR_INFO,
"[LRS %p] Initial request received. Start load reporting (load "

@ -59,7 +59,15 @@ namespace {
#define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
// How many callback requests of each method should we pre-register at start
#define DEFAULT_CALLBACK_REQS_PER_METHOD 32
#define DEFAULT_CALLBACK_REQS_PER_METHOD 512
// What is the (soft) limit for outstanding requests in the server
#define SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING 30000
// If the number of unmatched requests for a method drops below this amount, try
// to allocate extra unless it pushes the total number of callbacks above the
// soft maximum
#define SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD 128
class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
public:
@ -177,11 +185,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
if (grpc_server_request_registered_call(
server, method_tag_, &call_, &deadline_, &request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_,
notify_cq, this)) {
notify_cq, this) != GRPC_CALL_OK) {
TeardownRequest();
return;
}
@ -343,9 +350,10 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
class Server::CallbackRequest final : public internal::CompletionQueueTag {
public:
CallbackRequest(Server* server, internal::RpcServiceMethod* method,
void* method_tag)
CallbackRequest(Server* server, Server::MethodReqList* list,
internal::RpcServiceMethod* method, void* method_tag)
: server_(server),
req_list_(list),
method_(method),
method_tag_(method_tag),
has_request_payload_(
@ -353,12 +361,22 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
method->method_type() == internal::RpcMethod::SERVER_STREAMING),
cq_(server->CallbackCQ()),
tag_(this) {
server_->callback_reqs_outstanding_++;
Setup();
}
~CallbackRequest() { Clear(); }
~CallbackRequest() {
Clear();
// The counter of outstanding requests must be decremented
// under a lock in case it causes the server shutdown.
std::lock_guard<std::mutex> l(server_->callback_reqs_mu_);
if (--server_->callback_reqs_outstanding_ == 0) {
server_->callback_reqs_done_cv_.notify_one();
}
}
void Request() {
bool Request() {
if (method_tag_) {
if (GRPC_CALL_OK !=
grpc_server_request_registered_call(
@ -366,7 +384,7 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
&request_metadata_,
has_request_payload_ ? &request_payload_ : nullptr, cq_->cq(),
cq_->cq(), static_cast<void*>(&tag_))) {
return;
return false;
}
} else {
if (!call_details_) {
@ -376,9 +394,10 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
if (grpc_server_request_call(server_->c_server(), &call_, call_details_,
&request_metadata_, cq_->cq(), cq_->cq(),
static_cast<void*>(&tag_)) != GRPC_CALL_OK) {
return;
return false;
}
}
return true;
}
bool FinalizeResult(void** tag, bool* status) override { return false; }
@ -409,10 +428,48 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
GPR_ASSERT(!req_->FinalizeResult(&ignored, &new_ok));
GPR_ASSERT(ignored == req_);
if (!ok) {
// The call has been shutdown
req_->Clear();
return;
bool spawn_new = false;
{
std::unique_lock<std::mutex> l(req_->req_list_->reqs_mu);
req_->req_list_->reqs_list.erase(req_->req_list_iterator_);
req_->req_list_->reqs_list_sz--;
if (!ok) {
// The call has been shutdown.
// Delete its contents to free up the request.
// First release the lock in case the deletion of the request
// completes the full server shutdown and allows the destructor
// of the req_list to proceed.
l.unlock();
delete req_;
return;
}
// If this was the last request in the list or it is below the soft
// minimum and there are spare requests available, set up a new one, but
// do it outside the lock since the Request could otherwise deadlock
if (req_->req_list_->reqs_list_sz == 0 ||
(req_->req_list_->reqs_list_sz <
SOFT_MINIMUM_SPARE_CALLBACK_REQS_PER_METHOD &&
req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING)) {
spawn_new = true;
}
}
if (spawn_new) {
auto* new_req = new CallbackRequest(req_->server_, req_->req_list_,
req_->method_, req_->method_tag_);
if (!new_req->Request()) {
// The server must have just decided to shutdown. Erase
// from the list under lock but release the lock before
// deleting the new_req (in case that request was what
// would allow the destruction of the req_list)
{
std::lock_guard<std::mutex> l(new_req->req_list_->reqs_mu);
new_req->req_list_->reqs_list.erase(new_req->req_list_iterator_);
new_req->req_list_->reqs_list_sz--;
}
delete new_req;
}
}
// Bind the call, deadline, and metadata from what we got
@ -462,17 +519,30 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
[this] {
req_->Reset();
req_->Request();
// Recycle this request if there aren't too many outstanding.
// Note that we don't have to worry about a case where there
// are no requests waiting to match for this method since that
// is already taken care of when binding a request to a call.
// TODO(vjpai): Also don't recycle this request if the dynamic
// load no longer justifies it. Consider measuring
// dynamic load and setting a target accordingly.
if (req_->server_->callback_reqs_outstanding_ <
SOFT_MAXIMUM_CALLBACK_REQS_OUTSTANDING) {
req_->Clear();
req_->Setup();
} else {
// We can free up this request because there are too many
delete req_;
return;
}
if (!req_->Request()) {
// The server must have just decided to shutdown.
delete req_;
}
}));
}
};
void Reset() {
Clear();
Setup();
}
void Clear() {
if (call_details_) {
delete call_details_;
@ -492,9 +562,15 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
request_payload_ = nullptr;
request_ = nullptr;
request_status_ = Status();
std::lock_guard<std::mutex> l(req_list_->reqs_mu);
req_list_->reqs_list.push_front(this);
req_list_->reqs_list_sz++;
req_list_iterator_ = req_list_->reqs_list.begin();
}
Server* const server_;
Server::MethodReqList* req_list_;
Server::MethodReqList::iterator req_list_iterator_;
internal::RpcServiceMethod* const method_;
void* const method_tag_;
const bool has_request_payload_;
@ -715,6 +791,13 @@ Server::~Server() {
}
grpc_server_destroy(server_);
for (auto* method_list : callback_reqs_) {
// The entries of the method_list should have already been emptied
// during Shutdown as each request is failed by Shutdown. Check that
// this actually happened.
GPR_ASSERT(method_list->reqs_list.empty());
delete method_list;
}
}
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
@ -794,10 +877,12 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
}
} else {
// a callback method. Register at least some callback requests
callback_reqs_.push_back(new Server::MethodReqList);
auto* method_req_list = callback_reqs_.back();
// TODO(vjpai): Register these dynamically based on need
for (int i = 0; i < DEFAULT_CALLBACK_REQS_PER_METHOD; i++) {
auto* req = new CallbackRequest(this, method, method_registration_tag);
callback_reqs_.emplace_back(req);
new CallbackRequest(this, method_req_list, method,
method_registration_tag);
}
// Enqueue it so that it will be Request'ed later once
// all request matchers are created at core server startup
@ -889,8 +974,10 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
(*it)->Start();
}
for (auto& cbreq : callback_reqs_) {
cbreq->Request();
for (auto* cbmethods : callback_reqs_) {
for (auto* cbreq : cbmethods->reqs_list) {
GPR_ASSERT(cbreq->Request());
}
}
if (default_health_check_service_impl != nullptr) {
@ -900,49 +987,69 @@ void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
void Server::ShutdownInternal(gpr_timespec deadline) {
std::unique_lock<std::mutex> lock(mu_);
if (!shutdown_) {
shutdown_ = true;
if (shutdown_) {
return;
}
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
shutdown_ = true;
shutdown_cq.Shutdown();
/// The completion queue to use for server shutdown completion notification
CompletionQueue shutdown_cq;
ShutdownTag shutdown_tag; // Dummy shutdown tag
grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
shutdown_cq.Shutdown();
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
void* tag;
bool ok;
CompletionQueue::NextStatus status =
shutdown_cq.AsyncNext(&tag, &ok, deadline);
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// If this timed out, it means we are done with the grace period for a clean
// shutdown. We should force a shutdown now by cancelling all inflight calls
if (status == CompletionQueue::NextStatus::TIMEOUT) {
grpc_server_cancel_all_calls(server_);
}
// Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
// successfully shutdown
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
// Shutdown all ThreadManagers. This will try to gracefully stop all the
// threads in the ThreadManagers (once they process any inflight requests)
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Shutdown(); // ThreadManager's Shutdown()
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
// Wait for threads in all ThreadManagers to terminate
for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
(*it)->Wait();
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
// Wait for all outstanding callback requests to complete
// (whether waiting for a match or already active).
// We know that no new requests will be created after this point
// because they are only created at server startup time or when
// we have a successful match on a request. During the shutdown phase,
// requests that have not yet matched will be failed rather than
// allowed to succeed, which will cause the server to delete the
// request and decrement the count. Possibly a request will match before
// the shutdown but then find that shutdown has already started by the
// time it tries to register a new request. In that case, the registration
// will report a failure, indicating a shutdown and again we won't end
// up incrementing the counter.
{
std::unique_lock<std::mutex> cblock(callback_reqs_mu_);
callback_reqs_done_cv_.wait(
cblock, [this] { return callback_reqs_outstanding_ == 0; });
}
// Drain the shutdown queue (if the previous call to AsyncNext() timed out
// and we didn't remove the tag from the queue yet)
while (shutdown_cq.Next(&tag, &ok)) {
// Nothing to be done here. Just ignore ok and tag values
}
shutdown_notified_ = true;
shutdown_cv_.notify_all();
}
void Server::Wait() {

@ -45,7 +45,7 @@ namespace Grpc.Core.Tests
var writeOptions = new WriteOptions();
Assert.AreSame(writeOptions, options.WithWriteOptions(writeOptions).WriteOptions);
var propagationToken = new ContextPropagationToken(CallSafeHandle.NullInstance, DateTime.UtcNow,
var propagationToken = new ContextPropagationTokenImpl(CallSafeHandle.NullInstance, DateTime.UtcNow,
CancellationToken.None, ContextPropagationOptions.Default);
Assert.AreSame(propagationToken, options.WithPropagationToken(propagationToken).PropagationToken);
@ -72,13 +72,13 @@ namespace Grpc.Core.Tests
Assert.AreEqual(DateTime.MaxValue, new CallOptions().Normalize().Deadline.Value);
var deadline = DateTime.UtcNow;
var propagationToken1 = new ContextPropagationToken(CallSafeHandle.NullInstance, deadline, CancellationToken.None,
var propagationToken1 = new ContextPropagationTokenImpl(CallSafeHandle.NullInstance, deadline, CancellationToken.None,
new ContextPropagationOptions(propagateDeadline: true, propagateCancellation: false));
Assert.AreEqual(deadline, new CallOptions(propagationToken: propagationToken1).Normalize().Deadline.Value);
Assert.Throws(typeof(ArgumentException), () => new CallOptions(deadline: deadline, propagationToken: propagationToken1).Normalize());
var token = new CancellationTokenSource().Token;
var propagationToken2 = new ContextPropagationToken(CallSafeHandle.NullInstance, deadline, token,
var propagationToken2 = new ContextPropagationTokenImpl(CallSafeHandle.NullInstance, deadline, token,
new ContextPropagationOptions(propagateDeadline: false, propagateCancellation: true));
Assert.AreEqual(token, new CallOptions(propagationToken: propagationToken2).Normalize().CancellationToken);
Assert.Throws(typeof(ArgumentException), () => new CallOptions(cancellationToken: token, propagationToken: propagationToken2).Normalize());

@ -72,7 +72,7 @@ namespace Grpc.Core.Tests
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
var propagationToken = context.CreatePropagationToken();
Assert.IsNotNull(propagationToken.ParentCall);
Assert.IsNotNull(propagationToken.AsImplOrNull().ParentCall);
var callOptions = new CallOptions(propagationToken: propagationToken);
try
@ -154,5 +154,28 @@ namespace Grpc.Core.Tests
await call.RequestStream.CompleteAsync();
Assert.AreEqual("PASS", await call);
}
[Test]
public void ForeignPropagationTokenInterpretedAsNull()
{
Assert.IsNull(new ForeignContextPropagationToken().AsImplOrNull());
}
[Test]
public async Task ForeignPropagationTokenIsIgnored()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
return Task.FromResult("PASS");
});
var callOptions = new CallOptions(propagationToken: new ForeignContextPropagationToken());
await Calls.AsyncUnaryCall(helper.CreateUnaryCall(callOptions), "xyz");
}
// For testing, represents context propagation token that's not generated by Grpc.Core
private class ForeignContextPropagationToken : ContextPropagationToken
{
}
}
}

@ -236,22 +236,24 @@ namespace Grpc.Core
internal CallOptions Normalize()
{
var newOptions = this;
if (propagationToken != null)
// silently ignore the context propagation token if it wasn't produced by "us"
var propagationTokenImpl = propagationToken.AsImplOrNull();
if (propagationTokenImpl != null)
{
if (propagationToken.Options.IsPropagateDeadline)
if (propagationTokenImpl.Options.IsPropagateDeadline)
{
GrpcPreconditions.CheckArgument(!newOptions.deadline.HasValue,
"Cannot propagate deadline from parent call. The deadline has already been set explicitly.");
newOptions.deadline = propagationToken.ParentDeadline;
newOptions.deadline = propagationTokenImpl.ParentDeadline;
}
if (propagationToken.Options.IsPropagateCancellation)
if (propagationTokenImpl.Options.IsPropagateCancellation)
{
GrpcPreconditions.CheckArgument(!newOptions.cancellationToken.CanBeCanceled,
"Cannot propagate cancellation token from parent call. The cancellation token has already been set to a non-default value.");
newOptions.cancellationToken = propagationToken.ParentCancellationToken;
newOptions.cancellationToken = propagationTokenImpl.ParentCancellationToken;
}
}
newOptions.headers = newOptions.headers ?? Metadata.Empty;
newOptions.deadline = newOptions.deadline ?? DateTime.MaxValue;
return newOptions;

@ -0,0 +1,59 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Options for <see cref="ContextPropagationToken"/>.
/// </summary>
public class ContextPropagationOptions
{
/// <summary>
/// The context propagation options that will be used by default.
/// </summary>
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
bool propagateDeadline;
bool propagateCancellation;
/// <summary>
/// Creates new context propagation options.
/// </summary>
/// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param>
/// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param>
public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true)
{
this.propagateDeadline = propagateDeadline;
this.propagateCancellation = propagateCancellation;
}
/// <summary><c>true</c> if parent call's deadline should be propagated to the child call.</summary>
public bool IsPropagateDeadline
{
get { return this.propagateDeadline; }
}
/// <summary><c>true</c> if parent call's cancellation token should be propagated to the child call.</summary>
public bool IsPropagateCancellation
{
get { return this.propagateCancellation; }
}
}
}

@ -16,12 +16,6 @@
#endregion
using System;
using System.Threading;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
@ -29,127 +23,13 @@ namespace Grpc.Core
/// In situations when a backend is making calls to another backend,
/// it makes sense to propagate properties like deadline and cancellation
/// token of the server call to the child call.
/// The gRPC native layer provides some other contexts (like tracing context) that
/// are not accessible to explicitly C# layer, but this token still allows propagating them.
/// </summary>
public class ContextPropagationToken
{
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
/// and cancellation token by our own means.
/// </summary>
internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
& ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
readonly CallSafeHandle parentCall;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly ContextPropagationOptions options;
internal ContextPropagationToken(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
{
this.parentCall = GrpcPreconditions.CheckNotNull(parentCall);
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.options = options ?? ContextPropagationOptions.Default;
}
/// <summary>
/// Gets the native handle of the parent call.
/// </summary>
internal CallSafeHandle ParentCall
{
get
{
return this.parentCall;
}
}
/// <summary>
/// Gets the parent call's deadline.
/// </summary>
internal DateTime ParentDeadline
{
get
{
return this.deadline;
}
}
/// <summary>
/// Gets the parent call's cancellation token.
/// </summary>
internal CancellationToken ParentCancellationToken
{
get
{
return this.cancellationToken;
}
}
/// <summary>
/// Get the context propagation options.
/// </summary>
internal ContextPropagationOptions Options
{
get
{
return this.options;
}
}
}
/// <summary>
/// Options for <see cref="ContextPropagationToken"/>.
/// Underlying gRPC implementation may provide other "opaque" contexts (like tracing context) that
/// are not explicitly accesible via the public C# API, but this token still allows propagating them.
/// </summary>
public class ContextPropagationOptions
public abstract class ContextPropagationToken
{
/// <summary>
/// The context propagation options that will be used by default.
/// </summary>
public static readonly ContextPropagationOptions Default = new ContextPropagationOptions();
bool propagateDeadline;
bool propagateCancellation;
/// <summary>
/// Creates new context propagation options.
/// </summary>
/// <param name="propagateDeadline">If set to <c>true</c> parent call's deadline will be propagated to the child call.</param>
/// <param name="propagateCancellation">If set to <c>true</c> parent call's cancellation token will be propagated to the child call.</param>
public ContextPropagationOptions(bool propagateDeadline = true, bool propagateCancellation = true)
{
this.propagateDeadline = propagateDeadline;
this.propagateCancellation = propagateCancellation;
}
/// <summary><c>true</c> if parent call's deadline should be propagated to the child call.</summary>
public bool IsPropagateDeadline
internal ContextPropagationToken()
{
get { return this.propagateDeadline; }
}
/// <summary><c>true</c> if parent call's cancellation token should be propagated to the child call.</summary>
public bool IsPropagateCancellation
{
get { return this.propagateCancellation; }
}
}
/// <summary>
/// Context propagation flags from grpc/grpc.h.
/// </summary>
[Flags]
internal enum ContextPropagationFlags
{
Deadline = 1,
CensusStatsContext = 2,
CensusTracingContext = 4,
Cancellation = 8
}
}

@ -494,13 +494,13 @@ namespace Grpc.Core.Internal
return injectedNativeCall; // allows injecting a mock INativeCall in tests.
}
var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance;
var parentCall = details.Options.PropagationToken.AsImplOrNull()?.ParentCall ?? CallSafeHandle.NullInstance;
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
parentCall, ContextPropagationTokenImpl.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
}

@ -0,0 +1,34 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
namespace Grpc.Core.Internal
{
/// <summary>
/// Context propagation flags from grpc/grpc.h.
/// </summary>
[Flags]
internal enum ContextPropagationFlags
{
Deadline = 1,
CensusStatsContext = 2,
CensusTracingContext = 4,
Cancellation = 8
}
}

@ -0,0 +1,119 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Implementation of <c>ContextPropagationToken</c> that carries
/// all fields needed for context propagation by C-core based implementation of gRPC.
/// Instances of <c>ContextPropagationToken</c> that are not of this
/// type will be recognized as "foreign" and will be silently ignored
/// (treated as if null).
/// </summary>
internal class ContextPropagationTokenImpl : ContextPropagationToken
{
/// <summary>
/// Default propagation mask used by C core.
/// </summary>
private const ContextPropagationFlags DefaultCoreMask = (ContextPropagationFlags)0xffff;
/// <summary>
/// Default propagation mask used by C# - we want to propagate deadline
/// and cancellation token by our own means, everything else will be propagated
/// by C core automatically (according to <c>DefaultCoreMask</c>).
/// </summary>
internal const ContextPropagationFlags DefaultMask = DefaultCoreMask
& ~ContextPropagationFlags.Deadline & ~ContextPropagationFlags.Cancellation;
readonly CallSafeHandle parentCall;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly ContextPropagationOptions options;
internal ContextPropagationTokenImpl(CallSafeHandle parentCall, DateTime deadline, CancellationToken cancellationToken, ContextPropagationOptions options)
{
this.parentCall = GrpcPreconditions.CheckNotNull(parentCall);
this.deadline = deadline;
this.cancellationToken = cancellationToken;
this.options = options ?? ContextPropagationOptions.Default;
}
/// <summary>
/// Gets the native handle of the parent call.
/// </summary>
internal CallSafeHandle ParentCall
{
get
{
return this.parentCall;
}
}
/// <summary>
/// Gets the parent call's deadline.
/// </summary>
internal DateTime ParentDeadline
{
get
{
return this.deadline;
}
}
/// <summary>
/// Gets the parent call's cancellation token.
/// </summary>
internal CancellationToken ParentCancellationToken
{
get
{
return this.cancellationToken;
}
}
/// <summary>
/// Get the context propagation options.
/// </summary>
internal ContextPropagationOptions Options
{
get
{
return this.options;
}
}
}
internal static class ContextPropagationTokenExtensions
{
/// <summary>
/// Converts given <c>ContextPropagationToken</c> to <c>ContextPropagationTokenImpl</c>
/// if possible or returns null.
/// Being able to convert means that the context propagation token is recognized as
/// "ours" (was created by this implementation).
/// </summary>
public static ContextPropagationTokenImpl AsImplOrNull(this ContextPropagationToken instanceOrNull)
{
return instanceOrNull as ContextPropagationTokenImpl;
}
}
}

@ -63,7 +63,7 @@ namespace Grpc.Core
protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions options)
{
return new ContextPropagationToken(callHandle, deadline, cancellationToken, options);
return new ContextPropagationTokenImpl(callHandle, deadline, cancellationToken, options);
}
protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)

@ -105,6 +105,7 @@ Pod::Spec.new do |s|
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
# Restrict the gRPC runtime version to the one supported by this plugin.
s.dependency 'gRPC-ProtoRPC', v

@ -1,4 +1,5 @@
# This file has been automatically generated from a template file.
# Please make modifications to
# `templates/src/objective-c/BoringSSL-GRPC.podspec.template` instead. This
@ -38,7 +39,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL-GRPC'
version = '0.0.2'
version = '0.0.3'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google\'s needs.'
# Adapted from the homepage:
@ -80,6 +81,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = '5.0'
s.osx.deployment_target = '10.7'
s.tvos.deployment_target = '10.0'
name = 'openssl_grpc'

@ -22,9 +22,17 @@ cd src/php/bin
source ./determine_extension_dir.sh
# in some jenkins macos machine, somehow the PHP build script can't find libgrpc.dylib
export DYLD_LIBRARY_PATH=$root/libs/$CONFIG
php $extension_dir -d max_execution_time=300 $(which phpunit) -v --debug \
--exclude-group persistent_list_bound_tests ../tests/unit_tests
php $extension_dir -d max_execution_time=300 $(which phpunit) -v --debug \
../tests/unit_tests/PersistentChannelTests
export ZEND_DONT_UNLOAD_MODULES=1
export USE_ZEND_ALLOC=0
# Detect whether valgrind is executable
if [ -x "$(command -v valgrind)" ]; then
valgrind --error-exitcode=10 --leak-check=yes php $extension_dir -d max_execution_time=300 \
../tests/MemoryLeakTest/MemoryLeakTest.php
fi

File diff suppressed because it is too large Load Diff

@ -86,6 +86,16 @@ class CallTest extends PHPUnit_Framework_TestCase
$this->assertTrue($result->send_metadata);
}
public function testAddMultiAndMultiValueMetadata()
{
$batch = [
Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1', 'value2'],
'key2' => ['value3', 'value4'],],
];
$result = $this->call->startBatch($batch);
$this->assertTrue($result->send_metadata);
}
public function testGetPeer()
{
$this->assertTrue(is_string($this->call->getPeer()));

@ -14,11 +14,25 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
grpc_package(name = "core", visibility = "public")
grpc_package(
name = "core",
visibility = "public",
)
grpc_proto_library(
name = "stats_proto",
srcs = ["stats.proto"],
)
py_proto_library(
name = "py_stats_proto",
protos = ["stats.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)

@ -14,11 +14,14 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
load("//bazel:grpc_build_system.bzl", "grpc_package", "grpc_proto_library")
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
grpc_package(name = "testing", visibility = "public")
grpc_package(
name = "testing",
visibility = "public",
)
exports_files([
"echo.proto",
@ -50,9 +53,11 @@ grpc_proto_library(
grpc_proto_library(
name = "echo_proto",
srcs = ["echo.proto"],
deps = ["echo_messages_proto",
"simple_messages_proto"],
generate_mocks = True,
deps = [
"echo_messages_proto",
"simple_messages_proto",
],
)
grpc_proto_library(
@ -63,10 +68,10 @@ grpc_proto_library(
py_proto_library(
name = "py_empty_proto",
protos = ["empty.proto",],
protos = ["empty.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
@ -78,10 +83,10 @@ grpc_proto_library(
py_proto_library(
name = "py_messages_proto",
protos = ["messages.proto",],
protos = ["messages.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
@ -100,7 +105,7 @@ grpc_proto_library(
name = "benchmark_service_proto",
srcs = ["benchmark_service.proto"],
deps = [
"messages_proto",
"messages_proto",
],
)
@ -108,7 +113,7 @@ grpc_proto_library(
name = "report_qps_scenario_service_proto",
srcs = ["report_qps_scenario_service.proto"],
deps = [
"control_proto",
"control_proto",
],
)
@ -116,7 +121,7 @@ grpc_proto_library(
name = "worker_service_proto",
srcs = ["worker_service.proto"],
deps = [
"control_proto",
"control_proto",
],
)
@ -132,7 +137,7 @@ grpc_proto_library(
has_services = False,
deps = [
"//src/proto/grpc/core:stats_proto",
]
],
)
grpc_proto_library(
@ -146,14 +151,71 @@ grpc_proto_library(
py_proto_library(
name = "py_test_proto",
protos = ["test.proto",],
proto_deps = [
":py_empty_proto",
":py_messages_proto",
],
protos = ["test.proto"],
with_grpc = True,
deps = [
requirement('protobuf'),
requirement("protobuf"),
],
)
py_proto_library(
name = "py_benchmark_service_proto",
proto_deps = [
":py_empty_proto",
":py_messages_proto",
]
],
protos = ["benchmark_service.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_payloads_proto",
protos = ["payloads.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_stats_proto",
proto_deps = [
"//src/proto/grpc/core:py_stats_proto",
],
protos = ["stats.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_control_proto",
proto_deps = [
":py_payloads_proto",
":py_stats_proto",
],
protos = ["control.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)
py_proto_library(
name = "py_worker_service_proto",
proto_deps = [
":py_control_proto",
],
protos = ["worker_service.proto"],
with_grpc = True,
deps = [
requirement("protobuf"),
],
)

@ -498,7 +498,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
self._context = cygrpc.build_census_context()
def _prepare(self, request, timeout, metadata, wait_for_ready):
deadline, serialized_request, rendezvous = _start_unary_request(
@ -589,7 +589,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
self._context = cygrpc.build_census_context()
def __call__(self,
request,
@ -636,7 +636,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
self._context = cygrpc.build_census_context()
def _blocking(self, request_iterator, timeout, metadata, credentials,
wait_for_ready):
@ -713,7 +713,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
self._method = method
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
self._context = cygrpc.build_context()
self._context = cygrpc.build_census_context()
def __call__(self,
request_iterator,

@ -16,13 +16,13 @@
cdef object _custom_op_on_c_call(int op, grpc_call *call):
raise NotImplementedError("No custom hooks are implemented")
def install_census_context_from_call(Call call):
def install_context_from_call(Call call):
pass
def uninstall_context():
pass
def build_context():
def build_census_context():
pass
cdef class CensusContext:

@ -483,7 +483,7 @@ def _status(rpc_event, state, serialized_response):
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
cygrpc.install_census_context_from_call(rpc_event.call)
cygrpc.install_context_from_call(rpc_event.call)
try:
argument = argument_thunk()
if argument is not None:
@ -500,7 +500,7 @@ def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
cygrpc.install_census_context_from_call(rpc_event.call)
cygrpc.install_context_from_call(rpc_event.call)
try:
argument = argument_thunk()
if argument is not None:

@ -21,6 +21,7 @@ import setuptools
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
CHANNELZ_PROTO = os.path.join(ROOT_DIR,
'../../proto/grpc/channelz/channelz.proto')
LICENSE = os.path.join(ROOT_DIR, '../../../LICENSE')
class Preprocess(setuptools.Command):
@ -41,6 +42,8 @@ class Preprocess(setuptools.Command):
shutil.copyfile(CHANNELZ_PROTO,
os.path.join(ROOT_DIR,
'grpc_channelz/v1/channelz.proto'))
if os.path.isfile(LICENSE):
shutil.copyfile(LICENSE, os.path.join(ROOT_DIR, 'LICENSE'))
class BuildPackageProtos(setuptools.Command):

@ -20,6 +20,7 @@ import setuptools
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
HEALTH_PROTO = os.path.join(ROOT_DIR, '../../proto/grpc/health/v1/health.proto')
LICENSE = os.path.join(ROOT_DIR, '../../../LICENSE')
class Preprocess(setuptools.Command):
@ -40,6 +41,8 @@ class Preprocess(setuptools.Command):
shutil.copyfile(HEALTH_PROTO,
os.path.join(ROOT_DIR,
'grpc_health/v1/health.proto'))
if os.path.isfile(LICENSE):
shutil.copyfile(LICENSE, os.path.join(ROOT_DIR, 'LICENSE'))
class BuildPackageProtos(setuptools.Command):

@ -21,6 +21,7 @@ import setuptools
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
REFLECTION_PROTO = os.path.join(
ROOT_DIR, '../../proto/grpc/reflection/v1alpha/reflection.proto')
LICENSE = os.path.join(ROOT_DIR, '../../../LICENSE')
class Preprocess(setuptools.Command):
@ -42,6 +43,8 @@ class Preprocess(setuptools.Command):
REFLECTION_PROTO,
os.path.join(ROOT_DIR,
'grpc_reflection/v1alpha/reflection.proto'))
if os.path.isfile(LICENSE):
shutil.copyfile(LICENSE, os.path.join(ROOT_DIR, 'LICENSE'))
class BuildPackageProtos(setuptools.Command):

@ -63,11 +63,20 @@ INSTALL_REQUIRES = (
'googleapis-common-protos>=1.5.5',
)
COMMAND_CLASS = {
# wire up commands to no-op not to break the external dependencies
'preprocess': _NoOpCommand,
'build_package_protos': _NoOpCommand,
}
try:
import status_commands as _status_commands
# we are in the build environment, otherwise the above import fails
COMMAND_CLASS = {
# Run preprocess from the repository *before* doing any packaging!
'preprocess': _status_commands.Preprocess,
'build_package_protos': _NoOpCommand,
}
except ImportError:
COMMAND_CLASS = {
# wire up commands to no-op not to break the external dependencies
'preprocess': _NoOpCommand,
'build_package_protos': _NoOpCommand,
}
setuptools.setup(
name='grpcio-status',

@ -0,0 +1,39 @@
# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides distutils command classes for the GRPC Python setup process."""
import os
import shutil
import setuptools
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
LICENSE = os.path.join(ROOT_DIR, '../../../LICENSE')
class Preprocess(setuptools.Command):
"""Command to copy LICENSE from root directory."""
description = ''
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
if os.path.isfile(LICENSE):
shutil.copyfile(LICENSE, os.path.join(ROOT_DIR, 'LICENSE'))

@ -50,10 +50,18 @@ INSTALL_REQUIRES = (
'grpcio>={version}'.format(version=grpc_version.VERSION),
)
COMMAND_CLASS = {
# wire up commands to no-op not to break the external dependencies
'preprocess': _NoOpCommand,
}
try:
import testing_commands as _testing_commands
# we are in the build environment, otherwise the above import fails
COMMAND_CLASS = {
# Run preprocess from the repository *before* doing any packaging!
'preprocess': _testing_commands.Preprocess,
}
except ImportError:
COMMAND_CLASS = {
# wire up commands to no-op not to break the external dependencies
'preprocess': _NoOpCommand,
}
setuptools.setup(
name='grpcio-testing',

@ -0,0 +1,39 @@
# Copyright 2018 gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides distutils command classes for the GRPC Python setup process."""
import os
import shutil
import setuptools
ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
LICENSE = os.path.join(ROOT_DIR, '../../../LICENSE')
class Preprocess(setuptools.Command):
"""Command to copy LICENSE from root directory."""
description = ''
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
if os.path.isfile(LICENSE):
shutil.copyfile(LICENSE, os.path.join(ROOT_DIR, 'LICENSE'))

@ -37,19 +37,13 @@ PACKAGE_DIRECTORIES = {
}
INSTALL_REQUIRES = (
'coverage>=4.0',
'enum34>=1.0.4',
'coverage>=4.0', 'enum34>=1.0.4',
'grpcio>={version}'.format(version=grpc_version.VERSION),
# TODO(https://github.com/pypa/warehouse/issues/5196)
# Re-enable it once we got the name back
# 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-status>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
'oauth2client>=1.4.7',
'protobuf>=3.6.0',
'six>=1.10',
'google-auth>=1.0.0',
'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0',
'requests>=2.14.2')
if not PY3:

@ -91,7 +91,6 @@ def _close_channel_server_pairs(pairs):
pair.channel.close()
@unittest.skip('https://github.com/pypa/warehouse/issues/5196')
class ChannelzServicerTest(unittest.TestCase):
def _send_successful_unary_unary(self, idx):

@ -0,0 +1,103 @@
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
package(default_visibility = ["//visibility:public"])
load("@grpc_python_dependencies//:requirements.bzl", "requirement")
py_library(
name = "benchmark_client",
srcs = ["benchmark_client.py"],
imports = ["../../"],
deps = [
requirement("six"),
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_messages_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:resources",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
py_library(
name = "benchmark_server",
srcs = ["benchmark_server.py"],
imports = ["../../"],
deps = [
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_messages_proto",
],
)
py_library(
name = "client_runner",
srcs = ["client_runner.py"],
imports = ["../../"],
)
py_library(
name = "histogram",
srcs = ["histogram.py"],
imports = ["../../"],
deps = [
"//src/proto/grpc/testing:py_stats_proto",
],
)
py_library(
name = "worker_server",
srcs = ["worker_server.py"],
imports = ["../../"],
deps = [
":benchmark_client",
":benchmark_server",
":client_runner",
":histogram",
"//src/proto/grpc/testing:py_benchmark_service_proto",
"//src/proto/grpc/testing:py_control_proto",
"//src/proto/grpc/testing:py_stats_proto",
"//src/proto/grpc/testing:py_worker_service_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:resources",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
py_binary(
name = "qps_worker",
srcs = ["qps_worker.py"],
imports = ["../../"],
main = "qps_worker.py",
deps = [
":worker_server",
"//src/proto/grpc/testing:py_worker_service_proto",
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_tests/tests/unit:test_common",
],
)
filegroup(
name = "scenarios",
srcs = ["scenarios.json"],
)
sh_test(
name = "basic_benchmark_test",
srcs = ["basic_benchmark_test.sh"],
data = [
":qps_worker",
":scenarios",
"//test/cpp/qps:qps_json_driver",
],
)

@ -0,0 +1,102 @@
# Python Benchmark Tools
## Scenarios
In `src/proto/grpc/testing/control.proto`, it defines the fields of a scenario.
In `tools/run_tests/performance/scenario_config.py`, the script generates actual scenario content that usually in json format, or piped to another script.
All Python related benchmark scenarios are:
* netperf
* python_generic_sync_streaming_ping_pong
* python_protobuf_sync_streaming_ping_pong
* python_protobuf_async_unary_ping_pong
* python_protobuf_sync_unary_ping_pong
* python_protobuf_sync_unary_qps_unconstrained
* python_protobuf_sync_streaming_qps_unconstrained
* python_protobuf_sync_unary_ping_pong_1MB
Here we picked a small but representative subset, and reduce their benchmark duration from 30 seconds to 10 seconds:
* python_protobuf_async_unary_ping_pong
* python_protobuf_sync_streaming_ping_pong
## Why keep the scenario file if it can be generated?
Well... The `tools/run_tests/performance/scenario_config.py` is 1274 lines long. The intention of building these benchmark tools is reducing the complexity of existing infrastructure code. So, instead of calling layers of abstraction to generate the scenario file, keeping a valid static copy is preferable.
Also, if the use case for this tool grows beyond simple static scenarios, we can incorporate automatic generation and selection of scenarios into the tool.
## How to run it?
```shell
bazel test --test_output=streamed src/python/grpcio_tests/tests/qps:basic_benchmark_test
```
## What does the output look like?
```
RUNNING SCENARIO: python_protobuf_async_unary_ping_pong
I0123 00:26:04.746195000 140736237159296 driver.cc:288] Starting server on localhost:10086 (worker #0)
D0123 00:26:04.747190000 140736237159296 ev_posix.cc:170] Using polling engine: poll
D0123 00:26:04.747264000 140736237159296 dns_resolver_ares.cc:488] Using ares dns resolver
I0123 00:26:04.748445000 140736237159296 subchannel.cc:869] Connect failed: {"created":"@1548203164.748403000","description":"Failed to connect to remote host: Connection refused","errno":61,"file":"src/core/lib/iomgr/tcp_client_posix.cc","file_line":207,"os_error":"Connection refused","syscall":"connect","target_address":"ipv6:[::1]:10086"}
I0123 00:26:04.748585000 140736237159296 subchannel.cc:869] Connect failed: {"created":"@1548203164.748564000","description":"Failed to connect to remote host: Connection refused","errno":61,"file":"src/core/lib/iomgr/tcp_client_posix.cc","file_line":207,"os_error":"Connection refused","syscall":"connect","target_address":"ipv4:127.0.0.1:10086"}
I0123 00:26:04.748596000 140736237159296 subchannel.cc:751] Subchannel 0x7fca43c19360: Retry in 999 milliseconds
I0123 00:26:05.751251000 123145571299328 subchannel.cc:710] Failed to connect to channel, retrying
I0123 00:26:05.752209000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca45000060 for subchannel 0x7fca43c19360
I0123 00:26:05.772291000 140736237159296 driver.cc:349] Starting client on localhost:10087 (worker #1)
D0123 00:26:05.772384000 140736237159296 driver.cc:373] Client 0 gets 1 channels
I0123 00:26:05.773286000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca45004a80 for subchannel 0x7fca451034b0
I0123 00:26:05.789797000 140736237159296 driver.cc:394] Initiating
I0123 00:26:05.790858000 140736237159296 driver.cc:415] Warming up
I0123 00:26:07.791078000 140736237159296 driver.cc:421] Starting
I0123 00:26:07.791860000 140736237159296 driver.cc:448] Running
I0123 00:26:17.790915000 140736237159296 driver.cc:462] Finishing clients
I0123 00:26:17.791821000 140736237159296 driver.cc:476] Received final status from client 0
I0123 00:26:17.792148000 140736237159296 driver.cc:508] Finishing servers
I0123 00:26:17.792493000 140736237159296 driver.cc:522] Received final status from server 0
I0123 00:26:17.795786000 140736237159296 report.cc:82] QPS: 2066.6
I0123 00:26:17.795799000 140736237159296 report.cc:122] QPS: 2066.6 (258.3/server core)
I0123 00:26:17.795805000 140736237159296 report.cc:127] Latencies (50/90/95/99/99.9%-ile): 467.9/504.8/539.0/653.3/890.4 us
I0123 00:26:17.795811000 140736237159296 report.cc:137] Server system time: 100.00%
I0123 00:26:17.795815000 140736237159296 report.cc:139] Server user time: 100.00%
I0123 00:26:17.795818000 140736237159296 report.cc:141] Client system time: 100.00%
I0123 00:26:17.795821000 140736237159296 report.cc:143] Client user time: 100.00%
I0123 00:26:17.795825000 140736237159296 report.cc:148] Server CPU usage: 0.00%
I0123 00:26:17.795828000 140736237159296 report.cc:153] Client Polls per Request: 0.00
I0123 00:26:17.795831000 140736237159296 report.cc:155] Server Polls per Request: 0.00
I0123 00:26:17.795834000 140736237159296 report.cc:160] Server Queries/CPU-sec: 1033.19
I0123 00:26:17.795837000 140736237159296 report.cc:162] Client Queries/CPU-sec: 1033.32
RUNNING SCENARIO: python_protobuf_sync_streaming_ping_pong
I0123 00:26:17.795888000 140736237159296 driver.cc:288] Starting server on localhost:10086 (worker #0)
D0123 00:26:17.795964000 140736237159296 ev_posix.cc:170] Using polling engine: poll
D0123 00:26:17.795978000 140736237159296 dns_resolver_ares.cc:488] Using ares dns resolver
I0123 00:26:17.796613000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca43c15820 for subchannel 0x7fca43d12140
I0123 00:26:17.810911000 140736237159296 driver.cc:349] Starting client on localhost:10087 (worker #1)
D0123 00:26:17.811037000 140736237159296 driver.cc:373] Client 0 gets 1 channels
I0123 00:26:17.811892000 140736237159296 subchannel.cc:832] New connected subchannel at 0x7fca43d18f40 for subchannel 0x7fca43d16b80
I0123 00:26:17.818902000 140736237159296 driver.cc:394] Initiating
I0123 00:26:17.820776000 140736237159296 driver.cc:415] Warming up
I0123 00:26:19.824685000 140736237159296 driver.cc:421] Starting
I0123 00:26:19.825970000 140736237159296 driver.cc:448] Running
I0123 00:26:29.821866000 140736237159296 driver.cc:462] Finishing clients
I0123 00:26:29.823259000 140736237159296 driver.cc:476] Received final status from client 0
I0123 00:26:29.827195000 140736237159296 driver.cc:508] Finishing servers
I0123 00:26:29.827599000 140736237159296 driver.cc:522] Received final status from server 0
I0123 00:26:29.828739000 140736237159296 report.cc:82] QPS: 619.5
I0123 00:26:29.828752000 140736237159296 report.cc:122] QPS: 619.5 (77.4/server core)
I0123 00:26:29.828760000 140736237159296 report.cc:127] Latencies (50/90/95/99/99.9%-ile): 1589.8/1854.3/1920.4/2015.8/2204.8 us
I0123 00:26:29.828765000 140736237159296 report.cc:137] Server system time: 100.00%
I0123 00:26:29.828769000 140736237159296 report.cc:139] Server user time: 100.00%
I0123 00:26:29.828772000 140736237159296 report.cc:141] Client system time: 100.00%
I0123 00:26:29.828776000 140736237159296 report.cc:143] Client user time: 100.00%
I0123 00:26:29.828780000 140736237159296 report.cc:148] Server CPU usage: 0.00%
I0123 00:26:29.828784000 140736237159296 report.cc:153] Client Polls per Request: 0.00
I0123 00:26:29.828788000 140736237159296 report.cc:155] Server Polls per Request: 0.00
I0123 00:26:29.828792000 140736237159296 report.cc:160] Server Queries/CPU-sec: 309.58
I0123 00:26:29.828795000 140736237159296 report.cc:162] Client Queries/CPU-sec: 309.75
```
## Future Works (TODOs)
1. Generate a target for each scenario.
2. Simplify the main entrance of our benchmark related code, or make it depends on Bazel.

@ -0,0 +1,45 @@
#! /bin/bash
# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This test benchmarks Python client/server.
set -ex
declare -a DRIVER_PORTS=("10086" "10087")
SCENARIOS_FILE=src/python/grpcio_tests/tests/qps/scenarios.json
function join { local IFS="$1"; shift; echo "$*"; }
if [[ -e "${SCENARIOS_FILE}" ]]; then
echo "Running against ${SCENARIOS_FILE}:"
cat "${SCENARIOS_FILE}"
else
echo "Failed to find ${SCENARIOS_FILE}!"
exit 1
fi
echo "Starting Python qps workers..."
qps_workers=()
for DRIVER_PORT in "${DRIVER_PORTS[@]}"
do
echo -e "\tRunning Python qps worker listening at localhost:${DRIVER_PORT}..."
src/python/grpcio_tests/tests/qps/qps_worker \
--driver_port="${DRIVER_PORT}" &
qps_workers+=("localhost:${DRIVER_PORT}")
done
echo "Running qps json driver..."
QPS_WORKERS=$(join , ${qps_workers[@]})
export QPS_WORKERS
test/cpp/qps/qps_json_driver --scenarios_file="${SCENARIOS_FILE}"

@ -0,0 +1,96 @@
{
"scenarios": [
{
"name": "python_protobuf_async_unary_ping_pong",
"clientConfig": {
"clientType": "ASYNC_CLIENT",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"outstandingRpcsPerChannel": 1,
"clientChannels": 1,
"asyncClientThreads": 1,
"loadParams": {
"closedLoop": {}
},
"payloadConfig": {
"simpleParams": {}
},
"histogramParams": {
"resolution": 0.01,
"maxPossible": 60000000000
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numClients": 1,
"serverConfig": {
"serverType": "ASYNC_SERVER",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numServers": 1,
"warmupSeconds": 2,
"benchmarkSeconds": 10
},
{
"name": "python_protobuf_sync_streaming_ping_pong",
"clientConfig": {
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"outstandingRpcsPerChannel": 1,
"clientChannels": 1,
"asyncClientThreads": 1,
"rpcType": "STREAMING",
"loadParams": {
"closedLoop": {}
},
"payloadConfig": {
"simpleParams": {}
},
"histogramParams": {
"resolution": 0.01,
"maxPossible": 60000000000
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numClients": 1,
"serverConfig": {
"serverType": "ASYNC_SERVER",
"securityParams": {
"useTestCa": true,
"serverHostOverride": "foo.test.google.fr"
},
"channelArgs": [
{
"name": "grpc.optimization_target",
"strValue": "latency"
}
]
},
"numServers": 1,
"warmupSeconds": 2,
"benchmarkSeconds": 10
}
]
}

@ -24,10 +24,18 @@ grpc_config = ENV['GRPC_CONFIG'] || 'opt'
ENV['MACOSX_DEPLOYMENT_TARGET'] = '10.7'
ENV['AR'] = RbConfig::CONFIG['AR'] + ' rcs'
ENV['CC'] = RbConfig::CONFIG['CC']
ENV['CXX'] = RbConfig::CONFIG['CXX']
ENV['LD'] = ENV['CC']
if ENV['AR'].nil? || ENV['AR'].size == 0
ENV['AR'] = RbConfig::CONFIG['AR'] + ' rcs'
end
if ENV['CC'].nil? || ENV['CC'].size == 0
ENV['CC'] = RbConfig::CONFIG['CC']
end
if ENV['CXX'].nil? || ENV['CXX'].size == 0
ENV['CXX'] = RbConfig::CONFIG['CXX']
end
if ENV['LD'].nil? || ENV['LD'].size == 0
ENV['LD'] = ENV['CC']
end
ENV['AR'] = 'libtool -o' if RUBY_PLATFORM =~ /darwin/

@ -140,7 +140,7 @@
s.name = 'gRPC-C++'
# TODO (mxyan): use version that match gRPC version when pod is stabilized
# version = '${settings.version}'
version = '${modify_podspec_version_string('0.0.6', settings.version)}'
version = '${modify_podspec_version_string('0.0.8', settings.version)}'
s.version = version
s.summary = 'gRPC C++ library'
s.homepage = 'https://grpc.io'
@ -156,6 +156,8 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpcpp'

@ -99,6 +99,8 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
s.requires_arc = false
name = 'grpc'
@ -174,7 +176,7 @@
ss.header_mappings_dir = '.'
ss.libraries = 'z'
ss.dependency "#{s.name}/Interface", version
ss.dependency 'BoringSSL-GRPC', '0.0.2'
ss.dependency 'BoringSSL-GRPC', '0.0.3'
ss.dependency 'nanopb', '~> 0.3'
ss.compiler_flags = '-DGRPC_SHADOW_BORINGSSL_SYMBOLS'

@ -37,6 +37,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'ProtoRPC'
s.module_name = name

@ -37,6 +37,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'RxLibrary'
s.module_name = name

@ -36,6 +36,7 @@
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
name = 'GRPCClient'
s.module_name = name

@ -107,6 +107,7 @@
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
s.tvos.deployment_target = '10.0'
# Restrict the gRPC runtime version to the one supported by this plugin.
s.dependency 'gRPC-ProtoRPC', v

@ -4,6 +4,7 @@
def expand_symbol_list(symbol_list):
return ',\n '.join("'#define %s GRPC_SHADOW_%s'" % (symbol, symbol) for symbol in symbol_list)
%>
# This file has been automatically generated from a template file.
# Please make modifications to
# `templates/src/objective-c/BoringSSL-GRPC.podspec.template` instead. This
@ -43,7 +44,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL-GRPC'
version = '0.0.2'
version = '0.0.3'
s.version = version
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google\'s needs.'
# Adapted from the homepage:
@ -85,6 +86,7 @@
s.ios.deployment_target = '5.0'
s.osx.deployment_target = '10.7'
s.tvos.deployment_target = '10.0'
name = 'openssl_grpc'

@ -55,6 +55,7 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,

@ -0,0 +1,7 @@
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y ${'\\'}
valgrind

@ -19,6 +19,7 @@
<%include file="../../php7_deps.include"/>
<%include file="../../gcp_api_libraries.include"/>
<%include file="../../python_deps.include"/>
<%include file="../../php_valgrind.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
CMD ["bash"]

@ -20,6 +20,7 @@
<%include file="../../gcp_api_libraries.include"/>
<%include file="../../python_deps.include"/>
<%include file="../../php_deps.include"/>
<%include file="../../php_valgrind.include"/>
<%include file="../../run_tests_addons.include"/>
# Define the default command.
CMD ["bash"]

@ -75,7 +75,7 @@ int main(int argc, char** argv) {
test_succeeds(dns, "dns:www.google.com");
test_succeeds(dns, "dns:///www.google.com");
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
if (resolver_env != nullptr && gpr_stricmp(resolver_env, "native") == 0) {
if (resolver_env == nullptr || gpr_stricmp(resolver_env, "native") == 0) {
test_fails(dns, "dns://8.8.8.8/8.8.8.8:8888");
} else {
test_succeeds(dns, "dns://8.8.8.8/8.8.8.8:8888");

@ -260,6 +260,7 @@ typedef struct final_status_data {
static void server_start_transport_stream_op_batch(
grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
auto* data = static_cast<final_status_data*>(elem->call_data);
gpr_mu_lock(&g_mu);
if (data->call == g_server_call_stack) {
if (op->send_initial_metadata) {
auto* batch = op->payload->send_initial_metadata.send_initial_metadata;
@ -270,6 +271,7 @@ static void server_start_transport_stream_op_batch(
}
}
}
gpr_mu_unlock(&g_mu);
grpc_call_next_op(elem, op);
}

@ -23,6 +23,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <address_sorting/address_sorting.h>
#include <string.h>
#include "src/core/lib/gpr/env.h"
@ -120,6 +122,35 @@ static void must_fail(void* argsp, grpc_error* err) {
gpr_mu_unlock(args->mu);
}
// This test assumes the environment has an ipv6 loopback
static void must_succeed_with_ipv6_first(void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
GPR_ASSERT(args->addrs->naddrs > 0);
const struct sockaddr* first_address =
reinterpret_cast<const struct sockaddr*>(args->addrs->addrs[0].addr);
GPR_ASSERT(first_address->sa_family == AF_INET6);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
static void must_succeed_with_ipv4_first(void* argsp, grpc_error* err) {
args_struct* args = static_cast<args_struct*>(argsp);
GPR_ASSERT(err == GRPC_ERROR_NONE);
GPR_ASSERT(args->addrs != nullptr);
GPR_ASSERT(args->addrs->naddrs > 0);
const struct sockaddr* first_address =
reinterpret_cast<const struct sockaddr*>(args->addrs->addrs[0].addr);
GPR_ASSERT(first_address->sa_family == AF_INET);
gpr_atm_rel_store(&args->done_atm, 1);
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, nullptr));
gpr_mu_unlock(args->mu);
}
static void test_localhost(void) {
grpc_core::ExecCtx exec_ctx;
args_struct args;
@ -146,6 +177,33 @@ static void test_default_port(void) {
args_finish(&args);
}
static void test_localhost_result_has_ipv6_first(void) {
grpc_core::ExecCtx exec_ctx;
args_struct args;
args_init(&args);
grpc_resolve_address("localhost:1", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed_with_ipv6_first, &args,
grpc_schedule_on_exec_ctx),
&args.addrs);
grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&args);
args_finish(&args);
}
static void test_localhost_result_has_ipv4_first_when_ipv6_isnt_available(
void) {
grpc_core::ExecCtx exec_ctx;
args_struct args;
args_init(&args);
grpc_resolve_address("localhost:1", nullptr, args.pollset_set,
GRPC_CLOSURE_CREATE(must_succeed_with_ipv4_first, &args,
grpc_schedule_on_exec_ctx),
&args.addrs);
grpc_core::ExecCtx::Get()->Flush();
poll_pollset_until_request_done(&args);
args_finish(&args);
}
static void test_non_numeric_default_port(void) {
grpc_core::ExecCtx exec_ctx;
args_struct args;
@ -245,6 +303,34 @@ static void test_unparseable_hostports(void) {
}
}
typedef struct mock_ipv6_disabled_source_addr_factory {
address_sorting_source_addr_factory base;
} mock_ipv6_disabled_source_addr_factory;
static bool mock_ipv6_disabled_source_addr_factory_get_source_addr(
address_sorting_source_addr_factory* factory,
const address_sorting_address* dest_addr,
address_sorting_address* source_addr) {
// Mock lack of IPv6. For IPv4, set the source addr to be the same
// as the destination; tests won't actually connect on the result anyways.
if (address_sorting_abstract_get_family(dest_addr) ==
ADDRESS_SORTING_AF_INET6) {
return false;
}
memcpy(source_addr->addr, &dest_addr->addr, dest_addr->len);
source_addr->len = dest_addr->len;
return true;
}
void mock_ipv6_disabled_source_addr_factory_destroy(
address_sorting_source_addr_factory* factory) {}
const address_sorting_source_addr_factory_vtable
kMockIpv6DisabledSourceAddrFactoryVtable = {
mock_ipv6_disabled_source_addr_factory_get_source_addr,
mock_ipv6_disabled_source_addr_factory_destroy,
};
int main(int argc, char** argv) {
// First set the resolver type based off of --resolver
const char* resolver_type = nullptr;
@ -289,11 +375,26 @@ int main(int argc, char** argv) {
// these unit tests under c-ares risks flakiness.
test_invalid_ip_addresses();
test_unparseable_hostports();
} else {
test_localhost_result_has_ipv6_first();
}
grpc_core::Executor::ShutdownAll();
}
gpr_cmdline_destroy(cl);
grpc_shutdown();
// The following test uses
// "address_sorting_override_source_addr_factory_for_testing", which works
// on a per-grpc-init basis, and so it's simplest to run this next test
// within a standalone grpc_init/grpc_shutdown pair.
if (gpr_stricmp(resolver_type, "ares") == 0) {
// Run a test case in which c-ares's address sorter
// thinks that IPv4 is available and IPv6 isn't.
grpc_init();
mock_ipv6_disabled_source_addr_factory factory;
factory.base.vtable = &kMockIpv6DisabledSourceAddrFactoryVtable;
address_sorting_override_source_addr_factory_for_testing(&factory.base);
test_localhost_result_has_ipv4_first_when_ipv6_isnt_available();
grpc_shutdown();
}
return 0;
}

@ -389,46 +389,49 @@ static void test_callback(void) {
attr.cq_shutdown_cb = &shutdown_cb;
for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
grpc_core::ExecCtx exec_ctx; // reset exec_ctx
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
int sumtags = 0;
int counter = 0;
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
{
// reset exec_ctx types
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
attr.cq_polling_type = polling_types[pidx];
cc = grpc_completion_queue_create(
grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
class TagCallback : public grpc_experimental_completion_queue_functor {
public:
TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
functor_run = &TagCallback::Run;
}
~TagCallback() {}
static void Run(grpc_experimental_completion_queue_functor* cb,
int ok) {
GPR_ASSERT(static_cast<bool>(ok));
auto* callback = static_cast<TagCallback*>(cb);
*callback->counter_ += callback->tag_;
grpc_core::Delete(callback);
};
private:
int* counter_;
int tag_;
};
private:
int* counter_;
int tag_;
};
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
int sumtags = 0;
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
sumtags += i;
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
nullptr, &completions[i]);
shutdown_and_destroy(cc);
}
GPR_ASSERT(sumtags == counter);
shutdown_and_destroy(cc);
GPR_ASSERT(got_shutdown);
got_shutdown = false;
}

@ -48,11 +48,17 @@ namespace {
// A minimal forwarding class to avoid implementing a standalone test LB.
class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
public:
ForwardingLoadBalancingPolicy(const Args& args,
ForwardingLoadBalancingPolicy(Args args,
const std::string& delegate_policy_name)
: LoadBalancingPolicy(args) {
: LoadBalancingPolicy(std::move(args)) {
Args delegate_args;
delegate_args.combiner = combiner();
delegate_args.client_channel_factory = client_channel_factory();
delegate_args.subchannel_pool = subchannel_pool()->Ref();
delegate_args.args = args.args;
delegate_args.lb_config = args.lb_config;
delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
delegate_policy_name.c_str(), args);
delegate_policy_name.c_str(), std::move(delegate_args));
grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
interested_parties());
// Give re-resolution closure to delegate.
@ -143,9 +149,8 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
: public ForwardingLoadBalancingPolicy {
public:
InterceptRecvTrailingMetadataLoadBalancingPolicy(
const Args& args, InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: ForwardingLoadBalancingPolicy(args,
Args args, InterceptRecvTrailingMetadataCallback cb, void* user_data)
: ForwardingLoadBalancingPolicy(std::move(args),
/*delegate_lb_policy_name=*/"pick_first"),
cb_(cb),
user_data_(user_data) {}
@ -212,10 +217,10 @@ class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>
CreateLoadBalancingPolicy(
const grpc_core::LoadBalancingPolicy::Args& args) const override {
grpc_core::LoadBalancingPolicy::Args args) const override {
return grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy>(
grpc_core::New<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
args, cb_, user_data_));
std::move(args), cb_, user_data_));
}
const char* name() const override {

@ -222,6 +222,8 @@ grpc_cc_test(
deps = [
":end2end_test_lib",
],
size = "large", # with poll-cv this takes long, see #17493
timeout = "long",
)
grpc_cc_test(

@ -54,6 +54,7 @@ DEFINE_string(
"custom_metadata: server will echo custom metadata;\n"
"empty_stream : bi-di stream with no request/response;\n"
"empty_unary : empty (zero bytes) request and response;\n"
"google_default_credentials: large unary using GDC;\n"
"half_duplex : half-duplex streaming;\n"
"jwt_token_creds: large_unary with JWT token auth;\n"
"large_unary : single request and (large) response;\n"
@ -151,6 +152,11 @@ int main(int argc, char** argv) {
std::bind(&grpc::testing::InteropClient::DoPerRpcCreds, &client,
GetServiceAccountJsonKey());
}
if (FLAGS_custom_credentials_type == "google_default_credentials") {
actions["google_default_credentials"] =
std::bind(&grpc::testing::InteropClient::DoGoogleDefaultCredentials,
&client, FLAGS_default_service_account);
}
actions["status_code_and_message"] =
std::bind(&grpc::testing::InteropClient::DoStatusWithMessage, &client);
actions["custom_metadata"] =

@ -294,6 +294,25 @@ bool InteropClient::DoJwtTokenCreds(const grpc::string& username) {
return true;
}
bool InteropClient::DoGoogleDefaultCredentials(
const grpc::string& default_service_account) {
gpr_log(GPR_DEBUG,
"Sending a large unary rpc with GoogleDefaultCredentials...");
SimpleRequest request;
SimpleResponse response;
request.set_fill_username(true);
if (!PerformLargeUnary(&request, &response)) {
return false;
}
gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
GPR_ASSERT(!response.username().empty());
GPR_ASSERT(response.username().c_str() == default_service_account);
gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
return true;
}
bool InteropClient::DoLargeUnary() {
gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
SimpleRequest request;

@ -89,6 +89,8 @@ class InteropClient {
const grpc::string& oauth_scope);
// username is a string containing the user email
bool DoPerRpcCreds(const grpc::string& json_key);
// username is the GCE default service account email
bool DoGoogleDefaultCredentials(const grpc::string& username);
private:
class ServiceStub {

@ -101,8 +101,6 @@ class DummyEndpoint : public grpc_endpoint {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
static grpc_workqueue* get_workqueue(grpc_endpoint* ep) { return nullptr; }
static void add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
static void add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset) {

@ -55,6 +55,7 @@ if cur_resolver and cur_resolver != 'ares':
'needs to use GRPC_DNS_RESOLVER=ares.'))
test_runner_log('Exit 1 without running tests.')
sys.exit(1)
os.environ.update({'GRPC_DNS_RESOLVER': 'ares'})
os.environ.update({'GRPC_TRACE': 'cares_resolver'})
def wait_until_dns_server_is_up(args,

@ -14,8 +14,10 @@
licenses(["notice"]) # Apache v2
load("//bazel:grpc_build_system.bzl", "grpc_cc_test", "grpc_cc_library", "grpc_cc_binary", "grpc_package")
load("//test/cpp/qps:qps_benchmark_script.bzl", "qps_json_driver_batch", "json_run_localhost_batch")
package(default_visibility = ["//visibility:public"])
load("//bazel:grpc_build_system.bzl", "grpc_cc_binary", "grpc_cc_library", "grpc_cc_test", "grpc_package")
load("//test/cpp/qps:qps_benchmark_script.bzl", "json_run_localhost_batch", "qps_json_driver_batch")
grpc_package(name = "test/cpp/qps")

@ -54,7 +54,7 @@ else
tempdir=$(mktemp -d)
cp -RT "${dir}" "${tempdir}"
yapf "${tempdir}"
diff -x 'LICENSE' -x '*.pyc' -ru "${dir}" "${tempdir}" || ok=no
diff -x '*.pyc' -ru "${dir}" "${tempdir}" || ok=no
rm -rf "${tempdir}"
done
if [[ ${ok} == no ]]; then

@ -79,6 +79,14 @@ RUN pip install --upgrade pip==10.0.1
RUN pip install virtualenv
RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 twisted==17.5.0
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y \
valgrind
RUN mkdir /var/local/jenkins

@ -76,6 +76,14 @@ RUN pip install futures==2.2.0 enum34==1.0.4 protobuf==3.5.2.post1 six==1.10.0 t
RUN apt-get update && apt-get install -y \
git php5 php5-dev phpunit unzip
#=================
# PHP Test dependencies
# Install dependencies
RUN apt-get update && apt-get install -y \
valgrind
RUN mkdir /var/local/jenkins

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save