Merge branch 'master' of github.com:grpc/grpc into credentials_naming_and_cleanup

pull/3075/head
Julien Boeuf 10 years ago
commit 6c7185d350
  1. 6
      include/grpc++/create_channel.h
  2. 2
      include/grpc++/credentials.h
  3. 7
      src/compiler/csharp_generator.cc
  4. 3
      src/compiler/csharp_generator.h
  5. 2
      src/core/census/grpc_filter.c
  6. 44
      src/core/transport/chttp2/stream_lists.c
  7. 2
      src/core/transport/metadata.c
  8. 6
      src/cpp/client/create_channel.cc
  9. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  10. 176
      src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs
  11. 11
      src/csharp/Grpc.Core.Tests/MetadataTest.cs
  12. 80
      src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
  13. 17
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  14. 50
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  15. 5
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  16. 2
      src/csharp/Grpc.Core/Marshaller.cs
  17. 7
      src/csharp/Grpc.Core/Metadata.cs
  18. 2
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  19. 2
      src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
  20. 2
      src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs
  21. 5
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  22. 23
      src/node/ext/call.cc
  23. 15
      src/node/index.js
  24. 8
      src/node/interop/interop_client.js
  25. 61
      src/node/src/client.js
  26. 181
      src/node/src/metadata.js
  27. 65
      src/node/src/server.js
  28. 193
      src/node/test/metadata_test.js
  29. 62
      src/node/test/surface_test.js
  30. 4
      src/python/grpcio/grpc/_adapter/_intermediary_low.py
  31. 2
      src/python/grpcio/grpc/_adapter/fore.py
  32. 22
      src/python/grpcio/grpc/_links/service.py
  33. 6
      src/python/grpcio/grpc/framework/core/_end.py
  34. 8
      src/python/grpcio_test/grpc_interop/client.py
  35. 16
      src/python/grpcio_test/grpc_interop/methods.py
  36. 22
      src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py
  37. 13
      src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py
  38. 6
      src/python/grpcio_test/grpc_test/_links/_transmission_test.py
  39. 5
      src/python/grpcio_test/grpc_test/test_common.py
  40. 2
      test/core/iomgr/udp_server_test.c
  41. 2
      test/core/util/test_config.h
  42. 8
      test/cpp/end2end/async_end2end_test.cc
  43. 2
      test/cpp/end2end/client_crash_test.cc
  44. 9
      test/cpp/end2end/end2end_test.cc
  45. 4
      test/cpp/end2end/generic_end2end_test.cc
  46. 4
      test/cpp/end2end/mock_test.cc
  47. 4
      test/cpp/end2end/server_crash_test_client.cc
  48. 2
      test/cpp/end2end/shutdown_test.cc
  49. 4
      test/cpp/end2end/thread_stress_test.cc
  50. 2
      test/cpp/end2end/zookeeper_test.cc
  51. 8
      test/cpp/qps/driver.cc
  52. 4
      test/cpp/qps/report.h
  53. 3
      test/cpp/util/cli_call_test.cc
  54. 4
      test/cpp/util/create_test_channel.cc
  55. 2
      test/cpp/util/grpc_cli.cc
  56. 5
      tools/codegen/core/gen_legal_metadata_characters.c

@ -44,6 +44,12 @@ namespace grpc {
// If creds does not hold an object or is invalid, a lame channel is returned.
std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds);
// For advanced use and testing ONLY. Override default channel arguments only
// if necessary.
// If creds does not hold an object or is invalid, a lame channel is returned.
std::shared_ptr<Channel> CreateCustomChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args);

@ -57,7 +57,7 @@ class Credentials : public GrpcLibrary {
virtual SecureCredentials* AsSecureCredentials() = 0;
private:
friend std::shared_ptr<Channel> CreateChannel(
friend std::shared_ptr<Channel> CreateCustomChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args);

@ -33,6 +33,7 @@
#include <cctype>
#include <map>
#include <sstream>
#include <vector>
#include "src/compiler/csharp_generator.h"
@ -44,7 +45,6 @@
using google::protobuf::compiler::csharp::GetFileNamespace;
using google::protobuf::compiler::csharp::GetClassName;
using google::protobuf::compiler::csharp::GetUmbrellaClassName;
using google::protobuf::SimpleItoa;
using grpc::protobuf::FileDescriptor;
using grpc::protobuf::Descriptor;
using grpc::protobuf::ServiceDescriptor;
@ -228,11 +228,14 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) {
}
void GenerateServiceDescriptorProperty(Printer* out, const ServiceDescriptor *service) {
std::ostringstream index;
index << service->index();
out->Print("// service descriptor\n");
out->Print("public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor\n");
out->Print("{\n");
out->Print(" get { return $umbrella$.Descriptor.Services[$index$]; }\n",
"umbrella", GetUmbrellaClassName(service->file()), "index", SimpleItoa(service->index()));
"umbrella", GetUmbrellaClassName(service->file()), "index",
index.str());
out->Print("}\n");
out->Print("\n");
}

@ -36,10 +36,7 @@
#include "src/compiler/config.h"
using namespace std;
#include <google/protobuf/compiler/csharp/csharp_names.h>
#include <google/protobuf/stubs/strutil.h>
namespace grpc_csharp_generator {

@ -36,12 +36,12 @@
#include <stdio.h>
#include <string.h>
#include "include/grpc/census.h"
#include "src/core/census/rpc_stat_id.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/channel/noop_filter.h"
#include "src/core/statistics/census_interface.h"
#include "src/core/statistics/census_rpc_stats.h"
#include <grpc/census.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice.h>

@ -177,8 +177,10 @@ int grpc_chttp2_list_pop_writable_stream(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WRITABLE);
*stream_global = &stream->global;
*stream_writing = &stream->writing;
if (r != 0) {
*stream_global = &stream->global;
*stream_writing = &stream->writing;
}
return r;
}
@ -210,7 +212,9 @@ int grpc_chttp2_list_pop_writing_stream(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITING);
*stream_writing = &stream->writing;
if (r != 0) {
*stream_writing = &stream->writing;
}
return r;
}
@ -230,8 +234,10 @@ int grpc_chttp2_list_pop_written_stream(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_WRITING(transport_writing), &stream,
GRPC_CHTTP2_LIST_WRITTEN);
*stream_global = &stream->global;
*stream_writing = &stream->writing;
if (r != 0) {
*stream_global = &stream->global;
*stream_writing = &stream->writing;
}
return r;
}
@ -251,8 +257,10 @@ int grpc_chttp2_list_pop_parsing_seen_stream(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_PARSING(transport_parsing), &stream,
GRPC_CHTTP2_LIST_PARSING_SEEN);
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
if (r != 0) {
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
}
return r;
}
@ -270,7 +278,9 @@ int grpc_chttp2_list_pop_waiting_for_concurrency(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
*stream_global = &stream->global;
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
@ -288,7 +298,9 @@ int grpc_chttp2_list_pop_closed_waiting_for_parsing(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CLOSED_WAITING_FOR_PARSING);
*stream_global = &stream->global;
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
@ -306,7 +318,9 @@ int grpc_chttp2_list_pop_cancelled_waiting_for_writing(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_CANCELLED_WAITING_FOR_WRITING);
*stream_global = &stream->global;
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}
@ -326,8 +340,10 @@ int grpc_chttp2_list_pop_incoming_window_updated(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_INCOMING_WINDOW_UPDATED);
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
if (r != 0) {
*stream_global = &stream->global;
*stream_parsing = &stream->parsing;
}
return r;
}
@ -353,7 +369,9 @@ int grpc_chttp2_list_pop_read_write_state_changed(
grpc_chttp2_stream *stream;
int r = stream_list_pop(TRANSPORT_FROM_GLOBAL(transport_global), &stream,
GRPC_CHTTP2_LIST_READ_WRITE_STATE_CHANGED);
*stream_global = &stream->global;
if (r != 0) {
*stream_global = &stream->global;
}
return r;
}

@ -703,7 +703,7 @@ int grpc_mdstr_is_legal_header(grpc_mdstr *s) {
int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s) {
static const gpr_uint8 legal_header_bits[256 / 8] = {
0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0x00, 0x00, 0x00, 0x00, 0xff, 0xef, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
return conforms_to(s, legal_header_bits);

@ -44,6 +44,11 @@ namespace grpc {
class ChannelArguments;
std::shared_ptr<Channel> CreateChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds) {
return CreateCustomChannel(target, creds, ChannelArguments());
}
std::shared_ptr<Channel> CreateCustomChannel(
const grpc::string& target, const std::shared_ptr<Credentials>& creds,
const ChannelArguments& args) {
ChannelArguments cp_args = args;
@ -57,4 +62,5 @@ std::shared_ptr<Channel> CreateChannel(
NULL, GRPC_STATUS_INVALID_ARGUMENT,
"Invalid credentials."));
}
} // namespace grpc

@ -64,6 +64,7 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
<Compile Include="MarshallingErrorsTest.cs" />
<Compile Include="ShutdownTest.cs" />
<Compile Include="Internal\AsyncCallTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />

@ -0,0 +1,176 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class MarshallingErrorsTest
{
const string Host = "127.0.0.1";
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
var marshaller = new Marshaller<string>(
(str) =>
{
if (str == "UNSERIALIZABLE_VALUE")
{
// Google.Protobuf throws exception inherited from IOException
throw new IOException("Error serializing the message.");
}
return System.Text.Encoding.UTF8.GetBytes(str);
},
(payload) =>
{
var s = System.Text.Encoding.UTF8.GetString(payload);
if (s == "UNPARSEABLE_VALUE")
{
// Google.Protobuf throws exception inherited from IOException
throw new IOException("Error parsing the message.");
}
return s;
});
helper = new MockServiceHelper(Host, marshaller);
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
[Test]
public void ResponseParsingError_UnaryResponse()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
return Task.FromResult("UNPARSEABLE_VALUE");
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "REQUEST"));
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
}
[Test]
public void ResponseParsingError_StreamingResponse()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
await responseStream.WriteAsync("UNPARSEABLE_VALUE");
await Task.Delay(10000);
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "REQUEST");
var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
}
[Test]
public void RequestParsingError_UnaryRequest()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
{
return Task.FromResult("RESPONSE");
});
var ex = Assert.Throws<RpcException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNPARSEABLE_VALUE"));
// Spec doesn't define the behavior. With the current implementation server handler throws exception which results in StatusCode.Unknown.
Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
}
[Test]
public async Task RequestParsingError_StreamingRequest()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
Assert.Throws<IOException>(async () => await requestStream.MoveNext());
return "RESPONSE";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAsync("UNPARSEABLE_VALUE");
Assert.AreEqual("RESPONSE", await call);
}
[Test]
public void RequestSerializationError_BlockingUnary()
{
Assert.Throws<IOException>(() => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
}
[Test]
public void RequestSerializationError_AsyncUnary()
{
Assert.Throws<IOException>(async () => await Calls.AsyncUnaryCall(helper.CreateUnaryCall(), "UNSERIALIZABLE_VALUE"));
}
[Test]
public async Task RequestSerializationError_ClientStreaming()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
CollectionAssert.AreEqual(new [] {"A", "B"}, await requestStream.ToListAsync());
return "RESPONSE";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.WriteAsync("A");
Assert.Throws<IOException>(async () => await call.RequestStream.WriteAsync("UNSERIALIZABLE_VALUE"));
await call.RequestStream.WriteAsync("B");
await call.RequestStream.CompleteAsync();
Assert.AreEqual("RESPONSE", await call);
}
}
}

@ -74,6 +74,17 @@ namespace Grpc.Core.Tests
Assert.AreEqual("[Entry: key=abc-bin, valueBytes=System.Byte[]]", entry.ToString());
}
[Test]
public void AsciiEntry_KeyValidity()
{
new Metadata.Entry("ABC", "XYZ");
new Metadata.Entry("0123456789abc", "XYZ");
new Metadata.Entry("-abc", "XYZ");
new Metadata.Entry("a_bc_", "XYZ");
Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc[", "xyz"));
Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc/", "xyz"));
}
[Test]
public void Entry_ConstructionPreconditions()
{

@ -50,37 +50,14 @@ namespace Grpc.Core.Tests
{
public const string ServiceName = "tests.Test";
public static readonly Method<string, string> UnaryMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"Unary",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> ClientStreamingMethod = new Method<string, string>(
MethodType.ClientStreaming,
ServiceName,
"ClientStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> ServerStreamingMethod = new Method<string, string>(
MethodType.ServerStreaming,
ServiceName,
"ServerStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
public static readonly Method<string, string> DuplexStreamingMethod = new Method<string, string>(
MethodType.DuplexStreaming,
ServiceName,
"DuplexStreaming",
Marshallers.StringMarshaller,
Marshallers.StringMarshaller);
readonly string host;
readonly ServerServiceDefinition serviceDefinition;
readonly Method<string, string> unaryMethod;
readonly Method<string, string> clientStreamingMethod;
readonly Method<string, string> serverStreamingMethod;
readonly Method<string, string> duplexStreamingMethod;
UnaryServerMethod<string, string> unaryHandler;
ClientStreamingServerMethod<string, string> clientStreamingHandler;
ServerStreamingServerMethod<string, string> serverStreamingHandler;
@ -89,15 +66,44 @@ namespace Grpc.Core.Tests
Server server;
Channel channel;
public MockServiceHelper(string host = null)
public MockServiceHelper(string host = null, Marshaller<string> marshaller = null)
{
this.host = host ?? "localhost";
marshaller = marshaller ?? Marshallers.StringMarshaller;
unaryMethod = new Method<string, string>(
MethodType.Unary,
ServiceName,
"Unary",
marshaller,
marshaller);
clientStreamingMethod = new Method<string, string>(
MethodType.ClientStreaming,
ServiceName,
"ClientStreaming",
marshaller,
marshaller);
serverStreamingMethod = new Method<string, string>(
MethodType.ServerStreaming,
ServiceName,
"ServerStreaming",
marshaller,
marshaller);
duplexStreamingMethod = new Method<string, string>(
MethodType.DuplexStreaming,
ServiceName,
"DuplexStreaming",
marshaller,
marshaller);
serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(UnaryMethod, (request, context) => unaryHandler(request, context))
.AddMethod(ClientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
.AddMethod(ServerStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
.AddMethod(DuplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
.AddMethod(unaryMethod, (request, context) => unaryHandler(request, context))
.AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
.AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
.AddMethod(duplexStreamingMethod, (requestStream, responseStream, context) => duplexStreamingHandler(requestStream, responseStream, context))
.Build();
var defaultStatus = new Status(StatusCode.Unknown, "Default mock implementation. Please provide your own.");
@ -155,22 +161,22 @@ namespace Grpc.Core.Tests
public CallInvocationDetails<string, string> CreateUnaryCall(CallOptions options = default(CallOptions))
{
return new CallInvocationDetails<string, string>(channel, UnaryMethod, options);
return new CallInvocationDetails<string, string>(channel, unaryMethod, options);
}
public CallInvocationDetails<string, string> CreateClientStreamingCall(CallOptions options = default(CallOptions))
{
return new CallInvocationDetails<string, string>(channel, ClientStreamingMethod, options);
return new CallInvocationDetails<string, string>(channel, clientStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateServerStreamingCall(CallOptions options = default(CallOptions))
{
return new CallInvocationDetails<string, string>(channel, ServerStreamingMethod, options);
return new CallInvocationDetails<string, string>(channel, serverStreamingMethod, options);
}
public CallInvocationDetails<string, string> CreateDuplexStreamingCall(CallOptions options = default(CallOptions))
{
return new CallInvocationDetails<string, string>(channel, DuplexStreamingMethod, options);
return new CallInvocationDetails<string, string>(channel, duplexStreamingMethod, options);
}
public string Host

@ -322,6 +322,11 @@ namespace Grpc.Core.Internal
details.Channel.RemoveCallReference(this);
}
protected override bool IsClient
{
get { return true; }
}
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = CreateNativeCall(cq);
@ -376,9 +381,17 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
TResponse msg = default(TResponse);
var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
lock (myLock)
{
finished = true;
if (deserializeException != null && receivedStatus.Status.StatusCode == StatusCode.OK)
{
receivedStatus = new ClientSideStatus(DeserializeResponseFailureStatus, receivedStatus.Trailers);
}
finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
@ -394,10 +407,6 @@ namespace Grpc.Core.Internal
return;
}
// TODO: handle deserialization error
TResponse msg;
TryDeserialize(receivedMessage, out msg);
unaryResponseTcs.SetResult(msg);
}

@ -33,10 +33,12 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@ -50,6 +52,7 @@ namespace Grpc.Core.Internal
internal abstract class AsyncCallBase<TWrite, TRead>
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCallBase<TWrite, TRead>>();
protected static readonly Status DeserializeResponseFailureStatus = new Status(StatusCode.Internal, "Failed to deserialize response message.");
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
@ -100,11 +103,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Requests cancelling the call with given status.
/// </summary>
public void CancelWithStatus(Status status)
protected void CancelWithStatus(Status status)
{
lock (myLock)
{
Preconditions.CheckState(started);
cancelRequested = true;
if (!disposed)
@ -177,6 +179,11 @@ namespace Grpc.Core.Internal
return false;
}
protected abstract bool IsClient
{
get;
}
private void ReleaseResources()
{
if (call != null)
@ -224,33 +231,31 @@ namespace Grpc.Core.Internal
return serializer(msg);
}
protected bool TrySerialize(TWrite msg, out byte[] payload)
protected Exception TrySerialize(TWrite msg, out byte[] payload)
{
try
{
payload = serializer(msg);
return true;
return null;
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while trying to serialize message");
payload = null;
return false;
return e;
}
}
protected bool TryDeserialize(byte[] payload, out TRead msg)
protected Exception TryDeserialize(byte[] payload, out TRead msg)
{
try
{
msg = deserializer(payload);
return true;
return null;
}
catch (Exception e)
{
Logger.Error(e, "Exception occured while trying to deserialize message.");
msg = default(TRead);
return false;
return e;
}
}
@ -319,6 +324,9 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
lock (myLock)
{
@ -331,23 +339,23 @@ namespace Grpc.Core.Internal
readingDone = true;
}
if (deserializeException != null && IsClient)
{
readingDone = true;
CancelWithStatus(DeserializeResponseFailureStatus);
}
ReleaseResourcesIfPossible();
}
// TODO: handle the case when error occured...
// TODO: handle the case when success==false
if (receivedMessage != null)
{
// TODO: handle deserialization error
TRead msg;
TryDeserialize(receivedMessage, out msg);
FireCompletion(origCompletionDelegate, msg, null);
}
else
if (deserializeException != null && !IsClient)
{
FireCompletion(origCompletionDelegate, default(TRead), null);
FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
return;
}
FireCompletion(origCompletionDelegate, msg, null);
}
}
}

@ -169,6 +169,11 @@ namespace Grpc.Core.Internal
}
}
protected override bool IsClient
{
get { return false; }
}
protected override void CheckReadingAllowed()
{
base.CheckReadingAllowed();

@ -39,7 +39,7 @@ namespace Grpc.Core
/// <summary>
/// Encapsulates the logic for serializing and deserializing messages.
/// </summary>
public struct Marshaller<T>
public class Marshaller<T>
{
readonly Func<T, byte[]> serializer;
readonly Func<byte[], T> deserializer;

@ -36,6 +36,7 @@ using System.Collections.Specialized;
using System.Globalization;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.RegularExpressions;
using Grpc.Core.Utils;
@ -189,6 +190,7 @@ namespace Grpc.Core
public struct Entry
{
private static readonly Encoding Encoding = Encoding.ASCII;
private static readonly Regex ValidKeyRegex = new Regex("^[a-z0-9_-]+$");
readonly string key;
readonly string value;
@ -321,7 +323,10 @@ namespace Grpc.Core
private static string NormalizeKey(string key)
{
return Preconditions.CheckNotNull(key, "key").ToLower(CultureInfo.InvariantCulture);
var normalized = Preconditions.CheckNotNull(key, "key").ToLower(CultureInfo.InvariantCulture);
Preconditions.CheckArgument(ValidKeyRegex.IsMatch(normalized),
"Metadata entry key not valid. Keys can only contain lowercase alphanumeric characters, underscores and hyphens.");
return normalized;
}
}
}

@ -162,7 +162,7 @@ namespace Math.Tests
{
using (var call = client.Sum())
{
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num{ Num_ = n });
var numbers = new List<long> { 10, 20, 30 }.ConvertAll(n => new Num { Num_ = n });
await call.RequestStream.WriteAllAsync(numbers);
var result = await call.ResponseAsync;

@ -88,7 +88,7 @@ namespace Grpc.HealthCheck.Tests
[Test]
public void ServiceDoesntExist()
{
Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest{ Host = "", Service = "nonexistent.service" }));
Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest { Host = "", Service = "nonexistent.service" }));
}
// TODO(jtattermusch): add test with timeout once timeouts are supported

@ -101,7 +101,7 @@ namespace Grpc.HealthCheck.Tests
private static HealthCheckResponse.Types.ServingStatus GetStatusHelper(HealthServiceImpl impl, string host, string service)
{
return impl.Check(new HealthCheckRequest{ Host = host, Service = service}, null).Result.Status;
return impl.Check(new HealthCheckRequest { Host = host, Service = service }, null).Result.Status;
}
}
}

@ -37,13 +37,12 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Google.Apis.Auth.OAuth2;
using Google.Protobuf;
using Grpc.Auth;
using Grpc.Core;
using Grpc.Core.Utils;
using Grpc.Testing;
using Google.Protobuf;
using Google.Apis.Auth.OAuth2;
using NUnit.Framework;
namespace Grpc.IntegrationTesting

@ -111,17 +111,19 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
NanAssignPersistent(*handle, value);
resources->handles.push_back(unique_ptr<PersistentHolder>(
new PersistentHolder(handle)));
continue;
} else {
return false;
}
}
if (value->IsString()) {
Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
current->value = **utf8_value;
current->value_length = string_value->Length();
} else {
return false;
if (value->IsString()) {
Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
current->value = **utf8_value;
current->value_length = string_value->Length();
} else {
return false;
}
}
array->count += 1;
}
@ -156,8 +158,7 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) {
}
if (EndsWith(elem->key, "-bin")) {
array->Set(index_map[elem->key],
MakeFastBuffer(
NanNewBufferHandle(elem->value, elem->value_length)));
NanNewBufferHandle(elem->value, elem->value_length));
} else {
array->Set(index_map[elem->key], NanNew(elem->value));
}

@ -41,6 +41,8 @@ var client = require('./src/client.js');
var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
var grpc = require('bindings')('grpc');
/**
@ -107,18 +109,12 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
* @param {function(Error, Object)} callback
*/
return function updateMetadata(authURI, metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
credential.getRequestMetadata(authURI, function(err, header) {
if (err) {
callback(err);
return;
}
metadata.Authorization.push(header.Authorization);
metadata.add('authorization', header.Authorization);
callback(null, metadata);
});
};
@ -129,6 +125,11 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) {
*/
exports.Server = server.Server;
/**
* @see module:src/metadata
*/
exports.Metadata = Metadata;
/**
* Status name to code number mapping
*/

@ -321,13 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) {
credential.getAccessToken(function(err, token) {
assert.ifError(err);
var updateMetadata = function(authURI, metadata, callback) {
metadata = _.clone(metadata);
if (metadata.Authorization) {
metadata.Authorization = _.clone(metadata.Authorization);
} else {
metadata.Authorization = [];
}
metadata.Authorization.push('Bearer ' + token);
metadata.Add('authorization', 'Bearer ' + token);
callback(null, metadata);
};
var makeTestCall = function(error, client_metadata) {

@ -42,7 +42,9 @@ var _ = require('lodash');
var grpc = require('bindings')('grpc.node');
var common = require('./common.js');
var common = require('./common');
var Metadata = require('./metadata');
var EventEmitter = require('events').EventEmitter;
@ -254,8 +256,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
* serialize
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Metadata=} metadata Metadata to add to the call
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
*/
@ -264,7 +265,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var emitter = new EventEmitter();
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
emitter.cancel = function cancel() {
call.cancel();
@ -283,13 +286,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
if (options) {
message.grpcWriteFlags = options.flags;
}
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -304,7 +310,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
}
emitter.emit('metadata', response.metadata);
emitter.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
callback(null, deserialize(response.read));
});
});
@ -328,7 +335,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
* @this {Client} Client object. Must have a channel member.
* @param {function(?Error, value=)} callback The callback to for when the
* response is received
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Object=} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -337,7 +344,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientWritableStream(call, serialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -347,7 +356,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
return;
}
var metadata_batch = {};
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
@ -355,12 +365,15 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var client_batch = {};
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -398,7 +411,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {*} argument The argument to the call. Should be serializable with
* serialize
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Object} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -407,7 +420,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientReadableStream(call, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -421,7 +436,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
if (options) {
message.grpcWriteFlags = options.flags;
}
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@ -431,11 +447,14 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@ -470,7 +489,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/**
* Make a bidirectional stream request with this method on the given channel.
* @this {SurfaceClient} Client object. Must have a channel member.
* @param {array=} metadata Array of metadata key/value pairs to add to the
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
* call
* @param {Options} options Options map
* @return {EventEmitter} An event emitter for stream related events
@ -479,7 +498,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
/* jshint validthis: true */
var call = getCall(this.channel, method, options);
if (metadata === null || metadata === undefined) {
metadata = {};
metadata = new Metadata();
} else {
metadata = metadata.clone();
}
var stream = new ClientDuplexStream(call, serialize, deserialize);
this.updateMetadata(this.auth_uri, metadata, function(error, metadata) {
@ -489,7 +510,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
metadata._getCoreRepresentation();
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
@ -497,11 +519,14 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
// in the other batch.
return;
}
stream.emit('metadata', response.metadata);
stream.emit('metadata', Metadata._fromCoreRepresentation(
response.metadata));
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);

@ -0,0 +1,181 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/**
* Metadata module
* @module
*/
'use strict';
var _ = require('lodash');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
* @constructor
*/
function Metadata() {
this._internal_repr = {};
}
function normalizeKey(key) {
if (!(/^[A-Za-z\d_-]+$/.test(key))) {
throw new Error('Metadata keys must be nonempty strings containing only ' +
'alphanumeric characters and hyphens');
}
return key.toLowerCase();
}
function validate(key, value) {
if (_.endsWith(key, '-bin')) {
if (!(value instanceof Buffer)) {
throw new Error('keys that end with \'-bin\' must have Buffer values');
}
} else {
if (!_.isString(value)) {
throw new Error(
'keys that don\'t end with \'-bin\' must have String values');
}
if (!(/^[\x20-\x7E]*$/.test(value))) {
throw new Error('Metadata string values can only contain printable ' +
'ASCII characters and space');
}
}
}
/**
* Sets the given value for the given key, replacing any other values associated
* with that key. Normalizes the key.
* @param {String} key The key to set
* @param {String|Buffer} value The value to set. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.set = function(key, value) {
key = normalizeKey(key);
validate(key, value);
this._internal_repr[key] = [value];
};
/**
* Adds the given value for the given key. Normalizes the key.
* @param {String} key The key to add to.
* @param {String|Buffer} value The value to add. Must be a buffer if and only
* if the normalized key ends with '-bin'
*/
Metadata.prototype.add = function(key, value) {
key = normalizeKey(key);
validate(key, value);
if (!this._internal_repr[key]) {
this._internal_repr[key] = [];
}
this._internal_repr[key].push(value);
};
/**
* Remove the given key and any associated values. Normalizes the key.
* @param {String} key The key to remove
*/
Metadata.prototype.remove = function(key) {
key = normalizeKey(key);
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
delete this._internal_repr[key];
}
};
/**
* Gets a list of all values associated with the key. Normalizes the key.
* @param {String} key The key to get
* @return {Array.<String|Buffer>} The values associated with that key
*/
Metadata.prototype.get = function(key) {
key = normalizeKey(key);
if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) {
return this._internal_repr[key];
} else {
return [];
}
};
/**
* Get a map of each key to a single associated value. This reflects the most
* common way that people will want to see metadata.
* @return {Object.<String,String|Buffer>} A key/value mapping of the metadata
*/
Metadata.prototype.getMap = function() {
var result = {};
_.forOwn(this._internal_repr, function(values, key) {
if(values.length > 0) {
result[key] = values[0];
}
});
return result;
};
/**
* Clone the metadata object.
* @return {Metadata} The new cloned object
*/
Metadata.prototype.clone = function() {
var copy = new Metadata();
_.forOwn(this._internal_repr, function(value, key) {
copy._internal_repr[key] = _.clone(value);
});
return copy;
};
/**
* Gets the metadata in the format used by interal code. Intended for internal
* use only. API stability is not guaranteed.
* @private
* @return {Object.<String, Array.<String|Buffer>>} The metadata
*/
Metadata.prototype._getCoreRepresentation = function() {
return this._internal_repr;
};
/**
* Creates a Metadata object from a metadata map in the internal format.
* Intended for internal use only. API stability is not guaranteed.
* @private
* @param {Object.<String, Array.<String|Buffer>>} The metadata
* @return {Metadata} The new Metadata object
*/
Metadata._fromCoreRepresentation = function(metadata) {
var newMetadata = new Metadata();
if (metadata) {
newMetadata._internal_repr = _.cloneDeep(metadata);
}
return newMetadata;
};
module.exports = Metadata;

@ -44,6 +44,8 @@ var grpc = require('bindings')('grpc.node');
var common = require('./common');
var Metadata = require('./metadata');
var stream = require('stream');
var Readable = stream.Readable;
@ -60,10 +62,10 @@ var EventEmitter = require('events').EventEmitter;
* @param {Object} error The error object
*/
function handleError(call, error) {
var statusMetadata = new Metadata();
var status = {
code: grpc.status.UNKNOWN,
details: 'Unknown Error',
metadata: {}
details: 'Unknown Error'
};
if (error.hasOwnProperty('message')) {
status.details = error.message;
@ -75,11 +77,13 @@ function handleError(call, error) {
}
}
if (error.hasOwnProperty('metadata')) {
status.metadata = error.metadata;
statusMetadata = error.metadata;
}
status.metadata = statusMetadata._getCoreRepresentation();
var error_batch = {};
if (!call.metadataSent) {
error_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
error_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
}
error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(error_batch, function(){});
@ -114,22 +118,24 @@ function waitForCancel(call, emitter) {
* @param {*} value The value to respond with
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
* @param {Metadata=} metadata Optional trailing metadata to send with status
* @param {number=} flags Flags for modifying how the message is sent.
* Defaults to 0.
*/
function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var statusMetadata = new Metadata();
var status = {
code: grpc.status.OK,
details: 'OK',
metadata: {}
details: 'OK'
};
if (metadata) {
status.metadata = metadata;
statusMetadata = metadata;
}
status.metadata = statusMetadata._getCoreRepresentation();
if (!call.metadataSent) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
end_batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
call.metadataSent = true;
}
var message = serialize(value);
@ -151,14 +157,19 @@ function setUpWritable(stream, serialize) {
stream.status = {
code : grpc.status.OK,
details : 'OK',
metadata : {}
metadata : new Metadata()
};
stream.serialize = common.wrapIgnoreNull(serialize);
function sendStatus() {
var batch = {};
if (!stream.call.metadataSent) {
stream.call.metadataSent = true;
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
}
if (stream.status.metadata) {
stream.status.metadata = stream.status.metadata._getCoreRepresentation();
}
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
stream.call.startBatch(batch, function(){});
@ -173,7 +184,7 @@ function setUpWritable(stream, serialize) {
function setStatus(err) {
var code = grpc.status.UNKNOWN;
var details = 'Unknown Error';
var metadata = {};
var metadata = new Metadata();
if (err.hasOwnProperty('message')) {
details = err.message;
}
@ -203,7 +214,7 @@ function setUpWritable(stream, serialize) {
/**
* Override of Writable#end method that allows for sending metadata with a
* success status.
* @param {Object=} metadata Metadata to send with the status
* @param {Metadata=} metadata Metadata to send with the status
*/
stream.end = function(metadata) {
if (metadata) {
@ -266,7 +277,8 @@ function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
if (!this.call.metadataSent) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
this.call.metadataSent = true;
}
var message = this.serialize(chunk);
@ -289,15 +301,15 @@ ServerWritableStream.prototype._write = _write;
/**
* Send the initial metadata for a writable stream.
* @param {Object<String, Array<(String|Buffer)>>} responseMetadata Metadata
* to send
* @param {Metadata} responseMetadata Metadata to send
*/
function sendMetadata(responseMetadata) {
/* jshint validthis: true */
if (!this.call.metadataSent) {
this.call.metadataSent = true;
var batch = [];
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
this.call.startBatch(batch, function(err) {
if (err) {
this.emit('error', err);
@ -422,7 +434,7 @@ ServerDuplexStream.prototype.getPeer = getPeer;
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleUnary(call, handler, metadata) {
var emitter = new EventEmitter();
@ -430,7 +442,8 @@ function handleUnary(call, handler, metadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {});
}
};
@ -478,7 +491,7 @@ function handleUnary(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
@ -507,7 +520,7 @@ function handleServerStreaming(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
@ -515,7 +528,8 @@ function handleClientStreaming(call, handler, metadata) {
if (!call.metadataSent) {
call.metadataSent = true;
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata;
batch[grpc.opType.SEND_INITIAL_METADATA] =
responseMetadata._getCoreRepresentation();
call.startBatch(batch, function() {});
}
};
@ -542,7 +556,7 @@ function handleClientStreaming(call, handler, metadata) {
* @access private
* @param {grpc.Call} call The call to handle
* @param {Object} handler Request handler object for the method that was called
* @param {Object} metadata Metadata from the client
* @param {Metadata} metadata Metadata from the client
*/
function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
@ -599,7 +613,7 @@ function Server(options) {
var details = event.new_call;
var call = details.call;
var method = details.method;
var metadata = details.metadata;
var metadata = Metadata._fromCoreRepresentation(details.metadata);
if (method === null) {
return;
}
@ -609,7 +623,8 @@ function Server(options) {
handler = handlers[method];
} else {
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
batch[grpc.opType.SEND_INITIAL_METADATA] =
(new Metadata())._getCoreRepresentation();
batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
code: grpc.status.UNIMPLEMENTED,
details: 'This method is not available on this server.',

@ -0,0 +1,193 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var Metadata = require('../src/metadata.js');
var assert = require('assert');
describe('Metadata', function() {
var metadata;
beforeEach(function() {
metadata = new Metadata();
});
describe('#set', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.set('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.set('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.set('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.set('key-bin', new Buffer('value'));
});
});
it('Rejects invalid keys', function() {
assert.throws(function() {
metadata.set('key$', 'value');
});
assert.throws(function() {
metadata.set('', 'value');
});
});
it('Rejects values with non-ASCII characters', function() {
assert.throws(function() {
metadata.set('key', 'résumé');
});
});
it('Saves values that can be retrieved', function() {
metadata.set('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Overwrites previous values', function() {
metadata.set('key', 'value1');
metadata.set('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
it('Normalizes keys', function() {
metadata.set('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.set('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value2']);
});
});
describe('#add', function() {
it('Only accepts string values for non "-bin" keys', function() {
assert.throws(function() {
metadata.add('key', new Buffer('value'));
});
assert.doesNotThrow(function() {
metadata.add('key', 'value');
});
});
it('Only accepts Buffer values for "-bin" keys', function() {
assert.throws(function() {
metadata.add('key-bin', 'value');
});
assert.doesNotThrow(function() {
metadata.add('key-bin', new Buffer('value'));
});
});
it('Rejects invalid keys', function() {
assert.throws(function() {
metadata.add('key$', 'value');
});
assert.throws(function() {
metadata.add('', 'value');
});
});
it('Saves values that can be retrieved', function() {
metadata.add('key', 'value');
assert.deepEqual(metadata.get('key'), ['value']);
});
it('Combines with previous values', function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('Normalizes keys', function() {
metadata.add('Key', 'value1');
assert.deepEqual(metadata.get('key'), ['value1']);
metadata.add('KEY', 'value2');
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
});
describe('#remove', function() {
it('clears values from a key', function() {
metadata.add('key', 'value');
metadata.remove('key');
assert.deepEqual(metadata.get('key'), []);
});
it('Normalizes keys', function() {
metadata.add('key', 'value');
metadata.remove('KEY');
assert.deepEqual(metadata.get('key'), []);
});
});
describe('#get', function() {
beforeEach(function() {
metadata.add('key', 'value1');
metadata.add('key', 'value2');
metadata.add('key-bin', new Buffer('value'));
});
it('gets all values associated with a key', function() {
assert.deepEqual(metadata.get('key'), ['value1', 'value2']);
});
it('Normalizes keys', function() {
assert.deepEqual(metadata.get('KEY'), ['value1', 'value2']);
});
it('returns an empty list for non-existent keys', function() {
assert.deepEqual(metadata.get('non-existent-key'), []);
});
it('returns Buffers for "-bin" keys', function() {
assert(metadata.get('key-bin')[0] instanceof Buffer);
});
});
describe('#getMap', function() {
it('gets a map of keys to values', function() {
metadata.add('key1', 'value1');
metadata.add('Key2', 'value2');
metadata.add('KEY3', 'value3');
assert.deepEqual(metadata.getMap(),
{key1: 'value1',
key2: 'value2',
key3: 'value3'});
});
});
describe('#clone', function() {
it('retains values from the original', function() {
metadata.add('key', 'value');
var copy = metadata.clone();
assert.deepEqual(copy.get('key'), ['value']);
});
it('Does not see newly added values', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
metadata.add('key', 'value2');
assert.deepEqual(copy.get('key'), ['value1']);
});
it('Does not add new values to the original', function() {
metadata.add('key', 'value1');
var copy = metadata.clone();
copy.add('key', 'value2');
assert.deepEqual(metadata.get('key'), ['value1']);
});
});
});

@ -262,6 +262,7 @@ describe('Generic client and server', function() {
describe('Echo metadata', function() {
var client;
var server;
var metadata;
before(function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
var test_service = test_proto.lookup('TestService');
@ -294,6 +295,8 @@ describe('Echo metadata', function() {
var Client = surface_client.makeProtobufClientConstructor(test_service);
client = new Client('localhost:' + port, grpc.Credentials.createInsecure());
server.start();
metadata = new grpc.Metadata();
metadata.set('key', 'value');
});
after(function() {
server.forceShutdown();
@ -301,35 +304,35 @@ describe('Echo metadata', function() {
it('with unary call', function(done) {
var call = client.unary({}, function(err, data) {
assert.ifError(err);
}, {key: ['value']});
}, metadata);
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
});
it('with client stream call', function(done) {
var call = client.clientStream(function(err, data) {
assert.ifError(err);
}, {key: ['value']});
}, metadata);
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
call.end();
});
it('with server stream call', function(done) {
var call = client.serverStream({}, {key: ['value']});
var call = client.serverStream({}, metadata);
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
});
it('with bidi stream call', function(done) {
var call = client.bidiStream({key: ['value']});
var call = client.bidiStream(metadata);
call.on('data', function() {});
call.on('metadata', function(metadata) {
assert.deepEqual(metadata.key, ['value']);
assert.deepEqual(metadata.get('key'), ['value']);
done();
});
call.end();
@ -337,9 +340,10 @@ describe('Echo metadata', function() {
it('shows the correct user-agent string', function(done) {
var version = require('../package.json').version;
var call = client.unary({}, function(err, data) { assert.ifError(err); },
{key: ['value']});
metadata);
call.on('metadata', function(metadata) {
assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version));
assert(_.startsWith(metadata.get('user-agent')[0],
'grpc-node/' + version));
done();
});
});
@ -354,13 +358,15 @@ describe('Other conditions', function() {
var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto');
test_service = test_proto.lookup('TestService');
server = new grpc.Server();
var trailer_metadata = new grpc.Metadata();
trailer_metadata.add('trailer-present', 'yes');
server.addProtoService(test_service, {
unary: function(call, cb) {
var req = call.request;
if (req.error) {
cb(new Error('Requested error'), null, {trailer_present: ['yes']});
cb(new Error('Requested error'), null, trailer_metadata);
} else {
cb(null, {count: 1}, {trailer_present: ['yes']});
cb(null, {count: 1}, trailer_metadata);
}
},
clientStream: function(stream, cb){
@ -369,14 +375,14 @@ describe('Other conditions', function() {
stream.on('data', function(data) {
if (data.error) {
errored = true;
cb(new Error('Requested error'), null, {trailer_present: ['yes']});
cb(new Error('Requested error'), null, trailer_metadata);
} else {
count += 1;
}
});
stream.on('end', function() {
if (!errored) {
cb(null, {count: count}, {trailer_present: ['yes']});
cb(null, {count: count}, trailer_metadata);
}
});
},
@ -384,13 +390,13 @@ describe('Other conditions', function() {
var req = stream.request;
if (req.error) {
var err = new Error('Requested error');
err.metadata = {trailer_present: ['yes']};
err.metadata = trailer_metadata;
stream.emit('error', err);
} else {
for (var i = 0; i < 5; i++) {
stream.write({count: i});
}
stream.end({trailer_present: ['yes']});
stream.end(trailer_metadata);
}
},
bidiStream: function(stream) {
@ -398,10 +404,8 @@ describe('Other conditions', function() {
stream.on('data', function(data) {
if (data.error) {
var err = new Error('Requested error');
err.metadata = {
trailer_present: ['yes'],
count: ['' + count]
};
err.metadata = trailer_metadata.clone();
err.metadata.add('count', '' + count);
stream.emit('error', err);
} else {
stream.write({count: count});
@ -409,7 +413,7 @@ describe('Other conditions', function() {
}
});
stream.on('end', function() {
stream.end({trailer_present: ['yes']});
stream.end(trailer_metadata);
});
}
});
@ -510,7 +514,7 @@ describe('Other conditions', function() {
assert.ifError(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -519,7 +523,7 @@ describe('Other conditions', function() {
assert(err);
});
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -531,7 +535,7 @@ describe('Other conditions', function() {
call.write({error: false});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -543,7 +547,7 @@ describe('Other conditions', function() {
call.write({error: true});
call.end();
call.on('status', function(status) {
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -552,7 +556,7 @@ describe('Other conditions', function() {
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -560,7 +564,7 @@ describe('Other conditions', function() {
var call = client.serverStream({error: true});
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']);
assert.deepEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -572,7 +576,7 @@ describe('Other conditions', function() {
call.on('data', function(){});
call.on('status', function(status) {
assert.strictEqual(status.code, grpc.status.OK);
assert.deepEqual(status.metadata.trailer_present, ['yes']);
assert.deepEqual(status.metadata.get('trailer-present'), ['yes']);
done();
});
});
@ -583,7 +587,7 @@ describe('Other conditions', function() {
call.end();
call.on('data', function(){});
call.on('error', function(error) {
assert.deepEqual(error.metadata.trailer_present, ['yes']);
assert.deepEqual(error.metadata.get('trailer-present'), ['yes']);
done();
});
});

@ -255,6 +255,6 @@ class ClientCredentials(object):
class ServerCredentials(object):
"""Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials."""
def __init__(self, root_credentials, pair_sequence):
def __init__(self, root_credentials, pair_sequence, force_client_auth):
self._internal = _low.ServerCredentials.ssl(
root_credentials, list(pair_sequence), False)
root_credentials, list(pair_sequence), force_client_auth)

@ -288,7 +288,7 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
self._port = self._server.add_http2_addr(address)
else:
server_credentials = _low.ServerCredentials(
self._root_certificates, self._key_chain_pairs)
self._root_certificates, self._key_chain_pairs, False)
self._server = _low.Server(self._completion_queue)
self._port = self._server.add_secure_http2_addr(
address, server_credentials)

@ -316,9 +316,8 @@ class _Kernel(object):
call.status(status, call)
self._rpc_states.pop(call, None)
def add_port(self, port, server_credentials):
def add_port(self, address, server_credentials):
with self._lock:
address = '[::]:%d' % port
if self._server is None:
self._completion_queue = _intermediary_low.CompletionQueue()
self._server = _intermediary_low.Server(self._completion_queue)
@ -362,17 +361,20 @@ class ServiceLink(links.Link):
"""
@abc.abstractmethod
def add_port(self, port, server_credentials):
def add_port(self, address, server_credentials):
"""Adds a port on which to service RPCs after this link has been started.
Args:
port: The port on which to service RPCs, or zero to request that a port be
automatically selected and used.
server_credentials: A ServerCredentials object, or None for insecure
service.
address: The address on which to service RPCs with a port number of zero
requesting that a port number be automatically selected and used.
server_credentials: An _intermediary_low.ServerCredentials object, or
None for insecure service.
Returns:
A port on which RPCs will be serviced after this link has been started.
A integer port on which RPCs will be serviced after this link has been
started. This is typically the same number as the port number contained
in the passed address, but will likely be different if the port number
contained in the passed address was zero.
"""
raise NotImplementedError()
@ -417,8 +419,8 @@ class _ServiceLink(ServiceLink):
def join_link(self, link):
self._relay.set_behavior(link.accept_ticket)
def add_port(self, port, server_credentials):
return self._kernel.add_port(port, server_credentials)
def add_port(self, address, server_credentials):
return self._kernel.add_port(address, server_credentials)
def start(self):
self._relay.start()

@ -30,7 +30,6 @@
"""Implementation of base.End."""
import abc
import enum
import threading
import uuid
@ -75,7 +74,7 @@ def _abort(operations):
def _cancel_futures(futures):
for future in futures:
futures.cancel()
future.cancel()
def _future_shutdown(lock, cycle, event):
@ -83,8 +82,6 @@ def _future_shutdown(lock, cycle, event):
with lock:
_abort(cycle.operations.values())
_cancel_futures(cycle.futures)
pool = cycle.pool
cycle.pool.shutdown(wait=True)
return in_future
@ -113,6 +110,7 @@ def _termination_action(lock, stats, operation_id, cycle):
cycle.idle_actions = []
if cycle.grace:
_cancel_futures(cycle.futures)
cycle.pool.shutdown(wait=False)
return termination_action

@ -70,7 +70,13 @@ def _oauth_access_token(args):
def _stub(args):
if args.oauth_scope:
metadata_transformer = lambda x: [('Authorization', 'Bearer %s' % _oauth_access_token(args))]
if args.test_case == 'oauth2_auth_token':
access_token = _oauth_access_token(args)
metadata_transformer = lambda x: [
('Authorization', 'Bearer %s' % access_token)]
else:
metadata_transformer = lambda x: [
('Authorization', 'Bearer %s' % _oauth_access_token(args))]
else:
metadata_transformer = lambda x: []
if args.use_tls:

@ -346,6 +346,19 @@ def _compute_engine_creds(stub, args):
response.username))
def _oauth2_auth_token(stub, args):
json_key_filename = os.environ[
oauth2client_client.GOOGLE_APPLICATION_CREDENTIALS]
wanted_email = json.load(open(json_key_filename, 'rb'))['client_email']
response = _large_unary_common_behavior(stub, True, True)
if wanted_email != response.username:
raise ValueError(
'expected username %s, got %s' % (wanted_email, response.username))
if args.oauth_scope.find(response.oauth_scope) == -1:
raise ValueError(
'expected to find oauth scope "%s" in received "%s"' %
(response.oauth_scope, args.oauth_scope))
@enum.unique
class TestCase(enum.Enum):
EMPTY_UNARY = 'empty_unary'
@ -356,6 +369,7 @@ class TestCase(enum.Enum):
CANCEL_AFTER_BEGIN = 'cancel_after_begin'
CANCEL_AFTER_FIRST_RESPONSE = 'cancel_after_first_response'
COMPUTE_ENGINE_CREDS = 'compute_engine_creds'
OAUTH2_AUTH_TOKEN = 'oauth2_auth_token'
TIMEOUT_ON_SLEEPING_SERVER = 'timeout_on_sleeping_server'
def test_interoperability(self, stub, args):
@ -377,5 +391,7 @@ class TestCase(enum.Enum):
_timeout_on_sleeping_server(stub)
elif self is TestCase.COMPUTE_ENGINE_CREDS:
_compute_engine_creds(stub, args)
elif self is TestCase.OAUTH2_AUTH_TOKEN:
_oauth2_auth_token(stub, args)
else:
raise NotImplementedError('Test case "%s" not implemented!' % self.name)

@ -45,11 +45,7 @@ from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.base import test_cases
from grpc_test.framework.interfaces.base import test_interfaces
_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
_CODE = _intermediary_low.Code.OK
_MESSAGE = b'test message'
class _SerializationBehaviors(
@ -95,7 +91,7 @@ class _Implementation(test_interfaces.Implementation):
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
@ -117,16 +113,18 @@ class _Implementation(test_interfaces.Implementation):
service_grpc_link.stop_gracefully()
def invocation_initial_metadata(self):
return _INVOCATION_INITIAL_METADATA
return grpc_test_common.INVOCATION_INITIAL_METADATA
def service_initial_metadata(self):
return _SERVICE_INITIAL_METADATA
return grpc_test_common.SERVICE_INITIAL_METADATA
def invocation_completion(self):
return utilities.completion(None, None, None)
def service_completion(self):
return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE)
return utilities.completion(
grpc_test_common.SERVICE_TERMINAL_METADATA, _CODE,
grpc_test_common.DETAILS)
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(
@ -146,14 +144,6 @@ class _Implementation(test_interfaces.Implementation):
return True
def setUpModule():
logging.warn('setUpModule!')
def tearDownModule():
logging.warn('tearDownModule!')
def load_tests(loader, tests, pattern):
return unittest.TestSuite(
tests=tuple(

@ -39,11 +39,10 @@ from grpc.framework.core import implementations as core_implementations
from grpc.framework.crust import implementations as crust_implementations
from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.links import utilities
from grpc_test import test_common
from grpc_test import test_common as grpc_test_common
from grpc_test.framework.common import test_constants
from grpc_test.framework.interfaces.face import test_cases
from grpc_test.framework.interfaces.face import test_interfaces
from grpc_test.framework.interfaces.links import test_utilities
class _SerializationBehaviors(
@ -85,7 +84,7 @@ class _Implementation(test_interfaces.Implementation):
service_grpc_link = service.service_link(
serialization_behaviors.request_deserializers,
serialization_behaviors.response_serializers)
port = service_grpc_link.add_port(0, None)
port = service_grpc_link.add_port('[::]:0', None)
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_grpc_link = invocation.invocation_link(
channel, b'localhost',
@ -130,19 +129,19 @@ class _Implementation(test_interfaces.Implementation):
pool.shutdown(wait=True)
def invocation_metadata(self):
return test_common.INVOCATION_INITIAL_METADATA
return grpc_test_common.INVOCATION_INITIAL_METADATA
def initial_metadata(self):
return test_common.SERVICE_INITIAL_METADATA
return grpc_test_common.SERVICE_INITIAL_METADATA
def terminal_metadata(self):
return test_common.SERVICE_TERMINAL_METADATA
return grpc_test_common.SERVICE_TERMINAL_METADATA
def code(self):
return _intermediary_low.Code.OK
def details(self):
return test_common.DETAILS
return grpc_test_common.DETAILS
def metadata_transmitted(self, original_metadata, transmitted_metadata):
return original_metadata is None or grpc_test_common.metadata_transmitted(

@ -50,7 +50,7 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase):
service_link = service.service_link(
{self.group_and_method(): self.deserialize_request},
{self.group_and_method(): self.serialize_response})
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
@ -116,7 +116,7 @@ class RoundTripTest(unittest.TestCase):
identity_transformation, identity_transformation)
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(
@ -160,7 +160,7 @@ class RoundTripTest(unittest.TestCase):
{(test_group, test_method): scenario.serialize_response})
service_mate = test_utilities.RecordingLink()
service_link.join_link(service_mate)
port = service_link.add_port(0, None)
port = service_link.add_port('[::]:0', None)
service_link.start()
channel = _intermediary_low.Channel('localhost:%d' % port, None)
invocation_link = invocation.invocation_link(

@ -31,6 +31,11 @@
import collections
INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),)
SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),)
SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),)
DETAILS = b'test details'
def metadata_transmitted(original_metadata, transmitted_metadata):
"""Judges whether or not metadata was acceptably transmitted.

@ -135,7 +135,7 @@ static void test_receive(int number_of_clients) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (i = 0; i < number_of_clients; i++) {
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(4000);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
number_of_reads_before = g_number_of_reads;
/* Create a socket, send a packet to the UDP server. */

@ -56,7 +56,7 @@ extern double g_fixture_slowdown_factor;
#define GRPC_TIMEOUT_SECONDS_TO_DEADLINE(x) \
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), \
gpr_time_from_micros(GRPC_TEST_SLOWDOWN_FACTOR * 1e6 * (x), \
gpr_time_from_millis(GRPC_TEST_SLOWDOWN_FACTOR * 1e3 * (x), \
GPR_TIMESPAN))
#define GRPC_TIMEOUT_MILLIS_TO_DEADLINE(x) \

@ -200,8 +200,8 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
}
void ResetStub() {
std::shared_ptr<Channel> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}
@ -750,8 +750,8 @@ TEST_P(AsyncEnd2endTest, ServerCheckDone) {
}
TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
std::shared_ptr<Channel> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureCredentials());
std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
stub =
std::move(grpc::cpp::test::util::UnimplementedService::NewStub(channel));

@ -76,7 +76,7 @@ class CrashTest : public ::testing::Test {
}));
GPR_ASSERT(server_);
return grpc::cpp::test::util::TestService::NewStub(
CreateChannel(addr, InsecureCredentials(), ChannelArguments()));
CreateChannel(addr, InsecureCredentials()));
}
void KillServer() { server_.reset(); }

@ -291,8 +291,8 @@ class End2endTest : public ::testing::TestWithParam<bool> {
ChannelArguments args;
args.SetSslTargetNameOverride("foo.test.google.fr");
args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
channel_ =
CreateChannel(server_address_.str(), SslCredentials(ssl_opts), args);
channel_ = CreateCustomChannel(server_address_.str(),
SslCredentials(ssl_opts), args);
}
void ResetStub(bool use_proxy) {
@ -307,8 +307,7 @@ class End2endTest : public ::testing::TestWithParam<bool> {
builder.RegisterService(proxy_service_.get());
proxy_server_ = builder.BuildAndStart();
channel_ = CreateChannel(proxyaddr.str(), InsecureCredentials(),
ChannelArguments());
channel_ = CreateChannel(proxyaddr.str(), InsecureCredentials());
}
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
@ -566,7 +565,7 @@ TEST_F(End2endTest, BadCredentials) {
std::shared_ptr<Credentials> bad_creds = GoogleRefreshTokenCredentials("");
EXPECT_EQ(static_cast<Credentials*>(nullptr), bad_creds.get());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), bad_creds, ChannelArguments());
CreateChannel(server_address_.str(), bad_creds);
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub(
grpc::cpp::test::util::TestService::NewStub(channel));
EchoRequest request;

@ -121,8 +121,8 @@ class GenericEnd2endTest : public ::testing::Test {
}
void ResetStub() {
std::shared_ptr<Channel> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureCredentials());
generic_stub_.reset(new GenericStub(channel));
}

@ -245,8 +245,8 @@ class MockTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() {
std::shared_ptr<Channel> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}

@ -58,8 +58,8 @@ using namespace gflags;
int main(int argc, char** argv) {
ParseCommandLineFlags(&argc, &argv, true);
auto stub = grpc::cpp::test::util::TestService::NewStub(grpc::CreateChannel(
FLAGS_address, grpc::InsecureCredentials(), grpc::ChannelArguments()));
auto stub = grpc::cpp::test::util::TestService::NewStub(
grpc::CreateChannel(FLAGS_address, grpc::InsecureCredentials()));
EchoRequest request;
EchoResponse response;

@ -95,7 +95,7 @@ class ShutdownTest : public ::testing::Test {
void ResetStub() {
string target = "dns:localhost:" + to_string(port_);
channel_ = CreateChannel(target, InsecureCredentials(), ChannelArguments());
channel_ = CreateChannel(target, InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}

@ -191,8 +191,8 @@ class End2endTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() {
std::shared_ptr<Channel> channel = CreateChannel(
server_address_.str(), InsecureCredentials(), ChannelArguments());
std::shared_ptr<Channel> channel =
CreateChannel(server_address_.str(), InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel));
}

@ -159,7 +159,7 @@ class ZookeeperTest : public ::testing::Test {
void ResetStub() {
string target = "zookeeper://" + zookeeper_address_ + "/test";
channel_ = CreateChannel(target, InsecureCredentials(), ChannelArguments());
channel_ = CreateChannel(target, InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}

@ -154,8 +154,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* servers = new ServerData[num_servers];
for (size_t i = 0; i < num_servers; i++) {
servers[i].stub = std::move(Worker::NewStub(
CreateChannel(workers[i], InsecureCredentials(), ChannelArguments())));
servers[i].stub = std::move(
Worker::NewStub(CreateChannel(workers[i], InsecureCredentials())));
ServerArgs args;
result_server_config = server_config;
result_server_config.set_host(workers[i]);
@ -182,8 +182,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
// where class contained in std::vector must have a copy constructor
auto* clients = new ClientData[num_clients];
for (size_t i = 0; i < num_clients; i++) {
clients[i].stub = std::move(Worker::NewStub(CreateChannel(
workers[i + num_servers], InsecureCredentials(), ChannelArguments())));
clients[i].stub = std::move(Worker::NewStub(
CreateChannel(workers[i + num_servers], InsecureCredentials())));
ClientArgs args;
result_client_config = client_config;
result_client_config.set_host(workers[i + num_servers]);

@ -116,8 +116,8 @@ class PerfDbReporter : public Reporter {
test_name_(test_name),
sys_info_(sys_info),
tag_(tag) {
perf_db_client_.init(grpc::CreateChannel(
server_address, grpc::InsecureCredentials(), ChannelArguments()));
perf_db_client_.init(
grpc::CreateChannel(server_address, grpc::InsecureCredentials()));
}
~PerfDbReporter() GRPC_OVERRIDE { SendData(); };

@ -91,8 +91,7 @@ class CliCallTest : public ::testing::Test {
void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
void ResetStub() {
channel_ = CreateChannel(server_address_.str(), InsecureCredentials(),
ChannelArguments());
channel_ = CreateChannel(server_address_.str(), InsecureCredentials());
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}

@ -74,9 +74,9 @@ std::shared_ptr<Channel> CreateTestChannel(
if (creds.get()) {
channel_creds = CompositeCredentials(creds, channel_creds);
}
return CreateChannel(connect_to, channel_creds, channel_args);
return CreateCustomChannel(connect_to, channel_creds, channel_args);
} else {
return CreateChannel(server, InsecureCredentials(), channel_args);
return CreateChannel(server, InsecureCredentials());
}
}

@ -159,7 +159,7 @@ int main(int argc, char** argv) {
}
}
std::shared_ptr<grpc::Channel> channel =
grpc::CreateChannel(server_address, creds, grpc::ChannelArguments());
grpc::CreateChannel(server_address, creds);
grpc::string response;
std::multimap<grpc::string, grpc::string> client_metadata;

@ -66,7 +66,10 @@ int main(void) {
dump();
clear();
for (i = 32; i <= 126; i++) legal(i);
for (i = 32; i <= 126; i++) {
if (i == ',') continue;
legal(i);
}
dump();
return 0;

Loading…
Cancel
Save