Merge remote-tracking branch 'upstream/master'

pull/2945/head
Hongyu Chen 9 years ago
commit c963ca374b
  1. 2
      doc/connectivity-semantics-and-api.md
  2. 4
      include/grpc++/client_context.h
  3. 55
      include/grpc/census.h
  4. 15
      src/core/security/client_auth_filter.c
  5. 26
      src/cpp/client/channel.cc
  6. 9
      src/cpp/client/client_context.cc
  7. 62
      src/csharp/Grpc.Core.Tests/ClientBaseTest.cs
  8. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  9. 25
      src/csharp/Grpc.Core/ClientBase.cs
  10. 8
      src/objective-c/GRPCClient/GRPCCall+OAuth2.m
  11. 56
      src/objective-c/GRPCClient/GRPCCall.h
  12. 59
      src/objective-c/GRPCClient/GRPCCall.m
  13. 8
      src/objective-c/tests/GRPCClientTests.m
  14. 2
      src/python/grpcio_health_checking/MANIFEST.in
  15. 9
      src/python/grpcio_health_checking/README.rst
  16. 80
      src/python/grpcio_health_checking/commands.py
  17. 30
      src/python/grpcio_health_checking/grpc/__init__.py
  18. 30
      src/python/grpcio_health_checking/grpc/health/__init__.py
  19. 30
      src/python/grpcio_health_checking/grpc/health/v1alpha/__init__.py
  20. 49
      src/python/grpcio_health_checking/grpc/health/v1alpha/health.proto
  21. 129
      src/python/grpcio_health_checking/grpc/health/v1alpha/health.py
  22. 72
      src/python/grpcio_health_checking/setup.py
  23. 121
      src/ruby/ext/grpc/rb_channel.c
  24. 2
      src/ruby/grpc.gemspec
  25. 61
      src/ruby/lib/grpc/generic/client_stub.rb
  26. 2
      src/ruby/spec/call_spec.rb
  27. 4
      src/ruby/spec/channel_spec.rb
  28. 2
      src/ruby/spec/client_server_spec.rb
  29. 2
      src/ruby/spec/generic/active_call_spec.rb
  30. 2
      test/core/end2end/tests/bad_hostname.c
  31. 82
      test/cpp/qps/client.h
  32. 43
      test/cpp/qps/client_async.cc
  33. 27
      test/cpp/qps/client_sync.cc
  34. 102
      test/cpp/qps/driver.cc
  35. 16
      test/cpp/qps/driver.h
  36. 14
      test/cpp/qps/interarrival.h
  37. 1
      test/cpp/qps/qps_driver.cc
  38. 85
      test/cpp/qps/report.cc
  39. 40
      test/cpp/qps/server_async.cc
  40. 7
      tools/run_tests/build_python.sh

@ -38,7 +38,7 @@ because the server is not yet available), the channel may spend increasingly
large amounts of time in this state.
IDLE: This is the state where the channel is not even trying to create a
connection because of a lack of new or pending RPCs. New channels MAY be created
connection because of a lack of new or pending RPCs. New RPCs MAY be created
in this state. Any attempt to start an RPC on the channel will push the channel
out of this state to connecting. When there has been no RPC activity on a channel
for a specified IDLE_TIMEOUT, i.e., no new or pending (active) RPCs for this

@ -218,15 +218,11 @@ class ClientContext {
void set_call(grpc_call* call,
const std::shared_ptr<ChannelInterface>& channel);
grpc_completion_queue* cq() { return cq_; }
void set_cq(grpc_completion_queue* cq) { cq_ = cq; }
grpc::string authority() { return authority_; }
bool initial_metadata_received_;
std::shared_ptr<ChannelInterface> channel_;
grpc_call* call_;
grpc_completion_queue* cq_;
gpr_timespec deadline_;
grpc::string authority_;
std::shared_ptr<Credentials> creds_;

@ -104,6 +104,61 @@ int census_context_deserialize(const char *buffer, census_context **context);
* future census calls will result in undefined behavior. */
void census_context_destroy(census_context *context);
/* Max number of characters in tag key */
#define CENSUS_MAX_TAG_KEY_LENGTH 20
/* Max number of tag value characters */
#define CENSUS_MAX_TAG_VALUE_LENGTH 50
/* A Census tag set is a collection of key:value string pairs; these form the
basis against which Census metrics will be recorded. Keys are unique within
a tag set. All contexts have an associated tag set. */
typedef struct census_tag_set census_tag_set;
/* Returns a pointer to a newly created, empty tag set. If size_hint > 0,
indicates that the tag set is intended to hold approximately that number
of tags. */
census_tag_set *census_tag_set_create(size_t size_hint);
/* Add a new tag key/value to an existing tag set; if the tag key already exists
in the tag set, then its value is overwritten with the new one. Can also be
used to delete a tag, by specifying a NULL value. If key is NULL, returns
the number of tags in the tag set.
Return values:
-1: invalid length key or value
non-negative value: the number of tags in the tag set. */
int census_tag_set_add(census_tag_set *tags, const char *key,
const char *value);
/* Destroys a tag set. This function must be called to prevent memory leaks.
Once called, the tag set cannot be used again. */
void census_tag_set_destroy(census_tag_set *tags);
/* Get a contexts tag set. */
census_tag_set *census_context_tag_set(census_context *context);
/* A read-only representation of a tag for use by census clients. */
typedef struct {
size_t key_len; /* Number of bytes in tag key. */
const char *key; /* A pointer to the tag key. May not be null-terminated. */
size_t value_len; /* Number of bytes in tag value. */
const char *value; /* Pointer to the tag value. May not be null-terminated. */
} census_tag_const;
/* Used to iterate through a tag sets contents. */
typedef struct census_tag_set_iterator census_tag_set_iterator;
/* Open a tag set for iteration. The tag set must not be modified while
iteration is ongoing. Returns an iterator for use in following functions. */
census_tag_set_iterator *census_tag_set_open(census_tag_set *tags);
/* Get the next tag in the tag set, by writing into the 'tag' argument. Returns
1 if there is a "next" tag, 0 if there are no more tags. */
int census_tag_set_next(census_tag_set_iterator *it, census_tag_const *tag);
/* Close an iterator opened by census_tag_set_open(). The iterator will be
invalidated, and should not be used once close is called. */
void census_tag_set_close(census_tag_set_iterator *it);
/* A census statistic to be recorded comprises two parts: an ID for the
* particular statistic and the value to be recorded against it. */
typedef struct {

@ -75,11 +75,11 @@ typedef struct {
grpc_mdstr *status_key;
} channel_data;
static void bubble_up_error(grpc_call_element *elem, const char *error_msg) {
static void bubble_up_error(grpc_call_element *elem, grpc_status_code status,
const char *error_msg) {
call_data *calld = elem->call_data;
gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg);
grpc_transport_stream_op_add_cancellation(&calld->op,
GRPC_STATUS_UNAUTHENTICATED);
grpc_transport_stream_op_add_cancellation(&calld->op, status);
grpc_call_next_op(elem, &calld->op);
}
@ -94,7 +94,8 @@ static void on_credentials_metadata(void *user_data,
grpc_metadata_batch *mdb;
size_t i;
if (status != GRPC_CREDENTIALS_OK) {
bubble_up_error(elem, "Credentials failed to get metadata.");
bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED,
"Credentials failed to get metadata.");
return;
}
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
@ -154,7 +155,7 @@ static void send_security_metadata(grpc_call_element *elem,
if (channel_creds_has_md && call_creds_has_md) {
calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds);
if (calld->creds == NULL) {
bubble_up_error(elem,
bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT,
"Incompatible credentials set on channel and call.");
return;
}
@ -182,7 +183,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
bubble_up_error(elem, error_msg);
bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg);
}
}
@ -252,7 +253,7 @@ static void auth_start_transport_op(grpc_call_element *elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
bubble_up_error(elem, error_msg);
bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg);
}
return; /* early exit */

@ -61,19 +61,25 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) {
const char* host_str = host_.empty() ? NULL : host_.c_str();
auto c_call = method.channel_tag() && context->authority().empty()
? grpc_channel_create_registered_call(
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = NULL;
if (kRegistered) {
c_call = grpc_channel_create_registered_call(
c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(), cq->cq(),
method.channel_tag(), context->raw_deadline())
: grpc_channel_create_call(
c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(), cq->cq(),
method.name(), context->authority().empty()
? host_str
: context->authority().c_str(),
method.channel_tag(), context->raw_deadline());
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
host_str = context->authority().c_str();
} else if (!host_.empty()) {
host_str = host_.c_str();
}
c_call = grpc_channel_create_call(c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(),
cq->cq(), method.name(), host_str,
context->raw_deadline());
}
grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this());

@ -48,7 +48,6 @@ namespace grpc {
ClientContext::ClientContext()
: initial_metadata_received_(false),
call_(nullptr),
cq_(nullptr),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
propagate_from_call_(nullptr) {}
@ -56,14 +55,6 @@ ClientContext::~ClientContext() {
if (call_) {
grpc_call_destroy(call_);
}
if (cq_) {
// Drain cq_.
grpc_completion_queue_shutdown(cq_);
while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME))
.type != GRPC_QUEUE_SHUTDOWN)
;
grpc_completion_queue_destroy(cq_);
}
}
std::unique_ptr<ClientContext> ClientContext::FromServerContext(

@ -0,0 +1,62 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ClientBaseTest
{
[Test]
public void GetAuthUriBase_Valid()
{
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("some.googleapi.com"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("dns:///some.googleapi.com/"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("dns:///some.googleapi.com:443/"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("some.googleapi.com:443/"));
}
[Test]
public void GetAuthUriBase_Invalid()
{
Assert.IsNull(ClientBase.GetAuthUriBase("some.googleapi.com:"));
Assert.IsNull(ClientBase.GetAuthUriBase("https://some.googleapi.com/"));
Assert.IsNull(ClientBase.GetAuthUriBase("dns://some.googleapi.com:443")); // just two slashes
Assert.IsNull(ClientBase.GetAuthUriBase(""));
}
}
}

@ -63,6 +63,7 @@
<Compile Include="..\Grpc.Core\Version.cs">
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />

@ -33,9 +33,10 @@
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using Grpc.Core.Internal;
using System.Text.RegularExpressions;
using Grpc.Core.Utils;
namespace Grpc.Core
{
@ -46,15 +47,16 @@ namespace Grpc.Core
/// </summary>
public abstract class ClientBase
{
static readonly Regex TrailingPortPattern = new Regex(":[0-9]+/?$");
// Regex for removal of the optional DNS scheme, trailing port, and trailing backslash
static readonly Regex ChannelTargetPattern = new Regex(@"^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$");
readonly Channel channel;
readonly string authUriBase;
public ClientBase(Channel channel)
{
this.channel = channel;
// TODO(jtattermush): we shouldn't need to hand-curate the channel.Target contents.
this.authUriBase = "https://" + TrailingPortPattern.Replace(channel.Target, "") + "/";
this.authUriBase = GetAuthUriBase(channel.Target);
}
/// <summary>
@ -104,10 +106,23 @@ namespace Grpc.Core
{
options = options.WithHeaders(new Metadata());
}
var authUri = authUriBase + method.ServiceName;
var authUri = authUriBase != null ? authUriBase + method.ServiceName : null;
interceptor(authUri, options.Headers);
}
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
}
/// <summary>
/// Creates Auth URI base from channel's target (the one passed at channel creation).
/// Fully-qualified service name is to be appended to this.
/// </summary>
internal static string GetAuthUriBase(string target)
{
var match = ChannelTargetPattern.Match(target);
if (!match.Success) {
return null;
}
return "https://" + match.Groups[2].Value + "/";
}
}
}

@ -40,7 +40,7 @@ static NSString * const kChallengeHeader = @"www-authenticate";
@implementation GRPCCall (OAuth2)
- (NSString *)oauth2AccessToken {
NSString *headerValue = self.requestMetadata[kAuthorizationHeader];
NSString *headerValue = self.requestHeaders[kAuthorizationHeader];
if ([headerValue hasPrefix:kBearerPrefix]) {
return [headerValue substringFromIndex:kBearerPrefix.length];
} else {
@ -50,14 +50,14 @@ static NSString * const kChallengeHeader = @"www-authenticate";
- (void)setOauth2AccessToken:(NSString *)token {
if (token) {
self.requestMetadata[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:token];
self.requestHeaders[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:token];
} else {
[self.requestMetadata removeObjectForKey:kAuthorizationHeader];
[self.requestHeaders removeObjectForKey:kAuthorizationHeader];
}
}
- (NSString *)oauth2ChallengeHeader {
return self.responseMetadata[kChallengeHeader];
return self.responseHeaders[kChallengeHeader];
}
@end

@ -48,8 +48,10 @@
#import <Foundation/Foundation.h>
#import <RxLibrary/GRXWriter.h>
// Key used in |NSError|'s |userInfo| dictionary to store the response metadata sent by the server.
extern id const kGRPCStatusMetadataKey;
// Keys used in |NSError|'s |userInfo| dictionary to store the response headers and trailers sent by
// the server.
extern id const kGRPCHeadersKey;
extern id const kGRPCTrailersKey;
// Represents a single gRPC remote call.
@interface GRPCCall : GRXWriter
@ -57,43 +59,49 @@ extern id const kGRPCStatusMetadataKey;
// These HTTP headers will be passed to the server as part of this call. Each HTTP header is a
// name-value pair with string names and either string or binary values.
//
// The passed dictionary has to use NSString keys, corresponding to the header names. The
// value associated to each can be a NSString object or a NSData object. E.g.:
// The passed dictionary has to use NSString keys, corresponding to the header names. The value
// associated to each can be a NSString object or a NSData object. E.g.:
//
// call.requestMetadata = @{@"Authorization": @"Bearer ..."};
// call.requestHeaders = @{@"authorization": @"Bearer ..."};
//
// call.requestMetadata[@"SomeBinaryHeader"] = someData;
// call.requestHeaders[@"my-header-bin"] = someData;
//
// After the call is started, modifying this won't have any effect.
// After the call is started, trying to modify this property is an error.
//
// For convenience, the property is initialized to an empty NSMutableDictionary, and the setter
// accepts (and copies) both mutable and immutable dictionaries.
- (NSMutableDictionary *)requestMetadata; // nonatomic
- (void)setRequestMetadata:(NSDictionary *)requestMetadata; // nonatomic, copy
- (NSMutableDictionary *)requestHeaders; // nonatomic
- (void)setRequestHeaders:(NSDictionary *)requestHeaders; // nonatomic, copy
// This dictionary is populated with the HTTP headers received from the server. When the RPC ends,
// the HTTP trailers received are added to the dictionary too. It has the same structure as the
// request metadata dictionary.
// This dictionary is populated with the HTTP headers received from the server. This happens before
// any response message is received from the server. It has the same structure as the request
// headers dictionary: Keys are NSString header names; names ending with the suffix "-bin" have a
// NSData value; the others have a NSString value.
//
// The first time this object calls |writeValue| on the writeable passed to |startWithWriteable|,
// the |responseMetadata| dictionary already contains the response headers. When it calls
// |writesFinishedWithError|, the dictionary contains both the response headers and trailers.
@property(atomic, readonly) NSDictionary *responseMetadata;
// The value of this property is nil until all response headers are received, and will change before
// any of -writeValue: or -writesFinishedWithError: are sent to the writeable.
@property(atomic, readonly) NSDictionary *responseHeaders;
// Same as responseHeaders, but populated with the HTTP trailers received from the server before the
// call finishes.
//
// The value of this property is nil until all response trailers are received, and will change
// before -writesFinishedWithError: is sent to the writeable.
@property(atomic, readonly) NSDictionary *responseTrailers;
// The request writer has to write NSData objects into the provided Writeable. The server will
// receive each of those separately and in order.
// A gRPC call might not complete until the request writer finishes. On the other hand, the
// request finishing doesn't necessarily make the call to finish, as the server might continue
// sending messages to the response side of the call indefinitely (depending on the semantics of
// the specific remote method called).
// receive each of those separately and in order as distinct messages.
// A gRPC call might not complete until the request writer finishes. On the other hand, the request
// finishing doesn't necessarily make the call to finish, as the server might continue sending
// messages to the response side of the call indefinitely (depending on the semantics of the
// specific remote method called).
// To finish a call right away, invoke cancel.
- (instancetype)initWithHost:(NSString *)host
path:(NSString *)path
requestsWriter:(GRXWriter *)requestsWriter NS_DESIGNATED_INITIALIZER;
// Finishes the request side of this call, notifies the server that the RPC
// should be cancelled, and finishes the response side of the call with an error
// of code CANCELED.
// Finishes the request side of this call, notifies the server that the RPC should be cancelled, and
// finishes the response side of the call with an error of code CANCELED.
- (void)cancel;
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?

@ -42,9 +42,13 @@
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"
NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
@interface GRPCCall () <GRXWriteable>
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
@end
// The following methods of a C gRPC call object aren't reentrant, and thus
@ -89,8 +93,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// the response arrives.
GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
NSMutableDictionary *_requestHeaders;
}
@synthesize state = _state;
@ -121,24 +124,19 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_requestWriter = requestWriter;
_requestMetadata = [NSMutableDictionary dictionary];
_responseMetadata = [NSMutableDictionary dictionary];
_requestHeaders = [NSMutableDictionary dictionary];
}
return self;
}
#pragma mark Metadata
- (NSMutableDictionary *)requestMetadata {
return _requestMetadata;
- (NSMutableDictionary *)requestHeaders {
return _requestHeaders;
}
- (void)setRequestMetadata:(NSDictionary *)requestMetadata {
_requestMetadata = [NSMutableDictionary dictionaryWithDictionary:requestMetadata];
}
- (NSDictionary *)responseMetadata {
return _responseMetadata;
- (void)setRequestHeaders:(NSDictionary *)requestHeaders {
_requestHeaders = [NSMutableDictionary dictionaryWithDictionary:requestHeaders];
}
#pragma mark Finish
@ -232,11 +230,10 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
#pragma mark Send headers
// TODO(jcanizales): Rename to commitHeaders.
- (void)sendHeaders:(NSDictionary *)metadata {
- (void)sendHeaders:(NSDictionary *)headers {
// TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc]
initWithMetadata:metadata ?: @{} handler:nil]]];
initWithMetadata:headers ?: @{} handler:nil]]];
}
#pragma mark GRXWriteable implementation
@ -305,35 +302,45 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (metadataHandler), when the response headers are received.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(void(^)(NSDictionary *))metadataHandler
- (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler {
// TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
initWithHandler:metadataHandler]]];
initWithHandler:headersHandler]]];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
initWithHandler:completionHandler]]];
}
- (void)invokeCall {
__weak GRPCCall *weakSelf = self;
[self invokeCallWithMetadataHandler:^(NSDictionary *headers) {
[self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
[strongSelf->_responseMetadata addEntriesFromDictionary:headers];
strongSelf.responseHeaders = headers;
[strongSelf startNextRead];
}
} completionHandler:^(NSError *error, NSDictionary *trailers) {
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
[strongSelf->_responseMetadata addEntriesFromDictionary:trailers];
strongSelf.responseTrailers = trailers;
if (error) {
NSMutableDictionary *userInfo =
[NSMutableDictionary dictionaryWithDictionary:error.userInfo];
userInfo[kGRPCStatusMetadataKey] = strongSelf->_responseMetadata;
NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
if (error.userInfo) {
[userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when this is
// done that we can provide a merged view of response headers and trailers in a thread-safe
// way.
if (strongSelf.responseHeaders) {
userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
}
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
}
[strongSelf finishWithError:error];
@ -356,7 +363,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];
[self sendHeaders:_requestHeaders];
[self invokeCall];
}

@ -168,11 +168,13 @@ static ProtoMethod *kUnaryCallMethod;
} completionHandler:^(NSError *errorOrNil) {
XCTAssertNotNil(errorOrNil, @"Finished without error!");
XCTAssertEqual(errorOrNil.code, 16, @"Finished with unexpected error: %@", errorOrNil);
XCTAssertEqualObjects(call.responseMetadata, errorOrNil.userInfo[kGRPCStatusMetadataKey],
@"Metadata in the NSError object and call object differ.");
XCTAssertEqualObjects(call.responseHeaders, errorOrNil.userInfo[kGRPCHeadersKey],
@"Headers in the NSError object and call object differ.");
XCTAssertEqualObjects(call.responseTrailers, errorOrNil.userInfo[kGRPCTrailersKey],
@"Trailers in the NSError object and call object differ.");
NSString *challengeHeader = call.oauth2ChallengeHeader;
XCTAssertGreaterThan(challengeHeader.length, 0,
@"No challenge in response headers %@", call.responseMetadata);
@"No challenge in response headers %@", call.responseHeaders);
[expectation fulfill];
}];

@ -0,0 +1,2 @@
graft grpc
include commands.py

@ -0,0 +1,9 @@
gRPC Python Health Checking
===========================
Reference package for GRPC Python health checking.
Dependencies
------------
Depends on the `grpcio` package, available from PyPI via `pip install grpcio`.

@ -0,0 +1,80 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Provides distutils command classes for the GRPC Python setup process."""
import distutils
import glob
import os
import os.path
import subprocess
import sys
import setuptools
from setuptools.command import build_py
class BuildProtoModules(setuptools.Command):
"""Command to generate project *_pb2.py modules from proto files."""
description = ''
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
self.protoc_command = 'protoc'
self.grpc_python_plugin_command = distutils.spawn.find_executable(
'grpc_python_plugin')
def run(self):
paths = []
root_directory = os.getcwd()
for walk_root, directories, filenames in os.walk(root_directory):
for filename in filenames:
if filename.endswith('.proto'):
paths.append(os.path.join(walk_root, filename))
command = [
self.protoc_command,
'--plugin=protoc-gen-python-grpc={}'.format(
self.grpc_python_plugin_command),
'-I {}'.format(root_directory),
'--python_out={}'.format(root_directory),
'--python-grpc_out={}'.format(root_directory),
] + paths
subprocess.check_call(' '.join(command), cwd=root_directory, shell=True)
class BuildPy(build_py.build_py):
"""Custom project build command."""
def run(self):
self.run_command('build_proto_modules')
build_py.build_py.run(self)

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,30 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@ -0,0 +1,49 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
string service = 1;
}
message HealthCheckResponse {
enum ServingStatus {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
}
ServingStatus status = 1;
}
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
}

@ -0,0 +1,129 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Reference implementation for health checking in gRPC Python."""
import abc
import enum
import threading
from grpc.health.v1alpha import health_pb2
@enum.unique
class HealthStatus(enum.Enum):
"""Statuses for a service mirroring the reference health.proto's values."""
UNKNOWN = health_pb2.HealthCheckResponse.UNKNOWN
SERVING = health_pb2.HealthCheckResponse.SERVING
NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING
class _HealthServicer(health_pb2.EarlyAdopterHealthServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self):
self._server_status_lock = threading.Lock()
self._server_status = {}
def Check(self, request, context):
with self._server_status_lock:
if request.service not in self._server_status:
# TODO(atash): once the Python API has a way of setting the server
# status, bring us into conformance with the health check spec by
# returning the NOT_FOUND status here.
raise NotImplementedError()
else:
return health_pb2.HealthCheckResponse(
status=self._server_status[request.service].value)
def set(service, status):
if not isinstance(status, HealthStatus):
raise TypeError('expected grpc.health.v1alpha.health.HealthStatus '
'for argument `status` but got {}'.format(status))
with self._server_status_lock:
self._server_status[service] = status
class HealthServer(health_pb2.EarlyAdopterHealthServer):
"""Interface for the reference gRPC Python health server."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def start(self):
raise NotImplementedError()
@abc.abstractmethod
def stop(self):
raise NotImplementedError()
@abc.abstractmethod
def set(self, service, status):
"""Set the status of the given service.
Args:
service (str): service name of the service to set the reported status of
status (HealthStatus): status to set for the specified service
"""
raise NotImplementedError()
class _HealthServerImplementation(HealthServer):
"""Implementation for the reference gRPC Python health server."""
def __init__(self, server, servicer):
self._server = server
self._servicer = servicer
def start(self):
self._server.start()
def stop(self):
self._server.stop()
def set(self, service, status):
self._servicer.set(service, status)
def create_Health_server(port, private_key=None, certificate_chain=None):
"""Get a HealthServer instance.
Args:
port (int): port number passed through to health_pb2 server creation
routine.
private_key (str): to-be-created server's desired private key
certificate_chain (str): to-be-created server's desired certificate chain
Returns:
An instance of HealthServer (conforming thus to
EarlyAdopterHealthServer and providing a method to set server status)."""
servicer = _HealthServicer()
server = health_pb2.early_adopter_create_Health_server(
servicer, port=port, private_key=private_key,
certificate_chain=certificate_chain)
return _HealthServerImplementation(server, servicer)

@ -0,0 +1,72 @@
# Copyright 2015, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Setup module for the GRPC Python package's optional health checking."""
import os
import os.path
import sys
from distutils import core as _core
import setuptools
# Ensure we're in the proper directory whether or not we're being used by pip.
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Break import-style to ensure we can actually find our commands module.
import commands
_PACKAGES = (
setuptools.find_packages('.')
)
_PACKAGE_DIRECTORIES = {
'': '.',
}
_INSTALL_REQUIRES = (
'grpcio>=0.10.0a0',
)
_SETUP_REQUIRES = _INSTALL_REQUIRES
_COMMAND_CLASS = {
'build_proto_modules': commands.BuildProtoModules,
'build_py': commands.BuildPy,
}
setuptools.setup(
name='grpcio_health_checking',
version='0.10.0a0',
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=_INSTALL_REQUIRES,
setup_requires=_SETUP_REQUIRES,
cmdclass=_COMMAND_CLASS
)

@ -165,6 +165,65 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
return self;
}
/*
call-seq:
insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'})
creds = ...
secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
Creates channel instances. */
static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) {
VALUE try_to_connect = Qfalse;
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", try_to_connect);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
return NUM2LONG(
grpc_channel_check_connectivity_state(ch, (int)try_to_connect));
}
/* Watch for a change in connectivity state.
Once the channel connectivity state is different from the last observed
state, tag will be enqueued on cq with success=1
If deadline expires BEFORE the state is changed, tag will be enqueued on
the completion queue with success=0 */
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE cqueue,
VALUE deadline,
VALUE tag) {
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
grpc_channel_watch_connectivity_state(
ch,
NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0),
cq,
ROBJECT(tag));
return Qnil;
}
/* Clones Channel instances.
Gives Channel a consistent implementation of Ruby's object copy/dup
@ -195,18 +254,28 @@ static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
VALUE host, VALUE deadline) {
static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
VALUE parent, VALUE mask,
VALUE method, VALUE host,
VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
grpc_call *parent_call = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
int flags = GRPC_PROPAGATE_DEFAULTS;
char *method_chars = StringValueCStr(method);
char *host_chars = NULL;
if (host != Qnil) {
host_chars = StringValueCStr(host);
}
if (mask != Qnil) {
flags = NUM2UINT(mask);
}
if (parent != Qnil) {
parent_call = grpc_rb_get_wrapped_call(parent);
}
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
@ -216,9 +285,9 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return Qnil;
}
call = grpc_channel_create_call(ch, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
method_chars, host_chars,
grpc_rb_time_timeval(deadline,
call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars,
host_chars, grpc_rb_time_timeval(
deadline,
/* absolute time */ 0));
if (call == NULL) {
rb_raise(rb_eRuntimeError, "cannot create call with method %s",
@ -237,6 +306,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue, VALUE method,
return res;
}
/* Closes the channel, calling it's destroy method */
static VALUE grpc_rb_channel_destroy(VALUE self) {
grpc_rb_channel *wrapper = NULL;
@ -268,6 +338,38 @@ static VALUE grpc_rb_channel_get_target(VALUE self) {
return res;
}
static void Init_grpc_propagate_masks() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mPropagateMasks = rb_define_module_under(
grpc_rb_mGrpcCore, "PropagateMasks");
rb_define_const(grpc_rb_mPropagateMasks, "DEADLINE",
UINT2NUM(GRPC_PROPAGATE_DEADLINE));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_STATS_CONTEXT",
UINT2NUM(GRPC_PROPAGATE_CENSUS_STATS_CONTEXT));
rb_define_const(grpc_rb_mPropagateMasks, "CENSUS_TRACING_CONTEXT",
UINT2NUM(GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT));
rb_define_const(grpc_rb_mPropagateMasks, "CANCELLATION",
UINT2NUM(GRPC_PROPAGATE_CANCELLATION));
rb_define_const(grpc_rb_mPropagateMasks, "DEFAULTS",
UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
}
static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mConnectivityStates = rb_define_module_under(
grpc_rb_mGrpcCore, "ConnectivityStates");
rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
LONG2NUM(GRPC_CHANNEL_IDLE));
rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
LONG2NUM(GRPC_CHANNEL_CONNECTING));
rb_define_const(grpc_rb_mConnectivityStates, "READY",
LONG2NUM(GRPC_CHANNEL_READY));
rb_define_const(grpc_rb_mConnectivityStates, "TRANSIENT_FAILURE",
LONG2NUM(GRPC_CHANNEL_TRANSIENT_FAILURE));
rb_define_const(grpc_rb_mConnectivityStates, "FATAL_FAILURE",
LONG2NUM(GRPC_CHANNEL_FATAL_FAILURE));
}
void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel =
@ -282,8 +384,13 @@ void Init_grpc_channel() {
grpc_rb_channel_init_copy, 1);
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state,
-1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 4);
grpc_rb_channel_create_call, 6);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
@ -299,6 +406,8 @@ void Init_grpc_channel() {
ID2SYM(rb_intern(GRPC_ARG_MAX_CONCURRENT_STREAMS)));
rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
Init_grpc_propagate_masks();
Init_grpc_connectivity_states();
}
/* Gets the wrapped channel from the ruby wrapper */

@ -22,7 +22,7 @@ Gem::Specification.new do |s|
s.files += Dir.glob('bin/**/*')
s.test_files = Dir.glob('spec/**/*')
%w(math noproto).each do |b|
s.executables += [ "#{b}_client.rb", "#{b}_server.rb" ]
s.executables += ["#{b}_client.rb", "#{b}_server.rb"]
end
s.require_paths = %w( bin lib )
s.platform = Gem::Platform::RUBY

@ -32,6 +32,8 @@ require 'grpc/version'
# GRPC contains the General RPC module.
module GRPC
# rubocop:disable Metrics/ParameterLists
# ClientStub represents an endpoint used to send requests to GRPC servers.
class ClientStub
include Core::StatusCodes
@ -68,6 +70,12 @@ module GRPC
update_metadata
end
# Allows users of the stub to modify the propagate mask.
#
# This is an advanced feature for use when making calls to another gRPC
# server whilst running in the handler of an existing one.
attr_writer :propagate_mask
# Creates a new ClientStub.
#
# Minimally, a stub is created with the just the host of the gRPC service
@ -91,8 +99,8 @@ module GRPC
#
# - :update_metadata
# when present, this a func that takes a hash and returns a hash
# it can be used to update metadata, i.e, remove, change or update
# amend metadata values.
# it can be used to update metadata, i.e, remove, or amend
# metadata values.
#
# @param host [String] the host the stub connects to
# @param q [Core::CompletionQueue] used to wait for events
@ -105,6 +113,7 @@ module GRPC
channel_override: nil,
timeout: nil,
creds: nil,
propagate_mask: nil,
update_metadata: nil,
**kw)
fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@ -113,6 +122,7 @@ module GRPC
@update_metadata = ClientStub.check_update_metadata(update_metadata)
alt_host = kw[Core::Channel::SSL_TARGET]
@host = alt_host.nil? ? host : alt_host
@propagate_mask = propagate_mask
@timeout = timeout.nil? ? DEFAULT_TIMEOUT : timeout
end
@ -151,11 +161,15 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param return_op [true|false] return an Operation if true
# @return [Object] the response received from the server
def request_response(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, timeout)
return_op: false,
parent: parent,
**kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.request_response(req, **md) unless return_op
@ -210,10 +224,14 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false] return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @return [Object|Operation] the response received from the server
def client_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw)
c = new_active_call(method, marshal, unmarshal, timeout)
return_op: false,
parent: nil,
**kw)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.client_streamer(requests, **md) unless return_op
@ -276,11 +294,16 @@ module GRPC
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] the max completion time in seconds
# @param return_op [true|false]return an Operation if true
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|Operation|nil] as discussed above
def server_streamer(method, req, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, timeout)
return_op: false,
parent: nil,
**kw,
&blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.server_streamer(req, **md, &blk) unless return_op
@ -381,12 +404,17 @@ module GRPC
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param timeout [Numeric] (optional) the max completion time in seconds
# @param blk [Block] when provided, is executed for each response
# @param parent [Core::Call] a prior call whose reserved metadata
# will be propagated by this one.
# @param return_op [true|false] return an Operation if true
# @param blk [Block] when provided, is executed for each response
# @return [Enumerator|nil|Operation] as discussed above
def bidi_streamer(method, requests, marshal, unmarshal, timeout = nil,
return_op: false, **kw, &blk)
c = new_active_call(method, marshal, unmarshal, timeout)
return_op: false,
parent: nil,
**kw,
&blk)
c = new_active_call(method, marshal, unmarshal, timeout, parent: parent)
kw_with_jwt_uri = self.class.update_with_jwt_aud_uri(kw, @host, method)
md = @update_metadata.nil? ? kw : @update_metadata.call(kw_with_jwt_uri)
return c.bidi_streamer(requests, **md, &blk) unless return_op
@ -407,10 +435,17 @@ module GRPC
# @param method [string] the method being called.
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param parent [Grpc::Call] a parent call, available when calls are
# made from server
# @param timeout [TimeConst]
def new_active_call(method, marshal, unmarshal, timeout = nil)
def new_active_call(method, marshal, unmarshal, timeout = nil, parent: nil)
deadline = from_relative_time(timeout.nil? ? @timeout : timeout)
call = @ch.create_call(@queue, method, nil, deadline)
call = @ch.create_call(@queue,
parent, # parent call
@propagate_mask, # propagation options
method,
nil, # host use nil,
deadline)
ActiveCall.new(call, @queue, marshal, unmarshal, deadline, started: false)
end
end

@ -137,7 +137,7 @@ describe GRPC::Core::Call do
end
def make_test_call
@ch.create_call(client_queue, 'dummy_method', nil, deadline)
@ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline)
end
def deadline

@ -117,7 +117,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(cq, 'dummy_method', nil, deadline)
ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to_not raise_error
end
@ -128,7 +128,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
ch.create_call(cq, 'dummy_method', nil, deadline)
ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to raise_error(RuntimeError)
end

@ -61,7 +61,7 @@ shared_context 'setup: tags' do
end
def new_client_call
@ch.create_call(@client_queue, '/method', nil, deadline)
@ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end
end

@ -338,7 +338,7 @@ describe GRPC::ActiveCall do
end
def make_test_call
@ch.create_call(@client_queue, '/method', nil, deadline)
@ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
end
def deadline

@ -146,7 +146,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNAUTHENTICATED);
GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv);

@ -41,6 +41,7 @@
#include <condition_variable>
#include <mutex>
#include <grpc++/config.h>
namespace grpc {
@ -67,10 +68,12 @@ typedef std::chrono::time_point<grpc_time_source> grpc_time;
class Client {
public:
explicit Client(const ClientConfig& config)
: timer_(new Timer), interarrival_timer_() {
: channels_(config.client_channels()),
timer_(new Timer),
interarrival_timer_() {
for (int i = 0; i < config.client_channels(); i++) {
channels_.push_back(ClientChannelInfo(
config.server_targets(i % config.server_targets_size()), config));
channels_[i].init(config.server_targets(i % config.server_targets_size()),
config);
}
request_.set_response_type(grpc::testing::PayloadType::COMPRESSABLE);
request_.set_response_size(config.payload_size());
@ -79,7 +82,8 @@ class Client {
ClientStats Mark() {
Histogram latencies;
std::vector<Histogram> to_merge(threads_.size());
// avoid std::vector for old compilers that expect a copy constructor
Histogram* to_merge = new Histogram[threads_.size()];
for (size_t i = 0; i < threads_.size(); i++) {
threads_[i]->BeginSwap(&to_merge[i]);
}
@ -89,6 +93,7 @@ class Client {
threads_[i]->EndSwap();
latencies.Merge(&to_merge[i]);
}
delete[] to_merge;
auto timer_result = timer->Mark();
@ -106,9 +111,20 @@ class Client {
class ClientChannelInfo {
public:
ClientChannelInfo(const grpc::string& target, const ClientConfig& config)
: channel_(CreateTestChannel(target, config.enable_ssl())),
stub_(TestService::NewStub(channel_)) {}
ClientChannelInfo() {}
ClientChannelInfo(const ClientChannelInfo& i) {
// The copy constructor is to satisfy old compilers
// that need it for using std::vector . It is only ever
// used for empty entries
GPR_ASSERT(!i.channel_ && !i.stub_);
}
void init(const grpc::string& target, const ClientConfig& config) {
// We have to use a 2-phase init like this with a default
// constructor followed by an initializer function to make
// old compilers happy with using this in std::vector
channel_ = CreateTestChannel(target, config.enable_ssl());
stub_ = TestService::NewStub(channel_);
}
ChannelInterface* get_channel() { return channel_.get(); }
TestService::Stub* get_stub() { return stub_.get(); }
@ -189,27 +205,9 @@ class Client {
Thread(Client* client, size_t idx)
: done_(false),
new_(nullptr),
impl_([this, idx, client]() {
for (;;) {
// run the loop body
bool thread_still_ok = client->ThreadFunc(&histogram_, idx);
// lock, see if we're done
std::lock_guard<std::mutex> g(mu_);
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
return;
}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
}) {}
client_(client),
idx_(idx),
impl_(&Thread::ThreadFunc, this) {}
~Thread() {
{
@ -226,13 +224,37 @@ class Client {
void EndSwap() {
std::unique_lock<std::mutex> g(mu_);
cv_.wait(g, [this]() { return new_ == nullptr; });
while (new_ != nullptr) {
cv_.wait(g);
};
}
private:
Thread(const Thread&);
Thread& operator=(const Thread&);
void ThreadFunc() {
for (;;) {
// run the loop body
const bool thread_still_ok = client_->ThreadFunc(&histogram_, idx_);
// lock, see if we're done
std::lock_guard<std::mutex> g(mu_);
if (!thread_still_ok) {
gpr_log(GPR_ERROR, "Finishing client thread due to RPC error");
done_ = true;
}
if (done_) {
return;
}
// check if we're marking, swap out the histogram if so
if (new_) {
new_->Swap(&histogram_);
new_ = nullptr;
cv_.notify_one();
}
}
}
TestService::Stub* stub_;
ClientConfig config_;
std::mutex mu_;
@ -240,6 +262,8 @@ class Client {
bool done_;
Histogram* new_;
Histogram histogram_;
Client* client_;
size_t idx_;
std::thread impl_;
};

@ -156,7 +156,7 @@ class AsyncClient : public Client {
std::function<ClientRpcContext*(int, TestService::Stub*,
const SimpleRequest&)> setup_ctx)
: Client(config),
channel_lock_(config.client_channels()),
channel_lock_(new std::mutex[config.client_channels()]),
contexts_(config.client_channels()),
max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
channel_count_(config.client_channels()),
@ -208,6 +208,7 @@ class AsyncClient : public Client {
delete ctx;
}
}
delete[] channel_lock_;
}
bool ThreadFunc(Histogram* histogram,
@ -318,10 +319,14 @@ class AsyncClient : public Client {
private:
class boolean { // exists only to avoid data-race on vector<bool>
public:
boolean(): val_(false) {}
boolean(bool b): val_(b) {}
operator bool() const {return val_;}
boolean& operator=(bool b) {val_=b; return *this;}
boolean() : val_(false) {}
boolean(bool b) : val_(b) {}
operator bool() const { return val_; }
boolean& operator=(bool b) {
val_ = b;
return *this;
}
private:
bool val_;
};
@ -332,7 +337,8 @@ class AsyncClient : public Client {
std::vector<boolean> issue_allowed_; // may this thread attempt to issue
std::vector<grpc_time> next_issue_; // when should it issue?
std::vector<std::mutex> channel_lock_;
std::mutex*
channel_lock_; // a vector, but avoid std::vector for old compilers
std::vector<context_list> contexts_; // per-channel list of idle contexts
int max_outstanding_per_channel_;
int channel_count_;
@ -348,15 +354,17 @@ class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
private:
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
const SimpleRequest& request, CompletionQueue* cq) {
return stub->AsyncUnaryCall(ctx, request, cq);
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, start_req, check_done);
channel_id, stub, req, AsyncUnaryClient::StartReq,
AsyncUnaryClient::CheckDone);
}
};
@ -442,16 +450,19 @@ class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
private:
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
auto check_done = [](grpc::Status s, SimpleResponse* response) {};
auto start_req = [](TestService::Stub* stub, grpc::ClientContext* ctx,
static void CheckDone(grpc::Status s, SimpleResponse* response) {}
static std::unique_ptr<
grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
CompletionQueue* cq, void* tag) {
auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
return stream;
};
static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
const SimpleRequest& req) {
return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
channel_id, stub, req, start_req, check_done);
channel_id, stub, req, AsyncStreamingClient::StartReq,
AsyncStreamingClient::CheckDone);
}
};

@ -45,8 +45,9 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include <gflags/gflags.h>
#include <grpc++/client_context.h>
#include <grpc++/server.h>
@ -79,7 +80,9 @@ class SynchronousClient : public Client {
void WaitToIssue(int thread_idx) {
grpc_time next_time;
if (NextIssueTime(thread_idx, &next_time)) {
std::this_thread::sleep_until(next_time);
gpr_timespec next_timespec;
TimepointHR2Timespec(next_time, &next_timespec);
gpr_sleep_until(next_timespec);
}
}
@ -110,9 +113,10 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient {
class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
public:
SynchronousStreamingClient(const ClientConfig& config)
: SynchronousClient(config),
context_(num_threads_),
stream_(num_threads_) {
: SynchronousClient(config) {
context_ = new grpc::ClientContext[num_threads_];
stream_ = new std::unique_ptr<
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>[num_threads_];
for (size_t thread_idx = 0; thread_idx < num_threads_; thread_idx++) {
auto* stub = channels_[thread_idx % channels_.size()].get_stub();
stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
@ -121,12 +125,15 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
~SynchronousStreamingClient() {
EndThreads();
for (auto stream = stream_.begin(); stream != stream_.end(); stream++) {
for (auto stream = &stream_[0]; stream != &stream_[num_threads_];
stream++) {
if (*stream) {
(*stream)->WritesDone();
EXPECT_TRUE((*stream)->Finish().ok());
}
}
delete[] stream_;
delete[] context_;
}
bool ThreadFunc(Histogram* histogram, size_t thread_idx) GRPC_OVERRIDE {
@ -141,9 +148,11 @@ class SynchronousStreamingClient GRPC_FINAL : public SynchronousClient {
}
private:
std::vector<grpc::ClientContext> context_;
std::vector<std::unique_ptr<
grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>> stream_;
// These are both conceptually std::vector but cannot be for old compilers
// that expect contained classes to support copy constructors
grpc::ClientContext* context_;
std::unique_ptr<grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>>*
stream_;
};
std::unique_ptr<Client> CreateSynchronousUnaryClient(

@ -77,16 +77,34 @@ static deque<string> get_hosts(const string& name) {
}
}
// Namespace for classes and functions used only in RunScenario
// Using this rather than local definitions to workaround gcc-4.4 limitations
// regarding using templates without linkage
namespace runsc {
// ClientContext allocator
static ClientContext* AllocContext(list<ClientContext>* contexts) {
contexts->emplace_back();
return &contexts->back();
}
struct ServerData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
struct ClientData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
} // namespace runsc
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
int benchmark_seconds, int spawn_local_worker_count) {
// ClientContext allocator (all are destroyed at scope exit)
// ClientContext allocations (all are destroyed at scope exit)
list<ClientContext> contexts;
auto alloc_context = [&contexts]() {
contexts.emplace_back();
return &contexts.back();
};
// To be added to the result, containing the final configuration used for
// client and config (incluiding host, etc.)
@ -131,23 +149,22 @@ std::unique_ptr<ScenarioResult> RunScenario(
workers.resize(num_clients + num_servers);
// Start servers
struct ServerData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
};
vector<ServerData> servers;
using runsc::ServerData;
// servers is array rather than std::vector to avoid gcc-4.4 issues
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
ServerData sd;
sd.stub = std::move(Worker::NewStub(
servers[i].stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
*args.mutable_setup() = server_config;
sd.stream = std::move(sd.stub->RunServer(alloc_context()));
GPR_ASSERT(sd.stream->Write(args));
servers[i].stream =
std::move(servers[i].stub->RunServer(runsc::AllocContext(&contexts)));
GPR_ASSERT(servers[i].stream->Write(args));
ServerStatus init_status;
GPR_ASSERT(sd.stream->Read(&init_status));
GPR_ASSERT(servers[i].stream->Read(&init_status));
char* host;
char* driver_port;
char* cli_target;
@ -157,30 +174,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_free(host);
gpr_free(driver_port);
gpr_free(cli_target);
servers.push_back(std::move(sd));
}
// Start clients
struct ClientData {
unique_ptr<Worker::Stub> stub;
unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
};
vector<ClientData> clients;
using runsc::ClientData;
// clients is array rather than std::vector to avoid gcc-4.4 issues
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
ClientData cd;
cd.stub = std::move(Worker::NewStub(CreateChannel(
clients[i].stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);
*args.mutable_setup() = client_config;
cd.stream = std::move(cd.stub->RunTest(alloc_context()));
GPR_ASSERT(cd.stream->Write(args));
clients[i].stream =
std::move(clients[i].stub->RunTest(runsc::AllocContext(&contexts)));
GPR_ASSERT(clients[i].stream->Write(args));
ClientStatus init_status;
GPR_ASSERT(cd.stream->Read(&init_status));
clients.push_back(std::move(cd));
GPR_ASSERT(clients[i].stream->Read(&init_status));
}
// Let everything warmup
@ -195,23 +207,25 @@ std::unique_ptr<ScenarioResult> RunScenario(
server_mark.mutable_mark();
ClientArgs client_mark;
client_mark.mutable_mark();
for (auto server = servers.begin(); server != servers.end(); server++) {
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
for (auto client = clients.begin(); client != clients.end(); client++) {
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
ServerStatus server_status;
ClientStatus client_status;
for (auto server = servers.begin(); server != servers.end(); server++) {
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
}
for (auto client = clients.begin(); client != clients.end(); client++) {
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
}
// Wait some time
gpr_log(GPR_INFO, "Running");
// Use gpr_sleep_until rather than this_thread::sleep_until to support
// compilers that don't work with this_thread
gpr_sleep_until(gpr_time_add(
start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
@ -220,34 +234,36 @@ std::unique_ptr<ScenarioResult> RunScenario(
result->client_config = result_client_config;
result->server_config = result_server_config;
gpr_log(GPR_INFO, "Finishing");
for (auto server = servers.begin(); server != servers.end(); server++) {
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Write(server_mark));
}
for (auto client = clients.begin(); client != clients.end(); client++) {
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Write(client_mark));
}
for (auto server = servers.begin(); server != servers.end(); server++) {
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->Read(&server_status));
const auto& stats = server_status.stats();
result->server_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
result->server_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
}
for (auto client = clients.begin(); client != clients.end(); client++) {
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->Read(&client_status));
const auto& stats = client_status.stats();
result->latencies.MergeProto(stats.latencies());
result->client_resources.push_back(ResourceUsage{
stats.time_elapsed(), stats.time_user(), stats.time_system()});
result->client_resources.emplace_back(
stats.time_elapsed(), stats.time_user(), stats.time_system());
}
for (auto client = clients.begin(); client != clients.end(); client++) {
for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
GPR_ASSERT(client->stream->WritesDone());
GPR_ASSERT(client->stream->Finish().ok());
}
for (auto server = servers.begin(); server != servers.end(); server++) {
for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
GPR_ASSERT(server->stream->WritesDone());
GPR_ASSERT(server->stream->Finish().ok());
}
delete[] clients;
delete[] servers;
return result;
}
} // namespace testing

@ -41,10 +41,18 @@
namespace grpc {
namespace testing {
struct ResourceUsage {
double wall_time;
double user_time;
double system_time;
class ResourceUsage {
public:
ResourceUsage(double w, double u, double s)
: wall_time_(w), user_time_(u), system_time_(s) {}
double wall_time() const { return wall_time_; }
double user_time() const { return user_time_; }
double system_time() const { return system_time_; }
private:
double wall_time_;
double user_time_;
double system_time_;
};
struct ScenarioResult {

@ -36,7 +36,8 @@
#include <chrono>
#include <cmath>
#include <random>
#include <cstdlib>
#include <vector>
#include <grpc++/config.h>
@ -141,17 +142,16 @@ class ParetoDist GRPC_FINAL : public RandomDist {
// in an efficient re-entrant way. The random table is built at construction
// time, and each call must include the thread id of the invoker
typedef std::default_random_engine qps_random_engine;
class InterarrivalTimer {
public:
InterarrivalTimer() {}
void init(const RandomDist& r, int threads, int entries = 1000000) {
qps_random_engine gen;
std::uniform_real_distribution<double> uniform(0.0, 1.0);
for (int i = 0; i < entries; i++) {
random_table_.push_back(std::chrono::nanoseconds(
static_cast<int64_t>(1e9 * r(uniform(gen)))));
// rand is the only choice that is portable across POSIX and Windows
// and that supports new and old compilers
const double uniform_0_1 = rand() / RAND_MAX;
random_table_.push_back(
std::chrono::nanoseconds(static_cast<int64_t>(1e9 * r(uniform_0_1))));
}
// Now set up the thread positions
for (int i = 0; i < threads; i++) {

@ -33,6 +33,7 @@
#include <memory>
#include <set>
#include <signal.h>
#include <gflags/gflags.h>
#include <grpc/support/log.h>

@ -34,11 +34,16 @@
#include "test/cpp/qps/report.h"
#include <grpc/support/log.h>
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/stats.h"
namespace grpc {
namespace testing {
static double WallTime(ResourceUsage u) { return u.wall_time(); }
static double UserTime(ResourceUsage u) { return u.user_time(); }
static double SystemTime(ResourceUsage u) { return u.system_time(); }
void CompositeReporter::add(std::unique_ptr<Reporter> reporter) {
reporters_.emplace_back(std::move(reporter));
}
@ -68,16 +73,14 @@ void CompositeReporter::ReportTimes(const ScenarioResult& result) {
}
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f",
result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }));
gpr_log(
GPR_INFO, "QPS: %.1f",
result.latencies.Count() / average(result.client_resources, WallTime));
}
void GprLogReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps = result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; });
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
gpr_log(GPR_INFO, "QPS: %.1f (%.1f/server core)", qps,
qps / result.server_config.threads());
@ -95,40 +98,30 @@ void GprLogReporter::ReportLatency(const ScenarioResult& result) {
void GprLogReporter::ReportTimes(const ScenarioResult& result) {
gpr_log(GPR_INFO, "Server system time: %.2f%%",
100.0 * sum(result.server_resources,
[](ResourceUsage u) { return u.system_time; }) /
sum(result.server_resources,
[](ResourceUsage u) { return u.wall_time; }));
100.0 * sum(result.server_resources, SystemTime) /
sum(result.server_resources, WallTime));
gpr_log(GPR_INFO, "Server user time: %.2f%%",
100.0 * sum(result.server_resources,
[](ResourceUsage u) { return u.user_time; }) /
sum(result.server_resources,
[](ResourceUsage u) { return u.wall_time; }));
100.0 * sum(result.server_resources, UserTime) /
sum(result.server_resources, WallTime));
gpr_log(GPR_INFO, "Client system time: %.2f%%",
100.0 * sum(result.client_resources,
[](ResourceUsage u) { return u.system_time; }) /
sum(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }));
100.0 * sum(result.client_resources, SystemTime) /
sum(result.client_resources, WallTime));
gpr_log(GPR_INFO, "Client user time: %.2f%%",
100.0 * sum(result.client_resources,
[](ResourceUsage u) { return u.user_time; }) /
sum(result.client_resources,
[](ResourceUsage u) { return u.wall_time; }));
100.0 * sum(result.client_resources, UserTime) /
sum(result.client_resources, WallTime));
}
void PerfDbReporter::ReportQPS(const ScenarioResult& result) {
auto qps = result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; });
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
perf_db_client_.setQps(qps);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}
void PerfDbReporter::ReportQPSPerCore(const ScenarioResult& result) {
auto qps = result.latencies.Count() /
average(result.client_resources,
[](ResourceUsage u) { return u.wall_time; });
auto qps =
result.latencies.Count() / average(result.client_resources, WallTime);
auto qpsPerCore = qps / result.server_config.threads();
@ -147,25 +140,21 @@ void PerfDbReporter::ReportLatency(const ScenarioResult& result) {
}
void PerfDbReporter::ReportTimes(const ScenarioResult& result) {
double server_system_time =
100.0 * sum(result.server_resources,
[](ResourceUsage u) { return u.system_time; }) /
sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
double server_user_time =
100.0 * sum(result.server_resources,
[](ResourceUsage u) { return u.user_time; }) /
sum(result.server_resources, [](ResourceUsage u) { return u.wall_time; });
double client_system_time =
100.0 * sum(result.client_resources,
[](ResourceUsage u) { return u.system_time; }) /
sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
double client_user_time =
100.0 * sum(result.client_resources,
[](ResourceUsage u) { return u.user_time; }) /
sum(result.client_resources, [](ResourceUsage u) { return u.wall_time; });
perf_db_client_.setTimes(server_system_time, server_user_time, client_system_time,
client_user_time);
const double server_system_time = 100.0 *
sum(result.server_resources, SystemTime) /
sum(result.server_resources, WallTime);
const double server_user_time = 100.0 *
sum(result.server_resources, UserTime) /
sum(result.server_resources, WallTime);
const double client_system_time = 100.0 *
sum(result.client_resources, SystemTime) /
sum(result.client_resources, WallTime);
const double client_user_time = 100.0 *
sum(result.client_resources, UserTime) /
sum(result.client_resources, WallTime);
perf_db_client_.setTimes(server_system_time, server_user_time,
client_system_time, client_user_time);
perf_db_client_.setConfigs(result.client_config, result.server_config);
}

@ -99,25 +99,7 @@ class AsyncQpsServerTest : public Server {
shutdown_state_.emplace_back(new PerThreadShutdownState());
}
for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cqs_[i]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
if (!shutdown_state_[i]->shutdown()) {
// this RPC context is done, so refresh it
if (!still_going) {
ctx->Reset();
}
} else {
return;
}
}
return;
}));
threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
}
}
~AsyncQpsServerTest() {
@ -142,6 +124,26 @@ class AsyncQpsServerTest : public Server {
}
private:
void ThreadFunc(int rank) {
// Wait until work is available or we are shutting down
bool ok;
void *got_tag;
while (srv_cqs_[rank]->Next(&got_tag, &ok)) {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
const bool still_going = ctx->RunNextState(ok);
if (!shutdown_state_[rank]->shutdown()) {
// this RPC context is done, so refresh it
if (!still_going) {
ctx->Reset();
}
} else {
return;
}
}
return;
}
class ServerRpcContext {
public:
ServerRpcContext() {}

@ -34,8 +34,10 @@ set -ex
cd $(dirname $0)/../..
ROOT=`pwd`
PATH=$ROOT/bins/$CONFIG:$ROOT/bins/$CONFIG/protobuf:$PATH
GRPCIO=$ROOT/src/python/grpcio
GRPCIO_TEST=$ROOT/src/python/grpcio_test
GRPCIO_HEALTH_CHECKING=$ROOT/src/python/grpcio_health_checking
make_virtualenv() {
virtualenv_name="python"$1"_virtual_environment"
@ -54,6 +56,9 @@ make_virtualenv() {
cd $GRPCIO_TEST
pip install -r requirements.txt
pip install $GRPCIO_TEST
# Install grpcio_health_checking
pip install $GRPCIO_HEALTH_CHECKING
else
source $virtualenv_name/bin/activate
# Uninstall and re-install the packages we care about. Don't use
@ -62,12 +67,14 @@ make_virtualenv() {
# dependency upgrades.
(yes | pip uninstall grpcio) || true
(yes | pip uninstall grpcio_test) || true
(yes | pip uninstall grpcio_health_checking) || true
(CFLAGS="-I$ROOT/include -std=c89" LDFLAGS=-L$ROOT/libs/$CONFIG GRPC_PYTHON_BUILD_WITH_CYTHON=1 pip install $GRPCIO) || (
# Fall back to rebuilding the entire environment
rm -rf $virtualenv_name
make_virtualenv $1
)
pip install $GRPCIO_TEST
pip install $GRPCIO_HEALTH_CHECKING
fi
}

Loading…
Cancel
Save