Merge branch 'master' of github.com:grpc/grpc into the-ultimate-showdown

Conflicts:
	src/cpp/client/channel.cc
	src/cpp/client/client_context.cc
pull/2612/head
Nicolas "Pixel" Noble 9 years ago
commit d53b389607
  1. 2
      doc/connectivity-semantics-and-api.md
  2. 4
      include/grpc++/client_context.h
  3. 55
      include/grpc/census.h
  4. 15
      src/core/security/client_auth_filter.c
  5. 33
      src/cpp/client/channel.cc
  6. 11
      src/cpp/client/client_context.cc
  7. 62
      src/csharp/Grpc.Core.Tests/ClientBaseTest.cs
  8. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  9. 25
      src/csharp/Grpc.Core/ClientBase.cs
  10. 8
      src/objective-c/GRPCClient/GRPCCall+OAuth2.m
  11. 56
      src/objective-c/GRPCClient/GRPCCall.h
  12. 59
      src/objective-c/GRPCClient/GRPCCall.m
  13. 8
      src/objective-c/tests/GRPCClientTests.m
  14. 81
      src/ruby/ext/grpc/rb_channel.c
  15. 2
      test/core/end2end/tests/bad_hostname.c

@ -38,7 +38,7 @@ because the server is not yet available), the channel may spend increasingly
large amounts of time in this state. large amounts of time in this state.
IDLE: This is the state where the channel is not even trying to create a IDLE: This is the state where the channel is not even trying to create a
connection because of a lack of new or pending RPCs. New channels MAY be created connection because of a lack of new or pending RPCs. New RPCs MAY be created
in this state. Any attempt to start an RPC on the channel will push the channel in this state. Any attempt to start an RPC on the channel will push the channel
out of this state to connecting. When there has been no RPC activity on a channel out of this state to connecting. When there has been no RPC activity on a channel
for a specified IDLE_TIMEOUT, i.e., no new or pending (active) RPCs for this for a specified IDLE_TIMEOUT, i.e., no new or pending (active) RPCs for this

@ -218,15 +218,11 @@ class ClientContext {
void set_call(grpc_call* call, void set_call(grpc_call* call,
const std::shared_ptr<ChannelInterface>& channel); const std::shared_ptr<ChannelInterface>& channel);
grpc_completion_queue* cq() { return cq_; }
void set_cq(grpc_completion_queue* cq) { cq_ = cq; }
grpc::string authority() { return authority_; } grpc::string authority() { return authority_; }
bool initial_metadata_received_; bool initial_metadata_received_;
std::shared_ptr<ChannelInterface> channel_; std::shared_ptr<ChannelInterface> channel_;
grpc_call* call_; grpc_call* call_;
grpc_completion_queue* cq_;
gpr_timespec deadline_; gpr_timespec deadline_;
grpc::string authority_; grpc::string authority_;
std::shared_ptr<Credentials> creds_; std::shared_ptr<Credentials> creds_;

@ -104,6 +104,61 @@ int census_context_deserialize(const char *buffer, census_context **context);
* future census calls will result in undefined behavior. */ * future census calls will result in undefined behavior. */
void census_context_destroy(census_context *context); void census_context_destroy(census_context *context);
/* Max number of characters in tag key */
#define CENSUS_MAX_TAG_KEY_LENGTH 20
/* Max number of tag value characters */
#define CENSUS_MAX_TAG_VALUE_LENGTH 50
/* A Census tag set is a collection of key:value string pairs; these form the
basis against which Census metrics will be recorded. Keys are unique within
a tag set. All contexts have an associated tag set. */
typedef struct census_tag_set census_tag_set;
/* Returns a pointer to a newly created, empty tag set. If size_hint > 0,
indicates that the tag set is intended to hold approximately that number
of tags. */
census_tag_set *census_tag_set_create(size_t size_hint);
/* Add a new tag key/value to an existing tag set; if the tag key already exists
in the tag set, then its value is overwritten with the new one. Can also be
used to delete a tag, by specifying a NULL value. If key is NULL, returns
the number of tags in the tag set.
Return values:
-1: invalid length key or value
non-negative value: the number of tags in the tag set. */
int census_tag_set_add(census_tag_set *tags, const char *key,
const char *value);
/* Destroys a tag set. This function must be called to prevent memory leaks.
Once called, the tag set cannot be used again. */
void census_tag_set_destroy(census_tag_set *tags);
/* Get a contexts tag set. */
census_tag_set *census_context_tag_set(census_context *context);
/* A read-only representation of a tag for use by census clients. */
typedef struct {
size_t key_len; /* Number of bytes in tag key. */
const char *key; /* A pointer to the tag key. May not be null-terminated. */
size_t value_len; /* Number of bytes in tag value. */
const char *value; /* Pointer to the tag value. May not be null-terminated. */
} census_tag_const;
/* Used to iterate through a tag sets contents. */
typedef struct census_tag_set_iterator census_tag_set_iterator;
/* Open a tag set for iteration. The tag set must not be modified while
iteration is ongoing. Returns an iterator for use in following functions. */
census_tag_set_iterator *census_tag_set_open(census_tag_set *tags);
/* Get the next tag in the tag set, by writing into the 'tag' argument. Returns
1 if there is a "next" tag, 0 if there are no more tags. */
int census_tag_set_next(census_tag_set_iterator *it, census_tag_const *tag);
/* Close an iterator opened by census_tag_set_open(). The iterator will be
invalidated, and should not be used once close is called. */
void census_tag_set_close(census_tag_set_iterator *it);
/* A census statistic to be recorded comprises two parts: an ID for the /* A census statistic to be recorded comprises two parts: an ID for the
* particular statistic and the value to be recorded against it. */ * particular statistic and the value to be recorded against it. */
typedef struct { typedef struct {

@ -75,11 +75,11 @@ typedef struct {
grpc_mdstr *status_key; grpc_mdstr *status_key;
} channel_data; } channel_data;
static void bubble_up_error(grpc_call_element *elem, const char *error_msg) { static void bubble_up_error(grpc_call_element *elem, grpc_status_code status,
const char *error_msg) {
call_data *calld = elem->call_data; call_data *calld = elem->call_data;
gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg); gpr_log(GPR_ERROR, "Client side authentication failure: %s", error_msg);
grpc_transport_stream_op_add_cancellation(&calld->op, grpc_transport_stream_op_add_cancellation(&calld->op, status);
GRPC_STATUS_UNAUTHENTICATED);
grpc_call_next_op(elem, &calld->op); grpc_call_next_op(elem, &calld->op);
} }
@ -94,7 +94,8 @@ static void on_credentials_metadata(void *user_data,
grpc_metadata_batch *mdb; grpc_metadata_batch *mdb;
size_t i; size_t i;
if (status != GRPC_CREDENTIALS_OK) { if (status != GRPC_CREDENTIALS_OK) {
bubble_up_error(elem, "Credentials failed to get metadata."); bubble_up_error(elem, GRPC_STATUS_UNAUTHENTICATED,
"Credentials failed to get metadata.");
return; return;
} }
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
@ -154,7 +155,7 @@ static void send_security_metadata(grpc_call_element *elem,
if (channel_creds_has_md && call_creds_has_md) { if (channel_creds_has_md && call_creds_has_md) {
calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds); calld->creds = grpc_composite_credentials_create(channel_creds, ctx->creds);
if (calld->creds == NULL) { if (calld->creds == NULL) {
bubble_up_error(elem, bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT,
"Incompatible credentials set on channel and call."); "Incompatible credentials set on channel and call.");
return; return;
} }
@ -182,7 +183,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg; char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host)); grpc_mdstr_as_c_string(calld->host));
bubble_up_error(elem, error_msg); bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg); gpr_free(error_msg);
} }
} }
@ -252,7 +253,7 @@ static void auth_start_transport_op(grpc_call_element *elem,
gpr_asprintf(&error_msg, gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.", "Invalid host %s set in :authority metadata.",
call_host); call_host);
bubble_up_error(elem, error_msg); bubble_up_error(elem, GRPC_STATUS_INVALID_ARGUMENT, error_msg);
gpr_free(error_msg); gpr_free(error_msg);
} }
return; /* early exit */ return; /* early exit */

@ -61,20 +61,25 @@ Channel::~Channel() { grpc_channel_destroy(c_channel_); }
Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
CompletionQueue* cq) { CompletionQueue* cq) {
const char* host_str = host_.empty() ? NULL : host_.c_str(); const bool kRegistered = method.channel_tag() && context->authority().empty();
auto c_call = method.channel_tag() && context->authority().empty() grpc_call* c_call = NULL;
? grpc_channel_create_registered_call( if (kRegistered) {
c_channel_, context->propagate_from_call_, c_call = grpc_channel_create_registered_call(
context->propagation_options_.c_bitmask(), cq->cq(), c_channel_, context->propagate_from_call_,
method.channel_tag(), context->raw_deadline(), context->propagation_options_.c_bitmask(), cq->cq(),
nullptr) method.channel_tag(), context->raw_deadline(), nullptr);
: grpc_channel_create_call( } else {
c_channel_, context->propagate_from_call_, const char* host_str = NULL;
context->propagation_options_.c_bitmask(), cq->cq(), if (!context->authority().empty()) {
method.name(), context->authority().empty() host_str = context->authority().c_str();
? host_str } else if (!host_.empty()) {
: context->authority().c_str(), host_str = host_.c_str();
context->raw_deadline(), nullptr); }
c_call = grpc_channel_create_call(c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(),
cq->cq(), method.name(), host_str,
context->raw_deadline(), nullptr);
}
grpc_census_call_set_context(c_call, context->census_context()); grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call); GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
context->set_call(c_call, shared_from_this()); context->set_call(c_call, shared_from_this());

@ -48,7 +48,6 @@ namespace grpc {
ClientContext::ClientContext() ClientContext::ClientContext()
: initial_metadata_received_(false), : initial_metadata_received_(false),
call_(nullptr), call_(nullptr),
cq_(nullptr),
deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)), deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
propagate_from_call_(nullptr) {} propagate_from_call_(nullptr) {}
@ -56,16 +55,6 @@ ClientContext::~ClientContext() {
if (call_) { if (call_) {
grpc_call_destroy(call_); grpc_call_destroy(call_);
} }
if (cq_) {
// Drain cq_.
grpc_completion_queue_shutdown(cq_);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_event event;
do {
event = grpc_completion_queue_next(cq_, deadline, nullptr);
} while (event.type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cq_);
}
} }
std::unique_ptr<ClientContext> ClientContext::FromServerContext( std::unique_ptr<ClientContext> ClientContext::FromServerContext(

@ -0,0 +1,62 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class ClientBaseTest
{
[Test]
public void GetAuthUriBase_Valid()
{
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("some.googleapi.com"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("dns:///some.googleapi.com/"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("dns:///some.googleapi.com:443/"));
Assert.AreEqual("https://some.googleapi.com/", ClientBase.GetAuthUriBase("some.googleapi.com:443/"));
}
[Test]
public void GetAuthUriBase_Invalid()
{
Assert.IsNull(ClientBase.GetAuthUriBase("some.googleapi.com:"));
Assert.IsNull(ClientBase.GetAuthUriBase("https://some.googleapi.com/"));
Assert.IsNull(ClientBase.GetAuthUriBase("dns://some.googleapi.com:443")); // just two slashes
Assert.IsNull(ClientBase.GetAuthUriBase(""));
}
}
}

@ -63,6 +63,7 @@
<Compile Include="..\Grpc.Core\Version.cs"> <Compile Include="..\Grpc.Core\Version.cs">
<Link>Version.cs</Link> <Link>Version.cs</Link>
</Compile> </Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" /> <Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" /> <Compile Include="ServerTest.cs" />

@ -33,9 +33,10 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text.RegularExpressions;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using System.Text.RegularExpressions; using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
@ -46,15 +47,16 @@ namespace Grpc.Core
/// </summary> /// </summary>
public abstract class ClientBase public abstract class ClientBase
{ {
static readonly Regex TrailingPortPattern = new Regex(":[0-9]+/?$"); // Regex for removal of the optional DNS scheme, trailing port, and trailing backslash
static readonly Regex ChannelTargetPattern = new Regex(@"^(dns:\/{3})?([^:\/]+)(:\d+)?\/?$");
readonly Channel channel; readonly Channel channel;
readonly string authUriBase; readonly string authUriBase;
public ClientBase(Channel channel) public ClientBase(Channel channel)
{ {
this.channel = channel; this.channel = channel;
// TODO(jtattermush): we shouldn't need to hand-curate the channel.Target contents. this.authUriBase = GetAuthUriBase(channel.Target);
this.authUriBase = "https://" + TrailingPortPattern.Replace(channel.Target, "") + "/";
} }
/// <summary> /// <summary>
@ -104,10 +106,23 @@ namespace Grpc.Core
{ {
options = options.WithHeaders(new Metadata()); options = options.WithHeaders(new Metadata());
} }
var authUri = authUriBase + method.ServiceName; var authUri = authUriBase != null ? authUriBase + method.ServiceName : null;
interceptor(authUri, options.Headers); interceptor(authUri, options.Headers);
} }
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options); return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
} }
/// <summary>
/// Creates Auth URI base from channel's target (the one passed at channel creation).
/// Fully-qualified service name is to be appended to this.
/// </summary>
internal static string GetAuthUriBase(string target)
{
var match = ChannelTargetPattern.Match(target);
if (!match.Success) {
return null;
}
return "https://" + match.Groups[2].Value + "/";
}
} }
} }

@ -40,7 +40,7 @@ static NSString * const kChallengeHeader = @"www-authenticate";
@implementation GRPCCall (OAuth2) @implementation GRPCCall (OAuth2)
- (NSString *)oauth2AccessToken { - (NSString *)oauth2AccessToken {
NSString *headerValue = self.requestMetadata[kAuthorizationHeader]; NSString *headerValue = self.requestHeaders[kAuthorizationHeader];
if ([headerValue hasPrefix:kBearerPrefix]) { if ([headerValue hasPrefix:kBearerPrefix]) {
return [headerValue substringFromIndex:kBearerPrefix.length]; return [headerValue substringFromIndex:kBearerPrefix.length];
} else { } else {
@ -50,14 +50,14 @@ static NSString * const kChallengeHeader = @"www-authenticate";
- (void)setOauth2AccessToken:(NSString *)token { - (void)setOauth2AccessToken:(NSString *)token {
if (token) { if (token) {
self.requestMetadata[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:token]; self.requestHeaders[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:token];
} else { } else {
[self.requestMetadata removeObjectForKey:kAuthorizationHeader]; [self.requestHeaders removeObjectForKey:kAuthorizationHeader];
} }
} }
- (NSString *)oauth2ChallengeHeader { - (NSString *)oauth2ChallengeHeader {
return self.responseMetadata[kChallengeHeader]; return self.responseHeaders[kChallengeHeader];
} }
@end @end

@ -48,8 +48,10 @@
#import <Foundation/Foundation.h> #import <Foundation/Foundation.h>
#import <RxLibrary/GRXWriter.h> #import <RxLibrary/GRXWriter.h>
// Key used in |NSError|'s |userInfo| dictionary to store the response metadata sent by the server. // Keys used in |NSError|'s |userInfo| dictionary to store the response headers and trailers sent by
extern id const kGRPCStatusMetadataKey; // the server.
extern id const kGRPCHeadersKey;
extern id const kGRPCTrailersKey;
// Represents a single gRPC remote call. // Represents a single gRPC remote call.
@interface GRPCCall : GRXWriter @interface GRPCCall : GRXWriter
@ -57,43 +59,49 @@ extern id const kGRPCStatusMetadataKey;
// These HTTP headers will be passed to the server as part of this call. Each HTTP header is a // These HTTP headers will be passed to the server as part of this call. Each HTTP header is a
// name-value pair with string names and either string or binary values. // name-value pair with string names and either string or binary values.
// //
// The passed dictionary has to use NSString keys, corresponding to the header names. The // The passed dictionary has to use NSString keys, corresponding to the header names. The value
// value associated to each can be a NSString object or a NSData object. E.g.: // associated to each can be a NSString object or a NSData object. E.g.:
// //
// call.requestMetadata = @{@"Authorization": @"Bearer ..."}; // call.requestHeaders = @{@"authorization": @"Bearer ..."};
// //
// call.requestMetadata[@"SomeBinaryHeader"] = someData; // call.requestHeaders[@"my-header-bin"] = someData;
// //
// After the call is started, modifying this won't have any effect. // After the call is started, trying to modify this property is an error.
// //
// For convenience, the property is initialized to an empty NSMutableDictionary, and the setter // For convenience, the property is initialized to an empty NSMutableDictionary, and the setter
// accepts (and copies) both mutable and immutable dictionaries. // accepts (and copies) both mutable and immutable dictionaries.
- (NSMutableDictionary *)requestMetadata; // nonatomic - (NSMutableDictionary *)requestHeaders; // nonatomic
- (void)setRequestMetadata:(NSDictionary *)requestMetadata; // nonatomic, copy - (void)setRequestHeaders:(NSDictionary *)requestHeaders; // nonatomic, copy
// This dictionary is populated with the HTTP headers received from the server. When the RPC ends, // This dictionary is populated with the HTTP headers received from the server. This happens before
// the HTTP trailers received are added to the dictionary too. It has the same structure as the // any response message is received from the server. It has the same structure as the request
// request metadata dictionary. // headers dictionary: Keys are NSString header names; names ending with the suffix "-bin" have a
// NSData value; the others have a NSString value.
// //
// The first time this object calls |writeValue| on the writeable passed to |startWithWriteable|, // The value of this property is nil until all response headers are received, and will change before
// the |responseMetadata| dictionary already contains the response headers. When it calls // any of -writeValue: or -writesFinishedWithError: are sent to the writeable.
// |writesFinishedWithError|, the dictionary contains both the response headers and trailers. @property(atomic, readonly) NSDictionary *responseHeaders;
@property(atomic, readonly) NSDictionary *responseMetadata;
// Same as responseHeaders, but populated with the HTTP trailers received from the server before the
// call finishes.
//
// The value of this property is nil until all response trailers are received, and will change
// before -writesFinishedWithError: is sent to the writeable.
@property(atomic, readonly) NSDictionary *responseTrailers;
// The request writer has to write NSData objects into the provided Writeable. The server will // The request writer has to write NSData objects into the provided Writeable. The server will
// receive each of those separately and in order. // receive each of those separately and in order as distinct messages.
// A gRPC call might not complete until the request writer finishes. On the other hand, the // A gRPC call might not complete until the request writer finishes. On the other hand, the request
// request finishing doesn't necessarily make the call to finish, as the server might continue // finishing doesn't necessarily make the call to finish, as the server might continue sending
// sending messages to the response side of the call indefinitely (depending on the semantics of // messages to the response side of the call indefinitely (depending on the semantics of the
// the specific remote method called). // specific remote method called).
// To finish a call right away, invoke cancel. // To finish a call right away, invoke cancel.
- (instancetype)initWithHost:(NSString *)host - (instancetype)initWithHost:(NSString *)host
path:(NSString *)path path:(NSString *)path
requestsWriter:(GRXWriter *)requestsWriter NS_DESIGNATED_INITIALIZER; requestsWriter:(GRXWriter *)requestsWriter NS_DESIGNATED_INITIALIZER;
// Finishes the request side of this call, notifies the server that the RPC // Finishes the request side of this call, notifies the server that the RPC should be cancelled, and
// should be cancelled, and finishes the response side of the call with an error // finishes the response side of the call with an error of code CANCELED.
// of code CANCELED.
- (void)cancel; - (void)cancel;
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter? // TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?

@ -42,9 +42,13 @@
#import "private/NSDictionary+GRPC.h" #import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h" #import "private/NSError+GRPC.h"
NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; NSString * const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
@interface GRPCCall () <GRXWriteable> @interface GRPCCall () <GRXWriteable>
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
@end @end
// The following methods of a C gRPC call object aren't reentrant, and thus // The following methods of a C gRPC call object aren't reentrant, and thus
@ -89,8 +93,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// the response arrives. // the response arrives.
GRPCCall *_retainSelf; GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata; NSMutableDictionary *_requestHeaders;
NSMutableDictionary *_responseMetadata;
} }
@synthesize state = _state; @synthesize state = _state;
@ -121,24 +124,19 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_requestWriter = requestWriter; _requestWriter = requestWriter;
_requestMetadata = [NSMutableDictionary dictionary]; _requestHeaders = [NSMutableDictionary dictionary];
_responseMetadata = [NSMutableDictionary dictionary];
} }
return self; return self;
} }
#pragma mark Metadata #pragma mark Metadata
- (NSMutableDictionary *)requestMetadata { - (NSMutableDictionary *)requestHeaders {
return _requestMetadata; return _requestHeaders;
} }
- (void)setRequestMetadata:(NSDictionary *)requestMetadata { - (void)setRequestHeaders:(NSDictionary *)requestHeaders {
_requestMetadata = [NSMutableDictionary dictionaryWithDictionary:requestMetadata]; _requestHeaders = [NSMutableDictionary dictionaryWithDictionary:requestHeaders];
}
- (NSDictionary *)responseMetadata {
return _responseMetadata;
} }
#pragma mark Finish #pragma mark Finish
@ -232,11 +230,10 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
#pragma mark Send headers #pragma mark Send headers
// TODO(jcanizales): Rename to commitHeaders. - (void)sendHeaders:(NSDictionary *)headers {
- (void)sendHeaders:(NSDictionary *)metadata {
// TODO(jcanizales): Add error handlers for async failures // TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc]
initWithMetadata:metadata ?: @{} handler:nil]]]; initWithMetadata:headers ?: @{} handler:nil]]];
} }
#pragma mark GRXWriteable implementation #pragma mark GRXWriteable implementation
@ -305,35 +302,45 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Both handlers will eventually be called, from the network queue. Writes can start immediately // Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this. // after this.
// The first one (metadataHandler), when the response headers are received. // The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason. // The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithMetadataHandler:(void(^)(NSDictionary *))metadataHandler - (void)invokeCallWithHeadersHandler:(void(^)(NSDictionary *))headersHandler
completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler { completionHandler:(void(^)(NSError *, NSDictionary *))completionHandler {
// TODO(jcanizales): Add error handlers for async failures // TODO(jcanizales): Add error handlers for async failures
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc] [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvMetadata alloc]
initWithHandler:metadataHandler]]]; initWithHandler:headersHandler]]];
[_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc] [_wrappedCall startBatchWithOperations:@[[[GRPCOpRecvStatus alloc]
initWithHandler:completionHandler]]]; initWithHandler:completionHandler]]];
} }
- (void)invokeCall { - (void)invokeCall {
__weak GRPCCall *weakSelf = self; __weak GRPCCall *weakSelf = self;
[self invokeCallWithMetadataHandler:^(NSDictionary *headers) { [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
// Response headers received. // Response headers received.
GRPCCall *strongSelf = weakSelf; GRPCCall *strongSelf = weakSelf;
if (strongSelf) { if (strongSelf) {
[strongSelf->_responseMetadata addEntriesFromDictionary:headers]; strongSelf.responseHeaders = headers;
[strongSelf startNextRead]; [strongSelf startNextRead];
} }
} completionHandler:^(NSError *error, NSDictionary *trailers) { } completionHandler:^(NSError *error, NSDictionary *trailers) {
GRPCCall *strongSelf = weakSelf; GRPCCall *strongSelf = weakSelf;
if (strongSelf) { if (strongSelf) {
[strongSelf->_responseMetadata addEntriesFromDictionary:trailers]; strongSelf.responseTrailers = trailers;
if (error) { if (error) {
NSMutableDictionary *userInfo = NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
[NSMutableDictionary dictionaryWithDictionary:error.userInfo]; if (error.userInfo) {
userInfo[kGRPCStatusMetadataKey] = strongSelf->_responseMetadata; [userInfo addEntriesFromDictionary:error.userInfo];
}
userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
// TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
// called before this one, so an error might end up with trailers but no headers. We
// shouldn't call finishWithError until ater both blocks are called. It is also when this is
// done that we can provide a merged view of response headers and trailers in a thread-safe
// way.
if (strongSelf.responseHeaders) {
userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
}
error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo]; error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
} }
[strongSelf finishWithError:error]; [strongSelf finishWithError:error];
@ -356,7 +363,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
_retainSelf = self; _retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata]; [self sendHeaders:_requestHeaders];
[self invokeCall]; [self invokeCall];
} }

@ -168,11 +168,13 @@ static ProtoMethod *kUnaryCallMethod;
} completionHandler:^(NSError *errorOrNil) { } completionHandler:^(NSError *errorOrNil) {
XCTAssertNotNil(errorOrNil, @"Finished without error!"); XCTAssertNotNil(errorOrNil, @"Finished without error!");
XCTAssertEqual(errorOrNil.code, 16, @"Finished with unexpected error: %@", errorOrNil); XCTAssertEqual(errorOrNil.code, 16, @"Finished with unexpected error: %@", errorOrNil);
XCTAssertEqualObjects(call.responseMetadata, errorOrNil.userInfo[kGRPCStatusMetadataKey], XCTAssertEqualObjects(call.responseHeaders, errorOrNil.userInfo[kGRPCHeadersKey],
@"Metadata in the NSError object and call object differ."); @"Headers in the NSError object and call object differ.");
XCTAssertEqualObjects(call.responseTrailers, errorOrNil.userInfo[kGRPCTrailersKey],
@"Trailers in the NSError object and call object differ.");
NSString *challengeHeader = call.oauth2ChallengeHeader; NSString *challengeHeader = call.oauth2ChallengeHeader;
XCTAssertGreaterThan(challengeHeader.length, 0, XCTAssertGreaterThan(challengeHeader.length, 0,
@"No challenge in response headers %@", call.responseMetadata); @"No challenge in response headers %@", call.responseHeaders);
[expectation fulfill]; [expectation fulfill];
}]; }];

@ -165,6 +165,65 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
return self; return self;
} }
/*
call-seq:
insecure_channel = Channel:new("myhost:8080", {'arg1': 'value1'})
creds = ...
secure_channel = Channel:new("myhost:443", {'arg1': 'value1'}, creds)
Creates channel instances. */
static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv,
VALUE self) {
VALUE try_to_connect = Qfalse;
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
/* "01" == 0 mandatory args, 1 (try_to_connect) is optional */
rb_scan_args(argc, argv, "01", try_to_connect);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
return NUM2LONG(
grpc_channel_check_connectivity_state(ch, (int)try_to_connect));
}
/* Watch for a change in connectivity state.
Once the channel connectivity state is different from the last observed
state, tag will be enqueued on cq with success=1
If deadline expires BEFORE the state is changed, tag will be enqueued on
the completion queue with success=0 */
static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
VALUE last_state,
VALUE cqueue,
VALUE deadline,
VALUE tag) {
grpc_rb_channel *wrapper = NULL;
grpc_channel *ch = NULL;
grpc_completion_queue *cq = NULL;
cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
rb_raise(rb_eRuntimeError, "closed!");
return Qnil;
}
grpc_channel_watch_connectivity_state(
ch,
NUM2LONG(last_state),
grpc_rb_time_timeval(deadline, /* absolute time */ 0),
cq,
ROBJECT(tag));
return Qnil;
}
/* Clones Channel instances. /* Clones Channel instances.
Gives Channel a consistent implementation of Ruby's object copy/dup Gives Channel a consistent implementation of Ruby's object copy/dup
@ -295,6 +354,22 @@ static void Init_grpc_propagate_masks() {
UINT2NUM(GRPC_PROPAGATE_DEFAULTS)); UINT2NUM(GRPC_PROPAGATE_DEFAULTS));
} }
static void Init_grpc_connectivity_states() {
/* Constants representing call propagation masks in grpc.h */
VALUE grpc_rb_mConnectivityStates = rb_define_module_under(
grpc_rb_mGrpcCore, "ConnectivityStates");
rb_define_const(grpc_rb_mConnectivityStates, "IDLE",
LONG2NUM(GRPC_CHANNEL_IDLE));
rb_define_const(grpc_rb_mConnectivityStates, "CONNECTING",
LONG2NUM(GRPC_CHANNEL_CONNECTING));
rb_define_const(grpc_rb_mConnectivityStates, "READY",
LONG2NUM(GRPC_CHANNEL_READY));
rb_define_const(grpc_rb_mConnectivityStates, "TRANSIENT_FAILURE",
LONG2NUM(GRPC_CHANNEL_TRANSIENT_FAILURE));
rb_define_const(grpc_rb_mConnectivityStates, "FATAL_FAILURE",
LONG2NUM(GRPC_CHANNEL_FATAL_FAILURE));
}
void Init_grpc_channel() { void Init_grpc_channel() {
grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject); grpc_rb_cChannelArgs = rb_define_class("TmpChannelArgs", rb_cObject);
grpc_rb_cChannel = grpc_rb_cChannel =
@ -309,6 +384,11 @@ void Init_grpc_channel() {
grpc_rb_channel_init_copy, 1); grpc_rb_channel_init_copy, 1);
/* Add ruby analogues of the Channel methods. */ /* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state",
grpc_rb_channel_get_connectivity_state,
-1);
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call", rb_define_method(grpc_rb_cChannel, "create_call",
grpc_rb_channel_create_call, 6); grpc_rb_channel_create_call, 6);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0); rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
@ -327,6 +407,7 @@ void Init_grpc_channel() {
rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH", rb_define_const(grpc_rb_cChannel, "MAX_MESSAGE_LENGTH",
ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH))); ID2SYM(rb_intern(GRPC_ARG_MAX_MESSAGE_LENGTH)));
Init_grpc_propagate_masks(); Init_grpc_propagate_masks();
Init_grpc_connectivity_states();
} }
/* Gets the wrapped channel from the ruby wrapper */ /* Gets the wrapped channel from the ruby wrapper */

@ -153,7 +153,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_expect_completion(cqv, tag(1), 1); cq_expect_completion(cqv, tag(1), 1);
cq_verify(cqv); cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_UNAUTHENTICATED); GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT);
gpr_free(details); gpr_free(details);
grpc_metadata_array_destroy(&initial_metadata_recv); grpc_metadata_array_destroy(&initial_metadata_recv);

Loading…
Cancel
Save