Merge branch 'master' into errqueueredefine

pull/16456/head
Yash Tibrewal 7 years ago
commit 0d73deb239
  1. 8
      bazel/grpc_deps.bzl
  2. 33
      src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
  3. 9
      src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
  4. 13
      src/core/ext/filters/client_channel/resolver.h
  5. 12
      src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
  6. 12
      src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.cc
  7. 18
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.cc
  8. 3
      src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h
  9. 7
      src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.cc
  10. 4
      src/core/lib/iomgr/internal_errqueue.cc
  11. 1
      src/core/lib/iomgr/port.h
  12. 4
      src/core/tsi/ssl_transport_security.cc
  13. 37
      src/csharp/Grpc.Core.Tests/ChannelCredentialsTest.cs
  14. 10
      src/csharp/Grpc.Core.Tests/FakeCredentials.cs
  15. 2
      src/csharp/Grpc.Core.Tests/MetadataTest.cs
  16. 2
      src/csharp/Grpc.Core/Channel.cs
  17. 39
      src/csharp/Grpc.Core/ChannelCredentials.cs
  18. 27
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  19. 32
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  20. 10
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  21. 4
      src/csharp/Grpc.Core/Metadata.cs
  22. 4
      src/proto/grpc/testing/control.proto
  23. 117
      test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
  24. 58
      test/core/client_channel/resolvers/fake_resolver_test.cc
  25. 6
      test/core/tsi/ssl_transport_security_test.cc
  26. 68
      test/cpp/end2end/client_lb_end2end_test.cc
  27. 45
      test/cpp/qps/client.h
  28. 6
      test/cpp/qps/driver.cc
  29. 3
      test/cpp/qps/driver.h
  30. 5
      test/cpp/qps/histogram.h
  31. 2
      test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
  32. 19
      test/cpp/qps/qps_json_driver.cc
  33. 2
      test/cpp/qps/qps_openloop_test.cc
  34. 2
      test/cpp/qps/secure_sync_unary_ping_pong_test.cc
  35. 9
      tools/internal_ci/linux/grpc_bazel_on_foundry_base.sh
  36. 11
      tools/internal_ci/linux/grpc_msan_on_foundry.sh
  37. 9
      tools/internal_ci/linux/grpc_ubsan_on_foundry.sh
  38. 9
      tools/internal_ci/linux/pull_request/grpc_ubsan_on_foundry.sh

@ -169,12 +169,12 @@ def grpc_deps():
if "com_github_bazelbuild_bazeltoolchains" not in native.existing_rules():
native.http_archive(
name = "com_github_bazelbuild_bazeltoolchains",
strip_prefix = "bazel-toolchains-4653c01284d8a4a536f8f9bb47b7d10f94c549e7",
strip_prefix = "bazel-toolchains-cdea5b8675914d0a354d89f108de5d28e54e0edc",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/bazel-toolchains/archive/4653c01284d8a4a536f8f9bb47b7d10f94c549e7.tar.gz",
"https://github.com/bazelbuild/bazel-toolchains/archive/4653c01284d8a4a536f8f9bb47b7d10f94c549e7.tar.gz",
"https://mirror.bazel.build/github.com/bazelbuild/bazel-toolchains/archive/cdea5b8675914d0a354d89f108de5d28e54e0edc.tar.gz",
"https://github.com/bazelbuild/bazel-toolchains/archive/cdea5b8675914d0a354d89f108de5d28e54e0edc.tar.gz",
],
sha256 = "1c4a532b396c698e6467a1548554571cb85fa091e472b05e398ebc836c315d77",
sha256 = "cefb6ccf86ca592baaa029bcef04148593c0efe8f734542f10293ea58f170715",
)
if "io_opencensus_cpp" not in native.existing_rules():

@ -126,7 +126,6 @@ class PickFirst : public LoadBalancingPolicy {
void ShutdownLocked() override;
void StartPickingLocked();
void DestroyUnselectedSubchannelsLocked();
void UpdateChildRefsLocked();
// All our subchannels.
@ -250,14 +249,9 @@ void PickFirst::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask,
void PickFirst::StartPickingLocked() {
started_picking_ = true;
if (subchannel_list_ != nullptr) {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
if (subchannel_list_->subchannel(i)->subchannel() != nullptr) {
subchannel_list_->subchannel(i)
->CheckConnectivityStateAndStartWatchingLocked();
break;
}
}
if (subchannel_list_ != nullptr && subchannel_list_->num_subchannels() > 0) {
subchannel_list_->subchannel(0)
->CheckConnectivityStateAndStartWatchingLocked();
}
}
@ -294,15 +288,6 @@ bool PickFirst::PickLocked(PickState* pick, grpc_error** error) {
return false;
}
void PickFirst::DestroyUnselectedSubchannelsLocked() {
for (size_t i = 0; i < subchannel_list_->num_subchannels(); ++i) {
PickFirstSubchannelData* sd = subchannel_list_->subchannel(i);
if (selected_ != sd) {
sd->UnrefSubchannelLocked("selected_different_subchannel");
}
}
}
grpc_connectivity_state PickFirst::CheckConnectivityLocked(grpc_error** error) {
return grpc_connectivity_state_get(&state_tracker_, error);
}
@ -419,7 +404,6 @@ void PickFirst::UpdateLocked(const grpc_channel_args& args) {
if (sd->CheckConnectivityStateLocked(&error) == GRPC_CHANNEL_READY) {
selected_ = sd;
subchannel_list_ = std::move(subchannel_list);
DestroyUnselectedSubchannelsLocked();
sd->StartConnectivityWatchLocked();
// If there was a previously pending update (which may or may
// not have contained the currently selected subchannel), drop
@ -504,7 +488,6 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
p->TryReresolutionLocked(&grpc_lb_pick_first_trace, GRPC_ERROR_NONE);
// In transient failure. Rely on re-resolution to recover.
p->selected_ = nullptr;
UnrefSubchannelLocked("pf_selected_shutdown");
StopConnectivityWatchLocked();
} else {
grpc_connectivity_state_set(&p->state_tracker_, connectivity_state,
@ -535,11 +518,9 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
StopConnectivityWatchLocked();
PickFirstSubchannelData* sd = this;
do {
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
} while (sd->subchannel() == nullptr);
size_t next_index =
(sd->Index() + 1) % subchannel_list()->num_subchannels();
sd = subchannel_list()->subchannel(next_index);
// Case 1: Only set state to TRANSIENT_FAILURE if we've tried
// all subchannels.
if (sd->Index() == 0 && subchannel_list() == p->subchannel_list_.get()) {
@ -600,8 +581,6 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
if (grpc_lb_pick_first_trace.enabled()) {
gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
}
// Drop all other subchannels, since we are now connected.
p->DestroyUnselectedSubchannelsLocked();
// Update any calls that were waiting for a pick.
PickState* pick;
while ((pick = p->pending_picks_)) {

@ -102,11 +102,6 @@ class SubchannelData {
return pending_connectivity_state_unsafe_;
}
// Unrefs the subchannel. May be used if an individual subchannel is
// no longer needed even though the subchannel list as a whole is not
// being unreffed.
virtual void UnrefSubchannelLocked(const char* reason);
// Resets the connection backoff.
// TODO(roth): This method should go away when we move the backoff
// code out of the subchannel and into the LB policies.
@ -154,6 +149,10 @@ class SubchannelData {
grpc_connectivity_state connectivity_state,
grpc_error* error) GRPC_ABSTRACT;
// Unrefs the subchannel. May be overridden by subclasses that need
// to perform extra cleanup when unreffing the subchannel.
virtual void UnrefSubchannelLocked(const char* reason);
private:
// Updates connected_subchannel_ based on pending_connectivity_state_unsafe_.
// Returns true if the connectivity state should be reported.

@ -81,18 +81,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
///
/// If this causes new data to become available, then the currently
/// pending call to \a NextLocked() will return the new result.
///
/// Note: Currently, all resolvers are required to return a new result
/// shortly after this method is called. For pull-based mechanisms, if
/// the implementation decides to delay querying the name service, it
/// should immediately return a new copy of the previously returned
/// result (and it can then return the updated data later, when it
/// actually does query the name service). For push-based mechanisms,
/// the implementation should immediately return a new copy of the
/// last-seen result.
/// TODO(roth): Remove this requirement once we fix pick_first to not
/// throw away unselected subchannels.
virtual void RequestReresolutionLocked() GRPC_ABSTRACT;
virtual void RequestReresolutionLocked() {}
/// Resets the re-resolution backoff, if any.
/// This needs to be implemented only by pull-based implementations;

@ -373,13 +373,7 @@ void AresDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void AresDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -401,10 +395,6 @@ void AresDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -247,13 +247,7 @@ void NativeDnsResolver::OnResolvedLocked(void* arg, grpc_error* error) {
void NativeDnsResolver::MaybeStartResolvingLocked() {
// If there is an existing timer, the time it fires is the earliest time we
// can start the next resolution.
if (have_next_resolution_timer_) {
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
if (have_next_resolution_timer_) return;
if (last_resolution_timestamp_ >= 0) {
const grpc_millis earliest_next_resolution =
last_resolution_timestamp_ + min_time_between_resolutions_;
@ -275,10 +269,6 @@ void NativeDnsResolver::MaybeStartResolvingLocked() {
self.release();
grpc_timer_init(&next_resolution_timer_, ms_until_next_resolution,
&on_next_resolution_);
// TODO(dgq): remove the following two lines once Pick First stops
// discarding subchannels after selecting.
++resolved_version_;
MaybeFinishNextLocked();
return;
}
}

@ -73,11 +73,6 @@ class FakeResolver : public Resolver {
// Results to use for the pretended re-resolution in
// RequestReresolutionLocked().
grpc_channel_args* reresolution_results_ = nullptr;
// TODO(juanlishen): This can go away once pick_first is changed to not throw
// away its subchannels, since that will eliminate its dependence on
// channel_saw_error_locked() causing an immediate resolver return.
// A copy of the most-recently used resolution results.
grpc_channel_args* last_used_results_ = nullptr;
// pending next completion, or NULL
grpc_closure* next_completion_ = nullptr;
// target result address for next completion
@ -96,7 +91,6 @@ FakeResolver::FakeResolver(const ResolverArgs& args) : Resolver(args.combiner) {
FakeResolver::~FakeResolver() {
grpc_channel_args_destroy(next_results_);
grpc_channel_args_destroy(reresolution_results_);
grpc_channel_args_destroy(last_used_results_);
grpc_channel_args_destroy(channel_args_);
}
@ -109,17 +103,11 @@ void FakeResolver::NextLocked(grpc_channel_args** target_result,
}
void FakeResolver::RequestReresolutionLocked() {
// A resolution must have been returned before an error is seen.
GPR_ASSERT(last_used_results_ != nullptr);
grpc_channel_args_destroy(next_results_);
if (reresolution_results_ != nullptr) {
grpc_channel_args_destroy(next_results_);
next_results_ = grpc_channel_args_copy(reresolution_results_);
} else {
// If reresolution_results is unavailable, re-resolve with the most-recently
// used results to avoid a no-op re-resolution.
next_results_ = grpc_channel_args_copy(last_used_results_);
MaybeFinishNextLocked();
}
MaybeFinishNextLocked();
}
void FakeResolver::MaybeFinishNextLocked() {
@ -161,8 +149,6 @@ void FakeResolverResponseGenerator::SetResponseLocked(void* arg,
FakeResolver* resolver = closure_arg->generator->resolver_;
grpc_channel_args_destroy(resolver->next_results_);
resolver->next_results_ = closure_arg->response;
grpc_channel_args_destroy(resolver->last_used_results_);
resolver->last_used_results_ = grpc_channel_args_copy(closure_arg->response);
resolver->MaybeFinishNextLocked();
Delete(closure_arg);
}

@ -53,7 +53,8 @@ class FakeResolverResponseGenerator
// The new re-resolution response replaces any previous re-resolution
// response that may have been set by a previous call.
// If the re-resolution response is set to NULL, then the fake
// resolver will return the last value set via \a SetResponse().
// resolver will not return anything when \a RequestReresolutionLocked()
// is called.
void SetReresolutionResponse(grpc_channel_args* response);
// Tells the resolver to return a transient failure (signalled by

@ -50,8 +50,6 @@ class SockaddrResolver : public Resolver {
void NextLocked(grpc_channel_args** result,
grpc_closure* on_complete) override;
void RequestReresolutionLocked() override;
void ShutdownLocked() override;
private:
@ -90,11 +88,6 @@ void SockaddrResolver::NextLocked(grpc_channel_args** target_result,
MaybeFinishNextLocked();
}
void SockaddrResolver::RequestReresolutionLocked() {
published_ = false;
MaybeFinishNextLocked();
}
void SockaddrResolver::ShutdownLocked() {
if (next_completion_ != nullptr) {
*target_result_ = nullptr;

@ -24,10 +24,6 @@
#ifdef GRPC_POSIX_SOCKET_TCP
#ifdef GPR_LINUX
#include <linux/version.h>
#endif /* GPR_LINUX */
bool kernel_supports_errqueue() {
#ifdef LINUX_VERSION_CODE
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)

@ -60,7 +60,6 @@
#define GRPC_HAVE_IP_PKTINFO 1
#define GRPC_HAVE_MSG_NOSIGNAL 1
#define GRPC_HAVE_UNIX_SOCKET 1
#include <linux/version.h>
#ifdef LINUX_VERSION_CODE
#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0)
#define GRPC_LINUX_ERRQUEUE 1

@ -1051,9 +1051,9 @@ static tsi_result ssl_handshaker_result_extract_peer(
}
const char* session_reused = SSL_session_reused(impl->ssl) ? "true" : "false";
result = tsi_construct_string_peer_property(
result = tsi_construct_string_peer_property_from_cstring(
TSI_SSL_SESSION_REUSED_PEER_PROPERTY, session_reused,
strlen(session_reused) + 1, &peer->properties[peer->property_count]);
&peer->properties[peer->property_count]);
if (result != TSI_OK) return result;
peer->property_count++;

@ -17,13 +17,7 @@
#endregion
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
@ -44,9 +38,38 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(ArgumentNullException), () => ChannelCredentials.Create(null, new FakeCallCredentials()));
Assert.Throws(typeof(ArgumentNullException), () => ChannelCredentials.Create(new FakeChannelCredentials(true), null));
// forbid composing non-composable
Assert.Throws(typeof(ArgumentException), () => ChannelCredentials.Create(new FakeChannelCredentials(false), new FakeCallCredentials()));
}
[Test]
public void ChannelCredentials_NativeCredentialsAreReused()
{
// always returning the same native object is critical for subchannel sharing to work with secure channels
var creds = new SslCredentials();
var nativeCreds1 = creds.GetNativeCredentials();
var nativeCreds2 = creds.GetNativeCredentials();
Assert.AreSame(nativeCreds1, nativeCreds2);
}
[Test]
public void ChannelCredentials_CreateExceptionIsCached()
{
var creds = new ChannelCredentialsWithCreateNativeThrows();
var ex1 = Assert.Throws(typeof(Exception), () => creds.GetNativeCredentials());
var ex2 = Assert.Throws(typeof(Exception), () => creds.GetNativeCredentials());
Assert.AreSame(ex1, ex2);
}
internal class ChannelCredentialsWithCreateNativeThrows : ChannelCredentials
{
internal override bool IsComposable => false;
internal override ChannelCredentialsSafeHandle CreateNativeCredentials()
{
throw new Exception("Creation of native credentials has failed on purpose.");
}
}
}
}

@ -16,15 +16,7 @@
#endregion
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
@ -42,7 +34,7 @@ namespace Grpc.Core.Tests
get { return composable; }
}
internal override ChannelCredentialsSafeHandle ToNativeCredentials()
internal override ChannelCredentialsSafeHandle CreateNativeCredentials()
{
return null;
}

@ -66,6 +66,8 @@ namespace Grpc.Core.Tests
new Metadata.Entry("0123456789abc", "XYZ");
new Metadata.Entry("-abc", "XYZ");
new Metadata.Entry("a_bc_", "XYZ");
new Metadata.Entry("abc.xyz", "XYZ");
new Metadata.Entry("abc.xyz-bin", new byte[] {1, 2, 3});
Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc[", "xyz"));
Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc/", "xyz"));
}

@ -72,9 +72,9 @@ namespace Grpc.Core
this.environment = GrpcEnvironment.AddRef();
this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
var nativeCredentials = credentials.GetNativeCredentials();
if (nativeCredentials != null)
{
this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);

@ -31,6 +31,19 @@ namespace Grpc.Core
public abstract class ChannelCredentials
{
static readonly ChannelCredentials InsecureInstance = new InsecureCredentialsImpl();
readonly Lazy<ChannelCredentialsSafeHandle> cachedNativeCredentials;
/// <summary>
/// Creates a new instance of channel credentials
/// </summary>
public ChannelCredentials()
{
// Native credentials object need to be kept alive once initialized for subchannel sharing to work correctly
// with secure connections. See https://github.com/grpc/grpc/issues/15207.
// We rely on finalizer to clean up the native portion of ChannelCredentialsSafeHandle after the ChannelCredentials
// instance becomes unused.
this.cachedNativeCredentials = new Lazy<ChannelCredentialsSafeHandle>(() => CreateNativeCredentials());
}
/// <summary>
/// Returns instance of credentials that provides no security and
@ -57,11 +70,22 @@ namespace Grpc.Core
}
/// <summary>
/// Creates native object for the credentials. May return null if insecure channel
/// should be created.
/// Gets native object for the credentials, creating one if it already doesn't exist. May return null if insecure channel
/// should be created. Caller must not call <c>Dispose()</c> on the returned native credentials as their lifetime
/// is managed by this class (and instances of native credentials are cached).
/// </summary>
/// <returns>The native credentials.</returns>
internal ChannelCredentialsSafeHandle GetNativeCredentials()
{
return cachedNativeCredentials.Value;
}
/// <summary>
/// Creates a new native object for the credentials. May return null if insecure channel
/// should be created. For internal use only, use <see cref="GetNativeCredentials"/> instead.
/// </summary>
/// <returns>The native credentials.</returns>
internal abstract ChannelCredentialsSafeHandle ToNativeCredentials();
internal abstract ChannelCredentialsSafeHandle CreateNativeCredentials();
/// <summary>
/// Returns <c>true</c> if this credential type allows being composed by <c>CompositeCredentials</c>.
@ -73,7 +97,7 @@ namespace Grpc.Core
private sealed class InsecureCredentialsImpl : ChannelCredentials
{
internal override ChannelCredentialsSafeHandle ToNativeCredentials()
internal override ChannelCredentialsSafeHandle CreateNativeCredentials()
{
return null;
}
@ -145,7 +169,7 @@ namespace Grpc.Core
get { return true; }
}
internal override ChannelCredentialsSafeHandle ToNativeCredentials()
internal override ChannelCredentialsSafeHandle CreateNativeCredentials()
{
return ChannelCredentialsSafeHandle.CreateSslCredentials(rootCertificates, keyCertificatePair);
}
@ -173,12 +197,11 @@ namespace Grpc.Core
GrpcPreconditions.CheckArgument(channelCredentials.IsComposable, "Supplied channel credentials do not allow composition.");
}
internal override ChannelCredentialsSafeHandle ToNativeCredentials()
internal override ChannelCredentialsSafeHandle CreateNativeCredentials()
{
using (var channelCreds = channelCredentials.ToNativeCredentials())
using (var callCreds = callCredentials.ToNativeCredentials())
{
var nativeComposite = ChannelCredentialsSafeHandle.CreateComposite(channelCreds, callCreds);
var nativeComposite = ChannelCredentialsSafeHandle.CreateComposite(channelCredentials.GetNativeCredentials(), callCreds);
if (nativeComposite.IsInvalid)
{
throw new ArgumentException("Error creating native composite credentials. Likely, this is because you are trying to compose incompatible credentials.");

@ -325,9 +325,18 @@ namespace Grpc.Core.Internal
}
}
protected override void OnAfterReleaseResources()
protected override void OnAfterReleaseResourcesLocked()
{
details.Channel.RemoveCallReference(this);
}
protected override void OnAfterReleaseResourcesUnlocked()
{
// If cancellation callback is in progress, this can block
// so we need to do this outside of call's lock to prevent
// deadlock.
// See https://github.com/grpc/grpc/issues/14777
// See https://github.com/dotnet/corefx/issues/14903
cancellationTokenRegistration?.Dispose();
}
@ -448,6 +457,7 @@ namespace Grpc.Core.Internal
TResponse msg = default(TResponse);
var deserializeException = TryDeserialize(receivedMessage, out msg);
bool releasedResources;
lock (myLock)
{
finished = true;
@ -464,7 +474,12 @@ namespace Grpc.Core.Internal
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
responseHeadersTcs.SetResult(responseHeaders);
@ -494,6 +509,7 @@ namespace Grpc.Core.Internal
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
bool releasedResources;
lock (myLock)
{
finished = true;
@ -504,7 +520,12 @@ namespace Grpc.Core.Internal
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
if (delayedStreamingWriteTcs != null)

@ -196,10 +196,14 @@ namespace Grpc.Core.Internal
call.Dispose();
}
disposed = true;
OnAfterReleaseResources();
OnAfterReleaseResourcesLocked();
}
protected virtual void OnAfterReleaseResources()
protected virtual void OnAfterReleaseResourcesLocked()
{
}
protected virtual void OnAfterReleaseResourcesUnlocked()
{
}
@ -235,6 +239,7 @@ namespace Grpc.Core.Internal
{
bool delayCompletion = false;
TaskCompletionSource<object> origTcs = null;
bool releasedResources;
lock (myLock)
{
if (!success && !finished && IsClient) {
@ -252,7 +257,12 @@ namespace Grpc.Core.Internal
streamingWriteTcs = null;
}
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
if (!success)
@ -282,9 +292,15 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
bool releasedResources;
lock (myLock)
{
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
if (!success)
@ -310,6 +326,7 @@ namespace Grpc.Core.Internal
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
TaskCompletionSource<TRead> origTcs = null;
bool releasedResources;
lock (myLock)
{
origTcs = streamingReadTcs;
@ -332,7 +349,12 @@ namespace Grpc.Core.Internal
streamingReadTcs = null;
}
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
if (deserializeException != null && !IsClient)

@ -184,7 +184,7 @@ namespace Grpc.Core.Internal
throw new InvalidOperationException("Call be only called for client calls");
}
protected override void OnAfterReleaseResources()
protected override void OnAfterReleaseResourcesLocked()
{
server.RemoveCallReference(this);
}
@ -206,6 +206,7 @@ namespace Grpc.Core.Internal
{
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
// success will be always set to true.
bool releasedResources;
lock (myLock)
{
finished = true;
@ -217,7 +218,12 @@ namespace Grpc.Core.Internal
streamingReadTcs = new TaskCompletionSource<TRequest>();
streamingReadTcs.SetResult(default(TRequest));
}
ReleaseResourcesIfPossible();
releasedResources = ReleaseResourcesIfPossible();
}
if (releasedResources)
{
OnAfterReleaseResourcesUnlocked();
}
if (cancelled)

@ -225,7 +225,7 @@ namespace Grpc.Core
/// </summary>
public class Entry
{
private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$");
private static readonly Regex ValidKeyRegex = new Regex("^[.a-z0-9_-]+$");
readonly string key;
readonly string value;
@ -360,7 +360,7 @@ namespace Grpc.Core
{
var normalized = GrpcPreconditions.CheckNotNull(key, "key").ToLowerInvariant();
GrpcPreconditions.CheckArgument(ValidKeyRegex.IsMatch(normalized),
"Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens.");
"Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores, hyphens and dots.");
return normalized;
}

@ -111,6 +111,10 @@ message ClientConfig {
// Use coalescing API when possible.
bool use_coalesce_api = 19;
// If 0, disabled. Else, specifies the period between gathering latency
// medians in milliseconds.
int32 median_latency_collection_interval_millis = 20;
}
message ClientStatus { ClientStats stats = 1; }

@ -28,12 +28,16 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "test/core/util/test_config.h"
constexpr int kMinResolutionPeriodMs = 1000;
// Provide some slack when checking intervals, to allow for test timing issues.
constexpr int kMinResolutionPeriodForCheckMs = 900;
extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner;
grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@ -43,7 +47,7 @@ grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
// times a system-level resolution has happened.
static int g_resolution_count;
struct iomgr_args {
static struct iomgr_args {
gpr_event ev;
gpr_atm done_atm;
gpr_mu* mu;
@ -61,6 +65,16 @@ static void test_resolve_address_impl(const char* name,
default_resolve_address->resolve_address(
name, default_port, g_iomgr_args.pollset_set, on_done, addrs);
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
if (last_resolution_time == 0) {
last_resolution_time =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
} else {
grpc_millis now =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
last_resolution_time = now;
}
}
static grpc_error* test_blocking_resolve_address_impl(
@ -73,7 +87,7 @@ static grpc_error* test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl};
grpc_ares_request* test_dns_lookup_ares_locked(
static grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@ -82,6 +96,16 @@ grpc_ares_request* test_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs,
check_grpclb, service_config_json, combiner);
++g_resolution_count;
static grpc_millis last_resolution_time = 0;
if (last_resolution_time == 0) {
last_resolution_time =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
} else {
grpc_millis now =
grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
last_resolution_time = now;
}
return result;
}
@ -91,7 +115,7 @@ static gpr_timespec test_deadline(void) {
static void do_nothing(void* arg, grpc_error* error) {}
void iomgr_args_init(iomgr_args* args) {
static void iomgr_args_init(iomgr_args* args) {
gpr_event_init(&args->ev);
args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args->pollset, &args->mu);
@ -100,7 +124,7 @@ void iomgr_args_init(iomgr_args* args) {
gpr_atm_rel_store(&args->done_atm, 0);
}
void iomgr_args_finish(iomgr_args* args) {
static void iomgr_args_finish(iomgr_args* args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
grpc_pollset_set_destroy(args->pollset_set);
@ -146,29 +170,19 @@ struct OnResolutionCallbackArg {
const char* uri_str = nullptr;
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr;
grpc_millis delay_before_second_resolution = 0;
};
// Counter for the number of times a resolution notification callback has been
// invoked.
static int g_on_resolution_invocations_count;
// Set to true by the last callback in the resolution chain.
bool g_all_callbacks_invoked;
static bool g_all_callbacks_invoked;
void on_fourth_resolution(void* arg, grpc_error* error) {
static void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"4th: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// In this case we expect to have incurred in another system-level resolution
// because on_third_resolution slept for longer than the min resolution
// period.
GPR_ASSERT(g_on_resolution_invocations_count == 4);
GPR_ASSERT(g_resolution_count == 3);
gpr_log(GPR_INFO, "2nd: g_resolution_count: %d", g_resolution_count);
// The resolution callback was not invoked until new data was
// available, which was delayed until after the cooldown period.
GPR_ASSERT(g_resolution_count == 2);
cb_arg->resolver.reset();
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_mu_lock(g_iomgr_args.mu);
@ -179,67 +193,13 @@ void on_fourth_resolution(void* arg, grpc_error* error) {
g_all_callbacks_invoked = true;
}
void on_third_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The timer set because of the previous re-resolution request fires, so a new
// system-level resolution happened.
GPR_ASSERT(g_on_resolution_invocations_count == 3);
GPR_ASSERT(g_resolution_count == 2);
grpc_core::ExecCtx::Get()->TestOnlySetNow(
cb_arg->delay_before_second_resolution * 2);
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_fourth_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
cb_arg->resolver->RequestReresolutionLocked();
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
// The resolution request for which this function is the callback happened
// before the min resolution period. Therefore, no new system-level
// resolutions happened, as indicated by g_resolution_count. But a resolution
// timer was set to fire when the cooldown finishes.
GPR_ASSERT(g_on_resolution_invocations_count == 2);
GPR_ASSERT(g_resolution_count == 1);
// Register a new callback to capture the timer firing.
cb_arg->resolver->NextLocked(
&cb_arg->result,
GRPC_CLOSURE_CREATE(on_third_resolution, arg,
grpc_combiner_scheduler(g_combiner)));
gpr_mu_lock(g_iomgr_args.mu);
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
gpr_mu_unlock(g_iomgr_args.mu);
}
void on_first_resolution(void* arg, grpc_error* error) {
static void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
++g_on_resolution_invocations_count;
gpr_log(GPR_INFO,
"1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
g_on_resolution_invocations_count, g_resolution_count);
gpr_log(GPR_INFO, "1st: g_resolution_count: %d", g_resolution_count);
// There's one initial system-level resolution and one invocation of a
// notification callback (the current function).
GPR_ASSERT(g_on_resolution_invocations_count == 1);
GPR_ASSERT(g_resolution_count == 1);
cb_arg->resolver->NextLocked(
&cb_arg->result,
@ -265,9 +225,7 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
g_on_resolution_invocations_count = 0;
g_resolution_count = 0;
constexpr int kMinResolutionPeriodMs = 1000;
grpc_arg cooldown_arg;
cooldown_arg.key =
@ -280,7 +238,6 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
res_cb_arg->resolver = factory->CreateResolver(args);
grpc_channel_args_destroy(cooldown_channel_args);
GPR_ASSERT(res_cb_arg->resolver != nullptr);
res_cb_arg->delay_before_second_resolution = kMinResolutionPeriodMs;
// First resolution, would incur in system-level resolution.
res_cb_arg->resolver->NextLocked(
&res_cb_arg->result,

@ -124,8 +124,8 @@ static void test_fake_resolver() {
build_fake_resolver(combiner, response_generator.get());
GPR_ASSERT(resolver.get() != nullptr);
// Test 1: normal resolution.
// next_results != NULL, reresolution_results == NULL, last_used_results ==
// NULL. Expected response is next_results.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
grpc_channel_args* results = create_new_resolver_result();
on_resolution_arg on_res_arg = create_on_resolution_arg(results);
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
@ -137,10 +137,9 @@ static void test_fake_resolver() {
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 2: update resolution.
// next_results != NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is next_results.
// next_results != NULL, reresolution_results == NULL.
// Expected response is next_results.
results = create_new_resolver_result();
grpc_channel_args* last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -150,21 +149,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 3: fallback re-resolution.
// next_results == NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 4: normal re-resolution.
// next_results == NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is reresolution_results.
// Test 3: normal re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
grpc_channel_args* reresolution_results = create_new_resolver_result();
on_res_arg =
create_on_resolution_arg(grpc_channel_args_copy(reresolution_results));
@ -180,9 +167,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 5: repeat re-resolution.
// next_results == NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is reresolution_results.
// Test 4: repeat re-resolution.
// next_results == NULL, reresolution_results != NULL.
// Expected response is reresolution_results.
on_res_arg = create_on_resolution_arg(reresolution_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -192,11 +179,10 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 6: normal resolution.
// next_results != NULL, reresolution_results != NULL, last_used_results !=
// NULL. Expected response is next_results.
// Test 5: normal resolution.
// next_results != NULL, reresolution_results != NULL.
// Expected response is next_results.
results = create_new_resolver_result();
last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@ -206,23 +192,7 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 7: fallback re-resolution.
// next_results == NULL, reresolution_results == NULL, last_used_results !=
// NULL. Expected response is last_used_results.
on_res_arg = create_on_resolution_arg(last_used_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
// Reset reresolution_results.
response_generator->SetReresolutionResponse(nullptr);
// Flush here to guarantee that reresolution_results has been reset.
grpc_core::ExecCtx::Get()->Flush();
// Trigger a re-resolution.
resolver->RequestReresolutionLocked();
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 8: no-op.
// Test 6: no-op.
// Requesting a new resolution without setting the response shouldn't trigger
// the resolution callback.
memset(&on_res_arg, 0, sizeof(on_res_arg));

@ -208,9 +208,11 @@ static void check_session_reusage(ssl_tsi_test_fixture* ssl_fixture,
tsi_peer_get_property_by_name(peer, TSI_SSL_SESSION_REUSED_PEER_PROPERTY);
GPR_ASSERT(session_reused != nullptr);
if (ssl_fixture->session_reused) {
GPR_ASSERT(strcmp(session_reused->value.data, "true") == 0);
GPR_ASSERT(strncmp(session_reused->value.data, "true",
session_reused->value.length) == 0);
} else {
GPR_ASSERT(strcmp(session_reused->value.data, "false") == 0);
GPR_ASSERT(strncmp(session_reused->value.data, "false",
session_reused->value.length) == 0);
}
}

@ -129,12 +129,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
void CreateServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
servers_.clear();
for (size_t i = 0; i < num_servers; ++i) {
int port = 0;
if (ports.size() == num_servers) port = ports[i];
servers_.emplace_back(new ServerData(server_host_, port));
servers_.emplace_back(new ServerData(port));
}
}
void StartServer(size_t index) { servers_[index]->Start(server_host_); }
void StartServers(size_t num_servers,
std::vector<int> ports = std::vector<int>()) {
CreateServers(num_servers, ports);
for (size_t i = 0; i < num_servers; ++i) {
StartServer(i);
}
}
@ -240,20 +251,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
explicit ServerData(const grpc::string& server_host, int port = 0) {
explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
}
void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
void Start(const grpc::string& server_host, std::mutex* mu,
void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
@ -601,6 +615,41 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 0, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[0]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
StartServers(1, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
TEST_F(ClientLbEnd2endTest,
PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
std::vector<int> ports = {grpc_pick_unused_port_or_die(),
grpc_pick_unused_port_or_die()};
CreateServers(2, ports);
StartServer(1);
auto channel = BuildChannel("pick_first");
auto stub = BuildStub(channel);
SetNextResolution(ports);
gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
WaitForServer(stub, 1, DEBUG_LOCATION);
gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
servers_[1]->Shutdown();
EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
StartServers(2, ports);
WaitForServer(stub, 0, DEBUG_LOCATION);
}
TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
std::vector<int> ports = {grpc_pick_unused_port_or_die()};
StartServers(1, ports);
@ -613,7 +662,6 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
servers_[0]->Shutdown();
// Channel 1 will receive a re-resolution containing the same server. It will
// create a new subchannel and hold a ref to it.
servers_.clear();
StartServers(1, ports);
gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
auto channel_2 = BuildChannel("pick_first");
@ -632,7 +680,6 @@ TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
// Channel 2 will also receive a re-resolution containing the same server.
// Both channels will ref the same subchannel that failed.
servers_.clear();
StartServers(1, ports);
gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
@ -860,7 +907,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
for (size_t i = 0; i < servers_.size(); ++i) {
servers_[i]->Shutdown(false);
servers_[i]->Shutdown(true);
}
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
@ -921,7 +968,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
// Bring the first server back up.
servers_[0].reset(new ServerData(server_host_, ports[0]));
servers_[0].reset(new ServerData(ports[0]));
StartServer(0);
// Requests should start arriving at the first server either right away (if
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before

@ -180,6 +180,19 @@ class Client {
timer_result = timer_->Mark();
}
// Print the median latency per interval for one thread.
// If the number of warmup seconds is x, then the first x + 1 numbers in the
// vector are from the warmup period and should be discarded.
if (median_latency_collection_interval_seconds_ > 0) {
std::vector<double> medians_per_interval =
threads_[0]->GetMedianPerIntervalList();
gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
for (size_t j = 0; j < medians_per_interval.size(); j++) {
gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
}
}
grpc_stats_data core_stats;
grpc_stats_collect(&core_stats);
@ -210,6 +223,12 @@ class Client {
}
}
// Returns the interval (in seconds) between collecting latency medians. If 0,
// no periodic median latencies will be collected.
double GetLatencyCollectionIntervalInSeconds() {
return median_latency_collection_interval_seconds_;
}
virtual int GetPollCount() {
// For sync client.
return 0;
@ -218,6 +237,7 @@ class Client {
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
double median_latency_collection_interval_seconds_; // In seconds
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
@ -299,10 +319,27 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
std::vector<double> GetMedianPerIntervalList() {
return medians_each_interval_list_;
}
void UpdateHistogram(HistogramEntry* entry) {
std::lock_guard<std::mutex> g(mu_);
if (entry->value_used()) {
histogram_.Add(entry->value());
if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
histogram_per_interval_.Add(entry->value());
double now = UsageTimer::Now();
if ((now - interval_start_time_) >=
client_->GetLatencyCollectionIntervalInSeconds()) {
// Record the median latency of requests from the last interval.
// Divide by 1e3 to get microseconds.
medians_each_interval_list_.push_back(
histogram_per_interval_.Percentile(50) / 1e3);
histogram_per_interval_.Reset();
interval_start_time_ = now;
}
}
}
if (entry->status_used()) {
statuses_[entry->status()]++;
@ -334,6 +371,11 @@ class Client {
Client* client_;
const size_t idx_;
std::thread impl_;
// The following are used only if
// median_latency_collection_interval_seconds_ is greater than 0
Histogram histogram_per_interval_;
std::vector<double> medians_each_interval_list_;
double interval_start_time_;
};
bool ThreadCompleted() {
@ -392,7 +434,8 @@ class ClientImpl : public Client {
for (auto& t : connecting_threads) {
t->join();
}
median_latency_collection_interval_seconds_ =
config.median_latency_collection_interval_millis() / 1e3;
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
}

@ -198,7 +198,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
const grpc::string& credential_type, bool run_inproc) {
const grpc::string& credential_type, bool run_inproc,
int32_t median_latency_collection_interval_millis) {
if (run_inproc) {
g_inproc_servers = new std::vector<grpc::testing::Server*>;
}
@ -317,6 +318,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}
client_config.set_median_latency_collection_interval_millis(
median_latency_collection_interval_millis);
// Targets are all set by now
result_client_config = client_config;
// Start clients

@ -32,7 +32,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
const grpc::string& credential_type, bool run_inproc);
const grpc::string& credential_type, bool run_inproc,
int32_t median_latency_collection_interval_millis);
bool RunQuit(const grpc::string& credential_type);
} // namespace testing

@ -34,6 +34,11 @@ class Histogram {
~Histogram() {
if (impl_) grpc_histogram_destroy(impl_);
}
void Reset() {
if (impl_) grpc_histogram_destroy(impl_);
impl_ = grpc_histogram_create(default_resolution(), default_max_possible());
}
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); }

@ -48,7 +48,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
kInsecureCredentialsType, true);
kInsecureCredentialsType, true, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);

@ -66,6 +66,11 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to.");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with workers");
DEFINE_bool(run_inproc, false, "Perform an in-process transport test");
DEFINE_int32(
median_latency_collection_interval_millis, 0,
"Specifies the period between gathering latency medians in "
"milliseconds. The medians will be logged out on the client at the "
"end of the benchmark run. If 0, this periodic collection is disabled.");
namespace grpc {
namespace testing {
@ -73,13 +78,13 @@ namespace testing {
static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";
auto result =
RunScenario(scenario.client_config(), scenario.num_clients(),
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
!FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
FLAGS_qps_server_target_override, FLAGS_credential_type,
FLAGS_run_inproc);
auto result = RunScenario(
scenario.client_config(), scenario.num_clients(),
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
!FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
FLAGS_qps_server_target_override, FLAGS_credential_type, FLAGS_run_inproc,
FLAGS_median_latency_collection_interval_millis);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.

@ -52,7 +52,7 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
kInsecureCredentialsType, false);
kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);

@ -55,7 +55,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
kInsecureCredentialsType, false);
kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);

@ -22,8 +22,8 @@ mkdir -p ${KOKORO_KEYSTORE_DIR}
cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321_grpc-testing-service
temp_dir=$(mktemp -d)
ln -f "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5"
ln -f "${KOKORO_GFILE_DIR}/bazel-latest-release" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-latest-release"
export PATH="${temp_dir}:${PATH}"
# This should show ${temp_dir}/bazel
which bazel
@ -49,13 +49,14 @@ source tools/internal_ci/helper_scripts/prepare_build_linux_rc
--strategy=Closure=remote \
--genrule_strategy=remote \
--experimental_strict_action_env=true \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/default:toolchain \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/default:toolchain \
--define GRPC_PORT_ISOLATED_RUNTIME=1 \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/cpp:cc-toolchain-clang-x86_64-default \
--extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
--host_platform=//third_party/toolchains:rbe_ubuntu1604 \
--platforms=//third_party/toolchains:rbe_ubuntu1604 \
--remote_instance_name=grpc-testing/instances/default_instance \
$1 \
-- //test/... || FAILED="true"

@ -23,8 +23,8 @@ mkdir -p ${KOKORO_KEYSTORE_DIR}
cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321_grpc-testing-service
temp_dir=$(mktemp -d)
ln -f "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5"
ln -f "${KOKORO_GFILE_DIR}/bazel-latest-release" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-latest-release"
export PATH="${temp_dir}:${PATH}"
# This should show ${temp_dir}/bazel
which bazel
@ -58,13 +58,14 @@ source tools/internal_ci/helper_scripts/prepare_build_linux_rc
--linkopt=-fsanitize=memory \
--copt=-fsanitize-memory-track-origins \
--action_env=LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH \
--host_crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/default:toolchain \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/msan:toolchain \
--host_crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/default:toolchain \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/msan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/cpp:cc-toolchain-clang-x86_64-default \
--extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
--host_platform=//third_party/toolchains:rbe_ubuntu1604 \
--platforms=//third_party/toolchains:rbe_ubuntu1604 \
--remote_instance_name=grpc-testing/instances/default_instance \
-- //test/... || FAILED="true"
# Sleep to let ResultStore finish writing results before querying

@ -23,8 +23,8 @@ mkdir -p ${KOKORO_KEYSTORE_DIR}
cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321_grpc-testing-service
temp_dir=$(mktemp -d)
ln -f "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5"
ln -f "${KOKORO_GFILE_DIR}/bazel-latest-release" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-latest-release"
export PATH="${temp_dir}:${PATH}"
# This should show ${temp_dir}/bazel
which bazel
@ -55,13 +55,14 @@ source tools/internal_ci/helper_scripts/prepare_build_linux_rc
--strip=never \
--copt=-fsanitize=undefined \
--linkopt=-fsanitize=undefined \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.13.0/ubsan:toolchain \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.15.0/ubsan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/cpp:cc-toolchain-clang-x86_64-default \
--extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
--host_platform=//third_party/toolchains:rbe_ubuntu1604 \
--platforms=//third_party/toolchains:rbe_ubuntu1604 \
--cache_test_results=no \
--remote_instance_name=grpc-testing/instances/default_instance \
-- //test/... || FAILED="true"
# Sleep to let ResultStore finish writing results before querying

@ -23,8 +23,8 @@ mkdir -p ${KOKORO_KEYSTORE_DIR}
cp ${KOKORO_GFILE_DIR}/GrpcTesting-d0eeee2db331.json ${KOKORO_KEYSTORE_DIR}/4321_grpc-testing-service
temp_dir=$(mktemp -d)
ln -f "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-rc-0.14.0rc5"
ln -f "${KOKORO_GFILE_DIR}/bazel-latest-release" ${temp_dir}/bazel
chmod 755 "${KOKORO_GFILE_DIR}/bazel-latest-release"
export PATH="${temp_dir}:${PATH}"
# This should show ${temp_dir}/bazel
which bazel
@ -55,12 +55,13 @@ source tools/internal_ci/helper_scripts/prepare_build_linux_rc
--strip=never \
--copt=-fsanitize=undefined \
--linkopt=-fsanitize=undefined \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.13.0/ubsan:toolchain \
--crosstool_top=@com_github_bazelbuild_bazeltoolchains//configs/experimental/ubuntu16_04_clang/1.0/bazel_0.15.0/ubsan:toolchain \
--action_env=BAZEL_DO_NOT_DETECT_CPP_TOOLCHAIN=1 \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.13.0/cpp:cc-toolchain-clang-x86_64-default \
--extra_toolchains=@com_github_bazelbuild_bazeltoolchains//configs/ubuntu16_04_clang/1.0/bazel_0.16.1/cpp:cc-toolchain-clang-x86_64-default \
--extra_execution_platforms=//third_party/toolchains:rbe_ubuntu1604 \
--host_platform=//third_party/toolchains:rbe_ubuntu1604 \
--platforms=//third_party/toolchains:rbe_ubuntu1604 \
--remote_instance_name=grpc-testing/instances/default_instance \
-- //test/... || FAILED="true"
if [ "$FAILED" != "" ]

Loading…
Cancel
Save