Merge remote-tracking branch 'upstream/master' into libuv_em_basic

pull/21216/head
Guantao Liu 5 years ago
commit 81b4bc02db
  1. 81
      src/core/ext/filters/client_channel/xds/xds_bootstrap.cc
  2. 22
      src/core/ext/filters/client_channel/xds/xds_bootstrap.h
  3. 5
      src/core/ext/filters/client_channel/xds/xds_channel.cc
  4. 14
      src/core/ext/filters/client_channel/xds/xds_channel_secure.cc
  5. 2
      src/core/ext/filters/client_channel/xds/xds_client.cc
  6. 98
      src/csharp/Grpc.Core.Api/AsyncCallState.cs
  7. 41
      src/csharp/Grpc.Core.Api/AsyncClientStreamingCall.cs
  8. 41
      src/csharp/Grpc.Core.Api/AsyncDuplexStreamingCall.cs
  9. 38
      src/csharp/Grpc.Core.Api/AsyncServerStreamingCall.cs
  10. 38
      src/csharp/Grpc.Core.Api/AsyncUnaryCall.cs
  11. 82
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallStateTest.cs
  12. 29
      src/csharp/Grpc.Core/Calls.cs
  13. 20
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  14. 26
      src/csharp/Grpc.HealthCheck.Tests/TestResponseStreamWriter.cs
  15. 7
      src/csharp/Grpc.HealthCheck/HealthServiceImpl.cs
  16. 1
      src/csharp/tests.json
  17. 115
      test/core/client_channel/xds_bootstrap_test.cc
  18. 36
      test/cpp/end2end/xds_end2end_test.cc

@ -58,23 +58,23 @@ XdsBootstrap::XdsBootstrap(grpc_slice contents, grpc_error** error)
return;
}
InlinedVector<grpc_error*, 1> error_list;
bool seen_xds_server = false;
bool seen_xds_servers = false;
bool seen_node = false;
for (grpc_json* child = tree_->child; child != nullptr; child = child->next) {
if (child->key == nullptr) {
error_list.push_back(
GRPC_ERROR_CREATE_FROM_STATIC_STRING("JSON key is null"));
} else if (strcmp(child->key, "xds_server") == 0) {
if (child->type != GRPC_JSON_OBJECT) {
} else if (strcmp(child->key, "xds_servers") == 0) {
if (child->type != GRPC_JSON_ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"xds_server\" field is not an object"));
"\"xds_servers\" field is not an array"));
}
if (seen_xds_server) {
if (seen_xds_servers) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"duplicate \"xds_server\" field"));
"duplicate \"xds_servers\" field"));
}
seen_xds_server = true;
grpc_error* parse_error = ParseXdsServer(child);
seen_xds_servers = true;
grpc_error* parse_error = ParseXdsServerList(child);
if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error);
} else if (strcmp(child->key, "node") == 0) {
if (child->type != GRPC_JSON_OBJECT) {
@ -90,9 +90,9 @@ XdsBootstrap::XdsBootstrap(grpc_slice contents, grpc_error** error)
if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error);
}
}
if (!seen_xds_server) {
if (!seen_xds_servers) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"xds_server\" field not present"));
"\"xds_servers\" field not present"));
}
*error = GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing xds bootstrap file",
&error_list);
@ -103,9 +103,33 @@ XdsBootstrap::~XdsBootstrap() {
grpc_slice_unref_internal(contents_);
}
grpc_error* XdsBootstrap::ParseXdsServer(grpc_json* json) {
grpc_error* XdsBootstrap::ParseXdsServerList(grpc_json* json) {
InlinedVector<grpc_error*, 1> error_list;
server_uri_ = nullptr;
size_t idx = 0;
for (grpc_json *child = json->child; child != nullptr;
child = child->next, ++idx) {
if (child->key != nullptr) {
char* msg;
gpr_asprintf(&msg, "array element %" PRIuPTR " key is not null", idx);
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg));
}
if (child->type != GRPC_JSON_OBJECT) {
char* msg;
gpr_asprintf(&msg, "array element %" PRIuPTR " is not an object", idx);
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg));
} else {
grpc_error* parse_error = ParseXdsServer(child, idx);
if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error);
}
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"xds_servers\" array",
&error_list);
}
grpc_error* XdsBootstrap::ParseXdsServer(grpc_json* json, size_t idx) {
InlinedVector<grpc_error*, 1> error_list;
servers_.emplace_back();
XdsServer& server = servers_[servers_.size() - 1];
bool seen_channel_creds = false;
for (grpc_json* child = json->child; child != nullptr; child = child->next) {
if (child->key == nullptr) {
@ -116,11 +140,11 @@ grpc_error* XdsBootstrap::ParseXdsServer(grpc_json* json) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"server_uri\" field is not a string"));
}
if (server_uri_ != nullptr) {
if (server.server_uri != nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"duplicate \"server_uri\" field"));
}
server_uri_ = child->value;
server.server_uri = child->value;
} else if (strcmp(child->key, "channel_creds") == 0) {
if (child->type != GRPC_JSON_ARRAY) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
@ -131,19 +155,29 @@ grpc_error* XdsBootstrap::ParseXdsServer(grpc_json* json) {
"duplicate \"channel_creds\" field"));
}
seen_channel_creds = true;
grpc_error* parse_error = ParseChannelCredsArray(child);
grpc_error* parse_error = ParseChannelCredsArray(child, &server);
if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error);
}
}
if (server_uri_ == nullptr) {
if (server.server_uri == nullptr) {
error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"\"server_uri\" field not present"));
}
return GRPC_ERROR_CREATE_FROM_VECTOR("errors parsing \"xds_server\" object",
&error_list);
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
if (error_list.empty()) return GRPC_ERROR_NONE;
char* msg;
gpr_asprintf(&msg, "errors parsing index %" PRIuPTR, idx);
grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
gpr_free(msg);
for (size_t i = 0; i < error_list.size(); ++i) {
error = grpc_error_add_child(error, error_list[i]);
}
return error;
}
grpc_error* XdsBootstrap::ParseChannelCredsArray(grpc_json* json) {
grpc_error* XdsBootstrap::ParseChannelCredsArray(grpc_json* json,
XdsServer* server) {
InlinedVector<grpc_error*, 1> error_list;
size_t idx = 0;
for (grpc_json *child = json->child; child != nullptr;
@ -158,7 +192,7 @@ grpc_error* XdsBootstrap::ParseChannelCredsArray(grpc_json* json) {
gpr_asprintf(&msg, "array element %" PRIuPTR " is not an object", idx);
error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg));
} else {
grpc_error* parse_error = ParseChannelCreds(child, idx);
grpc_error* parse_error = ParseChannelCreds(child, idx, server);
if (parse_error != GRPC_ERROR_NONE) error_list.push_back(parse_error);
}
}
@ -166,7 +200,8 @@ grpc_error* XdsBootstrap::ParseChannelCredsArray(grpc_json* json) {
&error_list);
}
grpc_error* XdsBootstrap::ParseChannelCreds(grpc_json* json, size_t idx) {
grpc_error* XdsBootstrap::ParseChannelCreds(grpc_json* json, size_t idx,
XdsServer* server) {
InlinedVector<grpc_error*, 1> error_list;
ChannelCreds channel_creds;
for (grpc_json* child = json->child; child != nullptr; child = child->next) {
@ -195,7 +230,9 @@ grpc_error* XdsBootstrap::ParseChannelCreds(grpc_json* json, size_t idx) {
channel_creds.config = child;
}
}
if (channel_creds.type != nullptr) channel_creds_.push_back(channel_creds);
if (channel_creds.type != nullptr) {
server->channel_creds.push_back(channel_creds);
}
// Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
// string is not static in this case.
if (error_list.empty()) return GRPC_ERROR_NONE;

@ -58,6 +58,11 @@ class XdsBootstrap {
grpc_json* config = nullptr;
};
struct XdsServer {
const char* server_uri = nullptr;
InlinedVector<ChannelCreds, 1> channel_creds;
};
// If *error is not GRPC_ERROR_NONE after returning, then there was an
// error reading the file.
static std::unique_ptr<XdsBootstrap> ReadFromFile(grpc_error** error);
@ -66,16 +71,16 @@ class XdsBootstrap {
XdsBootstrap(grpc_slice contents, grpc_error** error);
~XdsBootstrap();
const char* server_uri() const { return server_uri_; }
const InlinedVector<ChannelCreds, 1>& channel_creds() const {
return channel_creds_;
}
// TODO(roth): We currently support only one server. Fix this when we
// add support for fallback for the xds channel.
const XdsServer& server() const { return servers_[0]; }
const Node* node() const { return node_.get(); }
private:
grpc_error* ParseXdsServer(grpc_json* json);
grpc_error* ParseChannelCredsArray(grpc_json* json);
grpc_error* ParseChannelCreds(grpc_json* json, size_t idx);
grpc_error* ParseXdsServerList(grpc_json* json);
grpc_error* ParseXdsServer(grpc_json* json, size_t idx);
grpc_error* ParseChannelCredsArray(grpc_json* json, XdsServer* server);
grpc_error* ParseChannelCreds(grpc_json* json, size_t idx, XdsServer* server);
grpc_error* ParseNode(grpc_json* json);
grpc_error* ParseLocality(grpc_json* json);
@ -90,8 +95,7 @@ class XdsBootstrap {
grpc_slice contents_;
grpc_json* tree_ = nullptr;
const char* server_uri_ = nullptr;
InlinedVector<ChannelCreds, 1> channel_creds_;
InlinedVector<XdsServer, 1> servers_;
std::unique_ptr<Node> node_;
};

@ -30,8 +30,9 @@ grpc_channel_args* ModifyXdsChannelArgs(grpc_channel_args* args) {
grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
const grpc_channel_args& args) {
if (!bootstrap.channel_creds().empty()) return nullptr;
return grpc_insecure_channel_create(bootstrap.server_uri(), &args, nullptr);
if (!bootstrap.server().channel_creds.empty()) return nullptr;
return grpc_insecure_channel_create(bootstrap.server().server_uri, &args,
nullptr);
}
} // namespace grpc_core

@ -67,12 +67,14 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
const grpc_channel_args& args) {
grpc_channel_credentials* creds = nullptr;
RefCountedPtr<grpc_channel_credentials> creds_to_unref;
if (!bootstrap.channel_creds().empty()) {
for (size_t i = 0; i < bootstrap.channel_creds().size(); ++i) {
if (strcmp(bootstrap.channel_creds()[i].type, "google_default") == 0) {
if (!bootstrap.server().channel_creds.empty()) {
for (size_t i = 0; i < bootstrap.server().channel_creds.size(); ++i) {
if (strcmp(bootstrap.server().channel_creds[i].type, "google_default") ==
0) {
creds = grpc_google_default_credentials_create();
break;
} else if (strcmp(bootstrap.channel_creds()[i].type, "fake") == 0) {
} else if (strcmp(bootstrap.server().channel_creds[i].type, "fake") ==
0) {
creds = grpc_fake_transport_security_credentials_create();
break;
}
@ -83,7 +85,7 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
creds = grpc_channel_credentials_find_in_args(&args);
if (creds == nullptr) {
// Built with security but parent channel is insecure.
return grpc_insecure_channel_create(bootstrap.server_uri(), &args,
return grpc_insecure_channel_create(bootstrap.server().server_uri, &args,
nullptr);
}
}
@ -91,7 +93,7 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
grpc_channel_args* new_args =
grpc_channel_args_copy_and_remove(&args, &arg_to_remove, 1);
grpc_channel* channel = grpc_secure_channel_create(
creds, bootstrap.server_uri(), new_args, nullptr);
creds, bootstrap.server().server_uri, new_args, nullptr);
grpc_channel_args_destroy(new_args);
return channel;
}

@ -1268,7 +1268,7 @@ XdsClient::XdsClient(Combiner* combiner, grpc_pollset_set* interested_parties,
}
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p: creating channel to %s", this,
bootstrap_->server_uri());
bootstrap_->server().server_uri);
}
chand_ = MakeOrphanable<ChannelState>(
Ref(DEBUG_LOCATION, "XdsClient+ChannelState"), channel_args);

@ -0,0 +1,98 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading.Tasks;
namespace Grpc.Core
{
/// <summary>
/// Provides an abstraction over the callback providers
/// used by AsyncUnaryCall, AsyncDuplexStreamingCall, etc
/// </summary>
internal struct AsyncCallState
{
readonly object responseHeadersAsync; // Task<Metadata> or Func<object, Task<Metadata>>
readonly object getStatusFunc; // Func<Status> or Func<object, Status>
readonly object getTrailersFunc; // Func<Metadata> or Func<object, Metadata>
readonly object disposeAction; // Action or Action<object>
readonly object callbackState; // arg0 for the callbacks above, if needed
internal AsyncCallState(
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object callbackState)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = callbackState;
}
internal AsyncCallState(
Task<Metadata> responseHeadersAsync,
Func<Status> getStatusFunc,
Func<Metadata> getTrailersFunc,
Action disposeAction)
{
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callbackState = null;
}
internal Task<Metadata> ResponseHeadersAsync()
{
var withState = responseHeadersAsync as Func<object, Task<Metadata>>;
return withState != null ? withState(callbackState)
: (Task<Metadata>)responseHeadersAsync;
}
internal Status GetStatus()
{
var withState = getStatusFunc as Func<object, Status>;
return withState != null ? withState(callbackState)
: ((Func<Status>)getStatusFunc)();
}
internal Metadata GetTrailers()
{
var withState = getTrailersFunc as Func<object, Metadata>;
return withState != null ? withState(callbackState)
: ((Func<Metadata>)getTrailersFunc)();
}
internal void Dispose()
{
var withState = disposeAction as Action<object>;
if (withState != null)
{
withState(callbackState);
}
else
{
((Action)disposeAction)();
}
}
}
}

@ -31,10 +31,7 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> responseAsync;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
@ -54,10 +51,30 @@ namespace Grpc.Core
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncClientStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream,
Task<TResponse> responseAsync,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseAsync = responseAsync;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -78,7 +95,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -108,7 +125,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -117,7 +134,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -132,7 +149,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -30,10 +30,7 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
@ -53,10 +50,30 @@ namespace Grpc.Core
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="requestStream">Stream of request values.</param>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream,
IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -88,7 +105,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -98,7 +115,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -107,7 +124,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -122,7 +139,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -28,10 +28,7 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
@ -48,10 +45,27 @@ namespace Grpc.Core
Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncDuplexStreamingCall object with the specified properties.
/// </summary>
/// <param name="responseStream">Stream of response values.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.responseStream = responseStream;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -72,7 +86,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -82,7 +96,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -91,7 +105,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -106,7 +120,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -29,10 +29,7 @@ namespace Grpc.Core
public sealed class AsyncUnaryCall<TResponse> : IDisposable
{
readonly Task<TResponse> responseAsync;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
readonly AsyncCallState callState;
/// <summary>
@ -50,10 +47,27 @@ namespace Grpc.Core
Action disposeAction)
{
this.responseAsync = responseAsync;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
this.callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction);
}
/// <summary>
/// Creates a new AsyncUnaryCall object with the specified properties.
/// </summary>
/// <param name="responseAsync">The response of the asynchronous call.</param>
/// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param>
/// <param name="getStatusFunc">Delegate returning the status of the call.</param>
/// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param>
/// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param>
/// <param name="state">State object for use with the callback parameters.</param>
public AsyncUnaryCall(Task<TResponse> responseAsync,
Func<object, Task<Metadata>> responseHeadersAsync,
Func<object, Status> getStatusFunc,
Func<object, Metadata> getTrailersFunc,
Action<object> disposeAction,
object state)
{
this.responseAsync = responseAsync;
callState = new AsyncCallState(responseHeadersAsync, getStatusFunc, getTrailersFunc, disposeAction, state);
}
/// <summary>
@ -74,7 +88,7 @@ namespace Grpc.Core
{
get
{
return this.responseHeadersAsync;
return callState.ResponseHeadersAsync();
}
}
@ -92,7 +106,7 @@ namespace Grpc.Core
/// </summary>
public Status GetStatus()
{
return getStatusFunc();
return callState.GetStatus();
}
/// <summary>
@ -101,7 +115,7 @@ namespace Grpc.Core
/// </summary>
public Metadata GetTrailers()
{
return getTrailersFunc();
return callState.GetTrailers();
}
/// <summary>
@ -116,7 +130,7 @@ namespace Grpc.Core
/// </remarks>
public void Dispose()
{
disposeAction.Invoke();
callState.Dispose();
}
}
}

@ -0,0 +1,82 @@
#region Copyright notice and license
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System.Threading.Tasks;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class AsyncCallStateTest
{
[Test]
public void Stateless()
{
bool disposed = false;
Task<Metadata> responseHeaders = Task.FromResult(new Metadata());
Metadata trailers = new Metadata();
var state = new AsyncCallState(responseHeaders, () => new Status(StatusCode.DataLoss, "oops"),
() => trailers, () => disposed = true);
Assert.AreSame(responseHeaders, state.ResponseHeadersAsync());
var status = state.GetStatus();
Assert.AreEqual(StatusCode.DataLoss, status.StatusCode);
Assert.AreEqual("oops", status.Detail);
Assert.AreSame(trailers, state.GetTrailers());
Assert.False(disposed);
state.Dispose();
Assert.True(disposed);
}
class State
{
public bool disposed = false;
public Task<Metadata> responseHeaders = Task.FromResult(new Metadata());
public Metadata trailers = new Metadata();
public Status status = new Status(StatusCode.DataLoss, "oops");
public void Dispose() { disposed = true; }
}
[Test]
public void WithState()
{
var callbackState = new State();
var state = new AsyncCallState(
obj => ((State)obj).responseHeaders,
obj => ((State)obj).status,
obj => ((State)obj).trailers,
obj => ((State)obj).Dispose(),
callbackState);
Assert.AreSame(callbackState.responseHeaders, state.ResponseHeadersAsync());
var status = state.GetStatus();
Assert.AreEqual(StatusCode.DataLoss, status.StatusCode);
Assert.AreEqual("oops", status.Detail);
Assert.AreSame(callbackState.trailers, state.GetTrailers());
Assert.False(callbackState.disposed);
state.Dispose();
Assert.True(callbackState.disposed);
}
}
}

@ -16,6 +16,7 @@
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@ -59,7 +60,10 @@ namespace Grpc.Core
{
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var asyncResult = asyncCall.UnaryCallAsync(req);
return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncUnaryCall<TResponse>(asyncResult,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -78,7 +82,10 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncServerStreamingCall<TResponse>(responseStream,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -96,7 +103,10 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
/// <summary>
@ -116,7 +126,18 @@ namespace Grpc.Core
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream,
Callbacks<TRequest, TResponse>.GetHeaders, Callbacks<TRequest, TResponse>.GetStatus,
Callbacks<TRequest, TResponse>.GetTrailers, Callbacks<TRequest, TResponse>.Cancel,
asyncCall);
}
private static class Callbacks<TRequest, TResponse>
{
internal static readonly Func<object, Task<Metadata>> GetHeaders = state => ((AsyncCall<TRequest, TResponse>)state).ResponseHeadersAsync;
internal static readonly Func<object, Status> GetStatus = state => ((AsyncCall<TRequest, TResponse>)state).GetStatus();
internal static readonly Func<object, Metadata> GetTrailers = state => ((AsyncCall<TRequest, TResponse>)state).GetTrailers();
internal static readonly Action<object> Cancel = state => ((AsyncCall<TRequest, TResponse>)state).Cancel();
}
}
}

@ -201,18 +201,22 @@ namespace Grpc.HealthCheck.Tests
{
var cts = new CancellationTokenSource();
var context = new TestServerCallContext(cts.Token);
var writer = new TestResponseStreamWriter();
var writer = new TestResponseStreamWriter(started: false);
var impl = new HealthServiceImpl();
var callTask = impl.Watch(new HealthCheckRequest { Service = "" }, writer, context);
// Write new 10 statuses. Only last 5 statuses will be returned when we read them from watch writer
// Write new statuses. Only last statuses will be returned when we read them from watch writer
for (var i = 0; i < HealthServiceImpl.MaxStatusBufferSize * 2; i++)
{
// These statuses aren't "valid" but it is useful for testing to have an incrementing number
impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i);
impl.SetStatus("", (HealthCheckResponse.Types.ServingStatus)i + 10);
}
// Start reading responses now that statuses have been queued up
// This is to keep the test non-flakey
writer.Start();
// Read messages in a background task
var statuses = new List<HealthCheckResponse.Types.ServingStatus>();
var readStatusesTask = Task.Run(async () => {
@ -240,11 +244,11 @@ namespace Grpc.HealthCheck.Tests
Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.ServiceUnknown, statuses[0]);
// Last 5 queued messages
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)5, statuses[1]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)6, statuses[2]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)7, statuses[3]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)8, statuses[4]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)9, statuses[5]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)15, statuses[statuses.Count - 5]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)16, statuses[statuses.Count - 4]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)17, statuses[statuses.Count - 3]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)18, statuses[statuses.Count - 2]);
Assert.AreEqual((HealthCheckResponse.Types.ServingStatus)19, statuses[statuses.Count - 1]);
}
#endif

@ -25,24 +25,42 @@ namespace Grpc.HealthCheck.Tests
{
internal class TestResponseStreamWriter : IServerStreamWriter<HealthCheckResponse>
{
private Channel<HealthCheckResponse> _channel;
private readonly Channel<HealthCheckResponse> _channel;
private readonly TaskCompletionSource<object> _startTcs;
public TestResponseStreamWriter(int maxCapacity = 1)
public TestResponseStreamWriter(int maxCapacity = 1, bool started = true)
{
_channel = System.Threading.Channels.Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(maxCapacity) {
SingleReader = false,
SingleWriter = true,
FullMode = BoundedChannelFullMode.Wait
});
if (!started)
{
_startTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}
public ChannelReader<HealthCheckResponse> WrittenMessagesReader => _channel.Reader;
public WriteOptions WriteOptions { get; set; }
public Task WriteAsync(HealthCheckResponse message)
public async Task WriteAsync(HealthCheckResponse message)
{
return _channel.Writer.WriteAsync(message).AsTask();
if (_startTcs != null)
{
await _startTcs.Task;
}
await _channel.Writer.WriteAsync(message);
}
public void Start()
{
if (_startTcs != null)
{
_startTcs.TrySetResult(null);
}
}
public void Complete()

@ -157,9 +157,6 @@ namespace Grpc.HealthCheck
{
string service = request.Service;
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Channel is used to to marshall multiple callers updating status into a single queue.
// This is required because IServerStreamWriter is not thread safe.
//
@ -205,6 +202,10 @@ namespace Grpc.HealthCheck
channel.Writer.Complete();
});
// Send current status immediately
HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
await responseStream.WriteAsync(response);
// Read messages. WaitToReadAsync will wait until new messages are available.
// Loop will exit when the call is canceled and the writer is marked as complete.
while (await channel.Reader.WaitToReadAsync())

@ -3,6 +3,7 @@
"Grpc.Core.Interceptors.Tests.ClientInterceptorTest",
"Grpc.Core.Interceptors.Tests.ServerInterceptorTest",
"Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallStateTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",

@ -38,16 +38,28 @@ void VerifyRegexMatch(grpc_error* error, const std::regex& e) {
TEST(XdsBootstrapTest, Basic) {
const char* json =
"{"
" \"xds_server\": {"
" \"server_uri\": \"fake:///lb\","
" \"channel_creds\": ["
" {"
" \"type\": \"fake\","
" \"ignore\": 0"
" }"
" ],"
" \"ignore\": 0"
" },"
" \"xds_servers\": ["
" {"
" \"server_uri\": \"fake:///lb\","
" \"channel_creds\": ["
" {"
" \"type\": \"fake\","
" \"ignore\": 0"
" }"
" ],"
" \"ignore\": 0"
" },"
" {"
" \"server_uri\": \"ignored\","
" \"channel_creds\": ["
" {"
" \"type\": \"ignored\","
" \"ignore\": 0"
" }"
" ],"
" \"ignore\": 0"
" }"
" ],"
" \"node\": {"
" \"id\": \"foo\","
" \"cluster\": \"bar\","
@ -74,11 +86,11 @@ TEST(XdsBootstrapTest, Basic) {
grpc_slice slice = grpc_slice_from_copied_string(json);
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
EXPECT_EQ(error, GRPC_ERROR_NONE);
EXPECT_STREQ(bootstrap.server_uri(), "fake:///lb");
ASSERT_EQ(bootstrap.channel_creds().size(), 1);
EXPECT_STREQ(bootstrap.channel_creds()[0].type, "fake");
EXPECT_EQ(bootstrap.channel_creds()[0].config, nullptr);
EXPECT_EQ(error, GRPC_ERROR_NONE) << grpc_error_string(error);
EXPECT_STREQ(bootstrap.server().server_uri, "fake:///lb");
ASSERT_EQ(bootstrap.server().channel_creds.size(), 1);
EXPECT_STREQ(bootstrap.server().channel_creds[0].type, "fake");
EXPECT_EQ(bootstrap.server().channel_creds[0].config, nullptr);
ASSERT_NE(bootstrap.node(), nullptr);
EXPECT_STREQ(bootstrap.node()->id, "foo");
EXPECT_STREQ(bootstrap.node()->cluster, "bar");
@ -152,16 +164,18 @@ TEST(XdsBootstrapTest, Basic) {
TEST(XdsBootstrapTest, ValidWithoutChannelCredsAndNode) {
const char* json =
"{"
" \"xds_server\": {"
" \"server_uri\": \"fake:///lb\""
" }"
" \"xds_servers\": ["
" {"
" \"server_uri\": \"fake:///lb\""
" }"
" ]"
"}";
grpc_slice slice = grpc_slice_from_copied_string(json);
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
EXPECT_EQ(error, GRPC_ERROR_NONE);
EXPECT_STREQ(bootstrap.server_uri(), "fake:///lb");
EXPECT_EQ(bootstrap.channel_creds().size(), 0);
EXPECT_STREQ(bootstrap.server().server_uri, "fake:///lb");
EXPECT_EQ(bootstrap.server().channel_creds.size(), 0);
EXPECT_EQ(bootstrap.node(), nullptr);
}
@ -185,30 +199,31 @@ TEST(XdsBootstrapTest, MalformedJson) {
VerifyRegexMatch(error, e);
}
TEST(XdsBootstrapTest, MissingXdsServer) {
TEST(XdsBootstrapTest, MissingXdsServers) {
grpc_slice slice = grpc_slice_from_copied_string("{}");
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
gpr_log(GPR_ERROR, "%s", grpc_error_string(error));
ASSERT_TRUE(error != GRPC_ERROR_NONE);
std::regex e(std::string("\"xds_server\" field not present"));
std::regex e(std::string("\"xds_servers\" field not present"));
VerifyRegexMatch(error, e);
}
TEST(XdsBootstrapTest, BadXdsServer) {
TEST(XdsBootstrapTest, BadXdsServers) {
grpc_slice slice = grpc_slice_from_copied_string(
"{"
" \"xds_server\":1,"
" \"xds_server\":{}"
" \"xds_servers\":1,"
" \"xds_servers\":[{}]"
"}");
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
gpr_log(GPR_ERROR, "%s", grpc_error_string(error));
ASSERT_TRUE(error != GRPC_ERROR_NONE);
std::regex e(
std::string("\"xds_server\" field is not an object(.*)"
"duplicate \"xds_server\" field(.*)"
"errors parsing \"xds_server\" object(.*)"
std::string("\"xds_servers\" field is not an array(.*)"
"duplicate \"xds_servers\" field(.*)"
"errors parsing \"xds_servers\" array(.*)"
"errors parsing index 0(.*)"
"\"server_uri\" field not present"));
VerifyRegexMatch(error, e);
}
@ -216,19 +231,22 @@ TEST(XdsBootstrapTest, BadXdsServer) {
TEST(XdsBootstrapTest, BadXdsServerContents) {
grpc_slice slice = grpc_slice_from_copied_string(
"{"
" \"xds_server\":{"
" \"server_uri\":1,"
" \"server_uri\":\"foo\","
" \"channel_creds\":1,"
" \"channel_creds\":{}"
" }"
" \"xds_servers\":["
" {"
" \"server_uri\":1,"
" \"server_uri\":\"foo\","
" \"channel_creds\":1,"
" \"channel_creds\":{}"
" }"
" ]"
"}");
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
gpr_log(GPR_ERROR, "%s", grpc_error_string(error));
ASSERT_TRUE(error != GRPC_ERROR_NONE);
std::regex e(
std::string("errors parsing \"xds_server\" object(.*)"
std::string("errors parsing \"xds_servers\" array(.*)"
"errors parsing index 0(.*)"
"\"server_uri\" field is not a string(.*)"
"duplicate \"server_uri\" field(.*)"
"\"channel_creds\" field is not an array(.*)"
@ -240,24 +258,27 @@ TEST(XdsBootstrapTest, BadXdsServerContents) {
TEST(XdsBootstrapTest, BadChannelCredsContents) {
grpc_slice slice = grpc_slice_from_copied_string(
"{"
" \"xds_server\":{"
" \"server_uri\":\"foo\","
" \"channel_creds\":["
" {"
" \"type\":0,"
" \"type\":\"fake\","
" \"config\":1,"
" \"config\":{}"
" }"
" ]"
" }"
" \"xds_servers\":["
" {"
" \"server_uri\":\"foo\","
" \"channel_creds\":["
" {"
" \"type\":0,"
" \"type\":\"fake\","
" \"config\":1,"
" \"config\":{}"
" }"
" ]"
" }"
" ]"
"}");
grpc_error* error = GRPC_ERROR_NONE;
grpc_core::XdsBootstrap bootstrap(slice, &error);
gpr_log(GPR_ERROR, "%s", grpc_error_string(error));
ASSERT_TRUE(error != GRPC_ERROR_NONE);
std::regex e(
std::string("errors parsing \"xds_server\" object(.*)"
std::string("errors parsing \"xds_servers\" array(.*)"
"errors parsing index 0(.*)"
"errors parsing \"channel_creds\" array(.*)"
"errors parsing index 0(.*)"
"\"type\" field is not a string(.*)"

@ -105,14 +105,16 @@ constexpr int kDefaultLocalityPriority = 0;
constexpr char kBootstrapFile[] =
"{\n"
" \"xds_server\": {\n"
" \"server_uri\": \"fake:///lb\",\n"
" \"channel_creds\": [\n"
" {\n"
" \"type\": \"fake\"\n"
" }\n"
" ]\n"
" },\n"
" \"xds_servers\": [\n"
" {\n"
" \"server_uri\": \"fake:///lb\",\n"
" \"channel_creds\": [\n"
" {\n"
" \"type\": \"fake\"\n"
" }\n"
" ]\n"
" }\n"
" ],\n"
" \"node\": {\n"
" \"id\": \"xds_end2end_test\",\n"
" \"cluster\": \"test\",\n"
@ -129,14 +131,16 @@ constexpr char kBootstrapFile[] =
constexpr char kBootstrapFileBad[] =
"{\n"
" \"xds_server\": {\n"
" \"server_uri\": \"fake:///wrong_lb\",\n"
" \"channel_creds\": [\n"
" {\n"
" \"type\": \"fake\"\n"
" }\n"
" ]\n"
" },\n"
" \"xds_servers\": [\n"
" {\n"
" \"server_uri\": \"fake:///wrong_lb\",\n"
" \"channel_creds\": [\n"
" {\n"
" \"type\": \"fake\"\n"
" }\n"
" ]\n"
" }\n"
" ],\n"
" \"node\": {\n"
" }\n"
"}\n";

Loading…
Cancel
Save