added support for metadata

pull/1053/head
Jan Tattermusch 10 years ago
parent 1cc9fae6b4
commit c0b3721d61
  1. 14
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  2. 4
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 62
      src/csharp/Grpc.Core.Tests/Internal/MetadataArraySafeHandleTest.cs
  4. 55
      src/csharp/Grpc.Core/Call.cs
  5. 30
      src/csharp/Grpc.Core/Calls.cs
  6. 7
      src/csharp/Grpc.Core/Grpc.Core.csproj
  7. 37
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  8. 33
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  9. 72
      src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
  10. 5
      src/csharp/Grpc.Core/Marshaller.cs
  11. 126
      src/csharp/Grpc.Core/Metadata.cs
  12. 15
      src/csharp/Grpc.Core/ServerServiceDefinition.cs
  13. 73
      src/csharp/Grpc.Core/Stub/AbstractStub.cs
  14. 64
      src/csharp/Grpc.Core/Stub/StubConfiguration.cs
  15. 9
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  16. 36
      src/csharp/Grpc.Examples/MathGrpc.cs
  17. 3
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  18. 41
      src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs
  19. 162
      src/csharp/ext/grpc_csharp_ext.c

@ -46,6 +46,8 @@ namespace Grpc.Core.Tests
{ {
string host = "localhost"; string host = "localhost";
string serviceName = "/tests.Test";
Method<string, string> unaryEchoStringMethod = new Method<string, string>( Method<string, string> unaryEchoStringMethod = new Method<string, string>(
MethodType.Unary, MethodType.Unary,
"/tests.Test/UnaryEchoString", "/tests.Test/UnaryEchoString",
@ -69,7 +71,7 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService") ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
@ -77,7 +79,7 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
@ -92,7 +94,7 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService") ServerServiceDefinition.CreateBuilder(serviceName)
.AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build()); .AddMethod(unaryEchoStringMethod, HandleUnaryEchoString).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
@ -100,7 +102,7 @@ namespace Grpc.Core.Tests
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
BenchmarkUtil.RunBenchmark(100, 1000, BenchmarkUtil.RunBenchmark(100, 1000,
() => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); }); () => { Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)); });
} }
@ -113,14 +115,14 @@ namespace Grpc.Core.Tests
{ {
Server server = new Server(); Server server = new Server();
server.AddServiceDefinition( server.AddServiceDefinition(
ServerServiceDefinition.CreateBuilder("someService").Build()); ServerServiceDefinition.CreateBuilder(serviceName).Build());
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
server.Start(); server.Start();
using (Channel channel = new Channel(host + ":" + port)) using (Channel channel = new Channel(host + ":" + port))
{ {
var call = new Call<string, string>(unaryEchoStringMethod, channel); var call = new Call<string, string>(serviceName, unaryEchoStringMethod, channel, Metadata.Empty);
try try
{ {

@ -42,6 +42,7 @@
<Compile Include="GrpcEnvironmentTest.cs" /> <Compile Include="GrpcEnvironmentTest.cs" />
<Compile Include="TimespecTest.cs" /> <Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" /> <Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
</ItemGroup> </ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup> <ItemGroup>
@ -56,4 +57,7 @@
<ItemGroup> <ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="Internal\" />
</ItemGroup>
</Project> </Project>

@ -0,0 +1,62 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class MetadataArraySafeHandleTest
{
[Test]
public void CreateEmptyAndDestroy()
{
var metadata = Metadata.CreateBuilder().Build();
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
[Test]
public void CreateAndDestroy()
{
var metadata = Metadata.CreateBuilder()
.Add(new Metadata.MetadataEntry("host", "somehost"))
.Add(new Metadata.MetadataEntry("header2", "header value")).Build();
var nativeMetadata = MetadataArraySafeHandle.Create(metadata);
nativeMetadata.Dispose();
}
}
}

@ -33,65 +33,70 @@
using System; using System;
using Grpc.Core.Internal; using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
public class Call<TRequest, TResponse> public class Call<TRequest, TResponse>
{ {
readonly string methodName; readonly string name;
readonly Func<TRequest, byte[]> requestSerializer; readonly Marshaller<TRequest> requestMarshaller;
readonly Func<byte[], TResponse> responseDeserializer; readonly Marshaller<TResponse> responseMarshaller;
readonly Channel channel; readonly Channel channel;
readonly Metadata headers;
public Call(string methodName, public Call(string serviceName, Method<TRequest, TResponse> method, Channel channel, Metadata headers)
Func<TRequest, byte[]> requestSerializer,
Func<byte[], TResponse> responseDeserializer,
TimeSpan timeout,
Channel channel)
{ {
this.methodName = methodName; this.name = Preconditions.CheckNotNull(serviceName) + "/" + method.Name;
this.requestSerializer = requestSerializer; this.requestMarshaller = method.RequestMarshaller;
this.responseDeserializer = responseDeserializer; this.responseMarshaller = method.ResponseMarshaller;
this.channel = channel; this.channel = Preconditions.CheckNotNull(channel);
this.headers = Preconditions.CheckNotNull(headers);
} }
public Call(Method<TRequest, TResponse> method, Channel channel) public Channel Channel
{ {
this.methodName = method.Name; get
this.requestSerializer = method.RequestMarshaller.Serializer; {
this.responseDeserializer = method.ResponseMarshaller.Deserializer; return this.channel;
this.channel = channel; }
} }
public Channel Channel /// <summary>
/// Full methods name including the service name.
/// </summary>
public string Name
{ {
get get
{ {
return this.channel; return name;
} }
} }
public string MethodName /// <summary>
/// Headers to send at the beginning of the call.
/// </summary>
public Metadata Headers
{ {
get get
{ {
return this.methodName; return headers;
} }
} }
public Func<TRequest, byte[]> RequestSerializer public Marshaller<TRequest> RequestMarshaller
{ {
get get
{ {
return this.requestSerializer; return requestMarshaller;
} }
} }
public Func<byte[], TResponse> ResponseDeserializer public Marshaller<TResponse> ResponseMarshaller
{ {
get get
{ {
return this.responseDeserializer; return responseMarshaller;
} }
} }
} }

@ -45,30 +45,29 @@ namespace Grpc.Core
{ {
public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
return asyncCall.UnaryCall(call.Channel, call.MethodName, req); return asyncCall.UnaryCall(call.Channel, call.Name, req, call.Headers);
} }
public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
return await asyncCall.UnaryCallAsync(req); return await asyncCall.UnaryCallAsync(req, call.Headers);
} }
public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.StartServerStreamingCall(req, outputs, call.Headers);
asyncCall.StartServerStreamingCall(req, outputs);
} }
public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
var task = asyncCall.ClientStreamingCallAsync(); var task = asyncCall.ClientStreamingCallAsync(call.Headers);
var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs); return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs);
} }
@ -80,10 +79,9 @@ namespace Grpc.Core
public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token) public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token)
{ {
var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestMarshaller.Serializer, call.ResponseMarshaller.Deserializer);
asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.Name);
asyncCall.StartDuplexStreamingCall(outputs, call.Headers);
asyncCall.StartDuplexStreamingCall(outputs);
return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall);
} }

@ -79,6 +79,10 @@
<Compile Include="Utils\Preconditions.cs" /> <Compile Include="Utils\Preconditions.cs" />
<Compile Include="Internal\ServerCredentialsSafeHandle.cs" /> <Compile Include="Internal\ServerCredentialsSafeHandle.cs" />
<Compile Include="ServerCredentials.cs" /> <Compile Include="ServerCredentials.cs" />
<Compile Include="Metadata.cs" />
<Compile Include="Internal\MetadataArraySafeHandle.cs" />
<Compile Include="Stub\AbstractStub.cs" />
<Compile Include="Stub\StubConfiguration.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="packages.config" /> <None Include="packages.config" />
@ -96,4 +100,7 @@
<Otherwise /> <Otherwise />
</Choose> </Choose>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
<Folder Include="Stub\" />
</ItemGroup>
</Project> </Project>

@ -77,7 +77,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Blocking unary request - unary response call. /// Blocking unary request - unary response call.
/// </summary> /// </summary>
public TResponse UnaryCall(Channel channel, string methodName, TRequest msg) public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers)
{ {
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{ {
@ -92,7 +92,11 @@ namespace Grpc.Core.Internal
halfcloseRequested = true; halfcloseRequested = true;
readingDone = true; readingDone = true;
} }
call.BlockingUnary(cq, payload, unaryResponseHandler);
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
}
try try
{ {
@ -109,7 +113,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Starts a unary request - unary response call. /// Starts a unary request - unary response call.
/// </summary> /// </summary>
public Task<TResponse> UnaryCallAsync(TRequest msg) public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
@ -122,8 +126,10 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartUnary(payload, unaryResponseHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartUnary(payload, unaryResponseHandler, metadataArray);
}
return unaryResponseTcs.Task; return unaryResponseTcs.Task;
} }
} }
@ -132,7 +138,7 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call. /// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary> /// </summary>
public Task<TResponse> ClientStreamingCallAsync() public Task<TResponse> ClientStreamingCallAsync(Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
@ -142,7 +148,10 @@ namespace Grpc.Core.Internal
readingDone = true; readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>(); unaryResponseTcs = new TaskCompletionSource<TResponse>();
call.StartClientStreaming(unaryResponseHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartClientStreaming(unaryResponseHandler, metadataArray);
}
return unaryResponseTcs.Task; return unaryResponseTcs.Task;
} }
@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
/// <summary> /// <summary>
/// Starts a unary request - streamed response call. /// Starts a unary request - streamed response call.
/// </summary> /// </summary>
public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver) public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
@ -165,7 +174,10 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg); byte[] payload = UnsafeSerialize(msg);
call.StartServerStreaming(payload, finishedHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartServerStreaming(payload, finishedHandler, metadataArray);
}
StartReceiveMessage(); StartReceiveMessage();
} }
@ -175,7 +187,7 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call. /// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary> /// </summary>
public void StartDuplexStreamingCall(IObserver<TResponse> readObserver) public void StartDuplexStreamingCall(IObserver<TResponse> readObserver, Metadata headers)
{ {
lock (myLock) lock (myLock)
{ {
@ -185,7 +197,10 @@ namespace Grpc.Core.Internal
this.readObserver = readObserver; this.readObserver = readObserver;
call.StartDuplexStreaming(finishedHandler); using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartDuplexStreaming(finishedHandler, metadataArray);
}
StartReceiveMessage(); StartReceiveMessage();
} }

@ -57,25 +57,28 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq, static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len); byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
@ -109,29 +112,29 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline); return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
} }
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length))); AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
} }
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback) public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length)); grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
} }
public void StartClientStreaming(CompletionCallbackDelegate callback) public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
} }
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length))); AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
} }
public void StartDuplexStreaming(CompletionCallbackDelegate callback) public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{ {
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
} }
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)

@ -0,0 +1,72 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_metadata_array from <grpc/grpc.h>
/// </summary>
internal class MetadataArraySafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern MetadataArraySafeHandle grpcsharp_metadata_array_create(UIntPtr capacity);
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
static extern void grpcsharp_metadata_array_add(MetadataArraySafeHandle array, string key, byte[] value, UIntPtr valueLength);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_metadata_array_destroy_full(IntPtr array);
private MetadataArraySafeHandle()
{
}
public static MetadataArraySafeHandle Create(Metadata metadata)
{
var entries = metadata.Entries;
var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)entries.Count));
for (int i = 0; i < entries.Count; i++)
{
grpcsharp_metadata_array_add(metadataArray, entries[i].Key, entries[i].ValueBytes, new UIntPtr((ulong)entries[i].ValueBytes.Length));
}
return metadataArray;
}
protected override bool ReleaseHandle()
{
grpcsharp_metadata_array_destroy_full(handle);
return true;
}
}
}

@ -32,6 +32,7 @@
#endregion #endregion
using System; using System;
using Grpc.Core.Utils;
namespace Grpc.Core namespace Grpc.Core
{ {
@ -45,8 +46,8 @@ namespace Grpc.Core
public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer) public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer)
{ {
this.serializer = serializer; this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = deserializer; this.deserializer = Preconditions.CheckNotNull(deserializer);
} }
public Func<T, byte[]> Serializer public Func<T, byte[]> Serializer

@ -0,0 +1,126 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using System.Text;
namespace Grpc.Core
{
/// <summary>
/// gRPC call metadata.
/// </summary>
public class Metadata
{
public static readonly Metadata Empty = new Metadata(ImmutableList<MetadataEntry>.Empty);
readonly ImmutableList<MetadataEntry> entries;
public Metadata(ImmutableList<MetadataEntry> entries)
{
this.entries = entries;
}
public ImmutableList<MetadataEntry> Entries
{
get
{
return this.entries;
}
}
public static Builder CreateBuilder()
{
return new Builder();
}
public struct MetadataEntry
{
readonly string key;
readonly byte[] valueBytes;
public MetadataEntry(string key, byte[] valueBytes)
{
this.key = key;
this.valueBytes = valueBytes;
}
public MetadataEntry(string key, string value)
{
this.key = key;
this.valueBytes = Encoding.ASCII.GetBytes(value);
}
public string Key
{
get
{
return this.key;
}
}
// TODO: using ByteString would guarantee immutability.
public byte[] ValueBytes
{
get
{
return this.valueBytes;
}
}
}
public class Builder
{
readonly List<Metadata.MetadataEntry> entries = new List<Metadata.MetadataEntry>();
public List<MetadataEntry> Entries
{
get
{
return entries;
}
}
public Builder Add(MetadataEntry entry)
{
entries.Add(entry);
return this;
}
public Metadata Build()
{
return new Metadata(entries.ToImmutableList());
}
}
}
}

@ -43,12 +43,10 @@ namespace Grpc.Core
/// </summary> /// </summary>
public class ServerServiceDefinition public class ServerServiceDefinition
{ {
readonly string serviceName;
readonly ImmutableDictionary<string, IServerCallHandler> callHandlers; readonly ImmutableDictionary<string, IServerCallHandler> callHandlers;
private ServerServiceDefinition(string serviceName, ImmutableDictionary<string, IServerCallHandler> callHandlers) private ServerServiceDefinition(ImmutableDictionary<string, IServerCallHandler> callHandlers)
{ {
this.serviceName = serviceName;
this.callHandlers = callHandlers; this.callHandlers = callHandlers;
} }
@ -79,7 +77,7 @@ namespace Grpc.Core
Method<TRequest, TResponse> method, Method<TRequest, TResponse> method,
UnaryRequestServerMethod<TRequest, TResponse> handler) UnaryRequestServerMethod<TRequest, TResponse> handler)
{ {
callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler)); callHandlers.Add(GetFullMethodName(serviceName, method.Name), ServerCalls.UnaryRequestCall(method, handler));
return this; return this;
} }
@ -87,13 +85,18 @@ namespace Grpc.Core
Method<TRequest, TResponse> method, Method<TRequest, TResponse> method,
StreamingRequestServerMethod<TRequest, TResponse> handler) StreamingRequestServerMethod<TRequest, TResponse> handler)
{ {
callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler)); callHandlers.Add(GetFullMethodName(serviceName, method.Name), ServerCalls.StreamingRequestCall(method, handler));
return this; return this;
} }
public ServerServiceDefinition Build() public ServerServiceDefinition Build()
{ {
return new ServerServiceDefinition(serviceName, callHandlers.ToImmutableDictionary()); return new ServerServiceDefinition(callHandlers.ToImmutableDictionary());
}
private string GetFullMethodName(string serviceName, string methodName)
{
return serviceName + "/" + methodName;
} }
} }
} }

@ -0,0 +1,73 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
namespace Grpc.Core
{
// TODO: support adding timeout to methods.
/// <summary>
/// Base for client-side stubs.
/// </summary>
public abstract class AbstractStub<TStub, TConfig>
where TConfig : StubConfiguration
{
readonly Channel channel;
readonly TConfig config;
public AbstractStub(Channel channel, TConfig config)
{
this.channel = channel;
this.config = config;
}
public Channel Channel
{
get
{
return this.channel;
}
}
/// <summary>
/// Creates a new call to given method.
/// </summary>
protected Call<TRequest, TResponse> CreateCall<TRequest, TResponse>(string serviceName, Method<TRequest, TResponse> method)
{
var headerBuilder = Metadata.CreateBuilder();
config.HeaderInterceptor(headerBuilder);
return new Call<TRequest, TResponse>(serviceName, method, channel, headerBuilder.Build());
}
}
}

@ -0,0 +1,64 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
namespace Grpc.Core
{
public delegate void HeaderInterceptorDelegate(Metadata.Builder headerBuilder);
public class StubConfiguration
{
/// <summary>
/// The default stub configuration.
/// </summary>
public static readonly StubConfiguration Default = new StubConfiguration((headerBuilder) => { });
readonly HeaderInterceptorDelegate headerInterceptor;
public StubConfiguration(HeaderInterceptorDelegate headerInterceptor)
{
this.headerInterceptor = Preconditions.CheckNotNull(headerInterceptor);
}
public HeaderInterceptorDelegate HeaderInterceptor
{
get
{
return headerInterceptor;
}
}
}
}

@ -61,7 +61,14 @@ namespace math.Tests
int port = server.AddPort(host + ":0"); int port = server.AddPort(host + ":0");
server.Start(); server.Start();
channel = new Channel(host + ":" + port); channel = new Channel(host + ":" + port);
client = MathGrpc.NewStub(channel);
// TODO: get rid of the custom header here once we have dedicated tests
// for header support.
var stubConfig = new StubConfiguration((headerBuilder) =>
{
headerBuilder.Add(new Metadata.MetadataEntry("customHeader", "abcdef"));
});
client = MathGrpc.NewStub(channel, stubConfig);
} }
[TestFixtureTearDown] [TestFixtureTearDown]

@ -45,6 +45,8 @@ namespace math
/// </summary> /// </summary>
public class MathGrpc public class MathGrpc
{ {
static readonly string ServiceName = "/math.Math";
static readonly Marshaller<DivArgs> DivArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom); static readonly Marshaller<DivArgs> DivArgsMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivArgs.ParseFrom);
static readonly Marshaller<DivReply> DivReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom); static readonly Marshaller<DivReply> DivReplyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), DivReply.ParseFrom);
static readonly Marshaller<Num> NumMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom); static readonly Marshaller<Num> NumMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Num.ParseFrom);
@ -52,25 +54,25 @@ namespace math
static readonly Method<DivArgs, DivReply> DivMethod = new Method<DivArgs, DivReply>( static readonly Method<DivArgs, DivReply> DivMethod = new Method<DivArgs, DivReply>(
MethodType.Unary, MethodType.Unary,
"/math.Math/Div", "Div",
DivArgsMarshaller, DivArgsMarshaller,
DivReplyMarshaller); DivReplyMarshaller);
static readonly Method<FibArgs, Num> FibMethod = new Method<FibArgs, Num>( static readonly Method<FibArgs, Num> FibMethod = new Method<FibArgs, Num>(
MethodType.ServerStreaming, MethodType.ServerStreaming,
"/math.Math/Fib", "Fib",
FibArgsMarshaller, FibArgsMarshaller,
NumMarshaller); NumMarshaller);
static readonly Method<Num, Num> SumMethod = new Method<Num, Num>( static readonly Method<Num, Num> SumMethod = new Method<Num, Num>(
MethodType.ClientStreaming, MethodType.ClientStreaming,
"/math.Math/Sum", "Sum",
NumMarshaller, NumMarshaller,
NumMarshaller); NumMarshaller);
static readonly Method<DivArgs, DivReply> DivManyMethod = new Method<DivArgs, DivReply>( static readonly Method<DivArgs, DivReply> DivManyMethod = new Method<DivArgs, DivReply>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/math.Math/DivMany", "DivMany",
DivArgsMarshaller, DivArgsMarshaller,
DivReplyMarshaller); DivReplyMarshaller);
@ -87,42 +89,43 @@ namespace math
IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken)); IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken));
} }
public class MathServiceClientStub : IMathServiceClient public class MathServiceClientStub : AbstractStub<MathServiceClientStub, StubConfiguration>, IMathServiceClient
{ {
readonly Channel channel; public MathServiceClientStub(Channel channel) : this(channel, StubConfiguration.Default)
{
}
public MathServiceClientStub(Channel channel) public MathServiceClientStub(Channel channel, StubConfiguration config) : base(channel, config)
{ {
this.channel = channel;
} }
public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken)) public DivReply Div(DivArgs request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivMethod, channel); var call = CreateCall(ServiceName, DivMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken)) public Task<DivReply> DivAsync(DivArgs request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivMethod, channel); var call = CreateCall(ServiceName, DivMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken)) public void Fib(FibArgs request, IObserver<Num> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<FibArgs, Num>(FibMethod, channel); var call = CreateCall(ServiceName, FibMethod);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token); Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
} }
public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken)) public ClientStreamingAsyncResult<Num, Num> Sum(CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Num, Num>(SumMethod, channel); var call = CreateCall(ServiceName, SumMethod);
return Calls.AsyncClientStreamingCall(call, token); return Calls.AsyncClientStreamingCall(call, token);
} }
public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<DivArgs> DivMany(IObserver<DivReply> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<DivArgs, DivReply>(DivManyMethod, channel); var call = CreateCall(ServiceName, DivManyMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
} }
@ -141,7 +144,7 @@ namespace math
public static ServerServiceDefinition BindService(IMathService serviceImpl) public static ServerServiceDefinition BindService(IMathService serviceImpl)
{ {
return ServerServiceDefinition.CreateBuilder("/math.Math/") return ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(DivMethod, serviceImpl.Div) .AddMethod(DivMethod, serviceImpl.Div)
.AddMethod(FibMethod, serviceImpl.Fib) .AddMethod(FibMethod, serviceImpl.Fib)
.AddMethod(SumMethod, serviceImpl.Sum) .AddMethod(SumMethod, serviceImpl.Sum)
@ -152,5 +155,10 @@ namespace math
{ {
return new MathServiceClientStub(channel); return new MathServiceClientStub(channel);
} }
public static IMathServiceClient NewStub(Channel channel, StubConfiguration config)
{
return new MathServiceClientStub(channel, config);
}
} }
} }

@ -39,8 +39,7 @@
<Reference Include="Google.ProtocolBuffers"> <Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath> <HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference> </Reference>
<Reference Include="System.Collections.Immutable, Version=1.1.34.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> <Reference Include="System.Collections.Immutable">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\System.Collections.Immutable.1.1.34-rc\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> <HintPath>..\packages\System.Collections.Immutable.1.1.34-rc\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference> </Reference>
</ItemGroup> </ItemGroup>

@ -44,6 +44,8 @@ namespace grpc.testing
/// </summary> /// </summary>
public class TestServiceGrpc public class TestServiceGrpc
{ {
static readonly string ServiceName = "/grpc.testing.TestService";
static readonly Marshaller<Empty> EmptyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom); static readonly Marshaller<Empty> EmptyMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), Empty.ParseFrom);
static readonly Marshaller<SimpleRequest> SimpleRequestMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom); static readonly Marshaller<SimpleRequest> SimpleRequestMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleRequest.ParseFrom);
static readonly Marshaller<SimpleResponse> SimpleResponseMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom); static readonly Marshaller<SimpleResponse> SimpleResponseMarshaller = Marshallers.Create((arg) => arg.ToByteArray(), SimpleResponse.ParseFrom);
@ -54,37 +56,37 @@ namespace grpc.testing
static readonly Method<Empty, Empty> EmptyCallMethod = new Method<Empty, Empty>( static readonly Method<Empty, Empty> EmptyCallMethod = new Method<Empty, Empty>(
MethodType.Unary, MethodType.Unary,
"/grpc.testing.TestService/EmptyCall", "EmptyCall",
EmptyMarshaller, EmptyMarshaller,
EmptyMarshaller); EmptyMarshaller);
static readonly Method<SimpleRequest, SimpleResponse> UnaryCallMethod = new Method<SimpleRequest, SimpleResponse>( static readonly Method<SimpleRequest, SimpleResponse> UnaryCallMethod = new Method<SimpleRequest, SimpleResponse>(
MethodType.Unary, MethodType.Unary,
"/grpc.testing.TestService/UnaryCall", "UnaryCall",
SimpleRequestMarshaller, SimpleRequestMarshaller,
SimpleResponseMarshaller); SimpleResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> StreamingOutputCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> StreamingOutputCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.ServerStreaming, MethodType.ServerStreaming,
"/grpc.testing.TestService/StreamingOutputCall", "StreamingOutputCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCallMethod = new Method<StreamingInputCallRequest, StreamingInputCallResponse>( static readonly Method<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCallMethod = new Method<StreamingInputCallRequest, StreamingInputCallResponse>(
MethodType.ClientStreaming, MethodType.ClientStreaming,
"/grpc.testing.TestService/StreamingInputCall", "StreamingInputCall",
StreamingInputCallRequestMarshaller, StreamingInputCallRequestMarshaller,
StreamingInputCallResponseMarshaller); StreamingInputCallResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/grpc.testing.TestService/FullDuplexCall", "FullDuplexCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>( static readonly Method<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCallMethod = new Method<StreamingOutputCallRequest, StreamingOutputCallResponse>(
MethodType.DuplexStreaming, MethodType.DuplexStreaming,
"/grpc.testing.TestService/HalfDuplexCall", "HalfDuplexCall",
StreamingOutputCallRequestMarshaller, StreamingOutputCallRequestMarshaller,
StreamingOutputCallResponseMarshaller); StreamingOutputCallResponseMarshaller);
@ -107,60 +109,61 @@ namespace grpc.testing
IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)); IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
} }
public class TestServiceClientStub : ITestServiceClient public class TestServiceClientStub : AbstractStub<TestServiceClientStub, StubConfiguration>, ITestServiceClient
{ {
readonly Channel channel; public TestServiceClientStub(Channel channel) : base(channel, StubConfiguration.Default)
{
}
public TestServiceClientStub(Channel channel) public TestServiceClientStub(Channel channel, StubConfiguration config) : base(channel, config)
{ {
this.channel = channel;
} }
public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken)) public Empty EmptyCall(Empty request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Empty, Empty>(EmptyCallMethod, channel); var call = CreateCall(ServiceName, EmptyCallMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken)) public Task<Empty> EmptyCallAsync(Empty request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<Empty, Empty>(EmptyCallMethod, channel); var call = CreateCall(ServiceName, EmptyCallMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken)) public SimpleResponse UnaryCall(SimpleRequest request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(UnaryCallMethod, channel); var call = CreateCall(ServiceName, UnaryCallMethod);
return Calls.BlockingUnaryCall(call, request, token); return Calls.BlockingUnaryCall(call, request, token);
} }
public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)) public Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<SimpleRequest, SimpleResponse>(UnaryCallMethod, channel); var call = CreateCall(ServiceName, UnaryCallMethod);
return Calls.AsyncUnaryCall(call, request, token); return Calls.AsyncUnaryCall(call, request, token);
} }
public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(StreamingOutputCallMethod, channel); var call = CreateCall(ServiceName, StreamingOutputCallMethod);
Calls.AsyncServerStreamingCall(call, request, responseObserver, token); Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
} }
public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken)) public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingInputCallRequest, StreamingInputCallResponse>(StreamingInputCallMethod, channel); var call = CreateCall(ServiceName, StreamingInputCallMethod);
return Calls.AsyncClientStreamingCall(call, token); return Calls.AsyncClientStreamingCall(call, token);
} }
public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(FullDuplexCallMethod, channel); var call = CreateCall(ServiceName, FullDuplexCallMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken)) public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
{ {
var call = new Grpc.Core.Call<StreamingOutputCallRequest, StreamingOutputCallResponse>(HalfDuplexCallMethod, channel); var call = CreateCall(ServiceName, HalfDuplexCallMethod);
return Calls.DuplexStreamingCall(call, responseObserver, token); return Calls.DuplexStreamingCall(call, responseObserver, token);
} }
} }
@ -183,7 +186,7 @@ namespace grpc.testing
public static ServerServiceDefinition BindService(ITestService serviceImpl) public static ServerServiceDefinition BindService(ITestService serviceImpl)
{ {
return ServerServiceDefinition.CreateBuilder("/grpc.testing.TestService/") return ServerServiceDefinition.CreateBuilder(ServiceName)
.AddMethod(EmptyCallMethod, serviceImpl.EmptyCall) .AddMethod(EmptyCallMethod, serviceImpl.EmptyCall)
.AddMethod(UnaryCallMethod, serviceImpl.UnaryCall) .AddMethod(UnaryCallMethod, serviceImpl.UnaryCall)
.AddMethod(StreamingOutputCallMethod, serviceImpl.StreamingOutputCall) .AddMethod(StreamingOutputCallMethod, serviceImpl.StreamingOutputCall)

@ -102,34 +102,114 @@ grpcsharp_batch_context *grpcsharp_batch_context_create() {
return ctx; return ctx;
} }
/** /*
* Destroys metadata array including keys and values. * Destroys array->metadata.
* The array pointer itself is not freed.
*/
void grpcsharp_metadata_array_destroy_metadata_only(
grpc_metadata_array *array) {
gpr_free(array->metadata);
}
/*
* Destroys keys, values and array->metadata.
* The array pointer itself is not freed.
*/
void grpcsharp_metadata_array_destroy_metadata_including_entries(
grpc_metadata_array *array) {
size_t i;
if (array->metadata) {
for (i = 0; i < array->count; i++) {
gpr_free((void *)array->metadata[i].key);
gpr_free((void *)array->metadata[i].value);
}
}
gpr_free(array->metadata);
}
/*
* Fully destroys the metadata array.
*/
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_metadata_array_destroy_full(grpc_metadata_array *array) {
if (!array) {
return;
}
grpcsharp_metadata_array_destroy_metadata_including_entries(array);
gpr_free(array);
}
/*
* Creates an empty metadata array with given capacity.
* Array can later be destroyed by grpc_metadata_array_destroy_full.
*/ */
void grpcsharp_metadata_array_destroy_recursive(grpc_metadata_array *array) { GPR_EXPORT grpc_metadata_array *GPR_CALLTYPE
if (!array->metadata) { grpcsharp_metadata_array_create(size_t capacity) {
grpc_metadata_array *array =
(grpc_metadata_array *)gpr_malloc(sizeof(grpc_metadata_array));
grpc_metadata_array_init(array);
array->capacity = capacity;
array->count = 0;
if (capacity > 0) {
array->metadata =
(grpc_metadata *)gpr_malloc(sizeof(grpc_metadata) * capacity);
memset(array->metadata, 0, sizeof(grpc_metadata) * capacity);
} else {
array->metadata = NULL;
}
return array;
}
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_metadata_array_add(grpc_metadata_array *array, const char *key,
const char *value, size_t value_length) {
size_t i = array->count;
GPR_ASSERT(array->count < array->capacity);
array->metadata[i].key = gpr_strdup(key);
array->metadata[i].value = (char *)gpr_malloc(value_length);
memcpy((void *)array->metadata[i].value, value, value_length);
array->metadata[i].value_length = value_length;
array->count++;
}
/* Move contents of metadata array */
void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
grpc_metadata_array *src) {
if (!src) {
dest->capacity = 0;
dest->count = 0;
dest->metadata = NULL;
return; return;
} }
/* TODO: destroy also keys and values */
grpc_metadata_array_destroy(array); dest->capacity = src->capacity;
dest->count = src->count;
dest->metadata = src->metadata;
src->capacity = 0;
src->count = 0;
src->metadata = NULL;
} }
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) { void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) { if (!ctx) {
return; return;
} }
grpcsharp_metadata_array_destroy_recursive(&(ctx->send_initial_metadata)); grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_initial_metadata));
grpc_byte_buffer_destroy(ctx->send_message); grpc_byte_buffer_destroy(ctx->send_message);
grpcsharp_metadata_array_destroy_recursive( grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_status_from_server.trailing_metadata)); &(ctx->send_status_from_server.trailing_metadata));
gpr_free(ctx->send_status_from_server.status_details); gpr_free(ctx->send_status_from_server.status_details);
grpc_metadata_array_destroy(&(ctx->recv_initial_metadata)); grpcsharp_metadata_array_destroy_metadata_only(&(ctx->recv_initial_metadata));
grpc_byte_buffer_destroy(ctx->recv_message); grpc_byte_buffer_destroy(ctx->recv_message);
grpc_metadata_array_destroy(&(ctx->recv_status_on_client.trailing_metadata)); grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->recv_status_on_client.trailing_metadata));
gpr_free((void *)ctx->recv_status_on_client.status_details); gpr_free((void *)ctx->recv_status_on_client.status_details);
/* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is /* NOTE: ctx->server_rpc_new.call is not destroyed because callback handler is
@ -137,7 +217,8 @@ void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
to take its ownership. */ to take its ownership. */
grpc_call_details_destroy(&(ctx->server_rpc_new.call_details)); grpc_call_details_destroy(&(ctx->server_rpc_new.call_details));
grpc_metadata_array_destroy(&(ctx->server_rpc_new.request_metadata)); grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->server_rpc_new.request_metadata));
gpr_free(ctx); gpr_free(ctx);
} }
@ -346,17 +427,19 @@ grpcsharp_call_start_write_from_copied_buffer(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback, grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) { const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[6]; grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_SEND_MESSAGE; ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
@ -389,9 +472,11 @@ GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call, grpcsharp_call_blocking_unary(grpc_call *call,
grpc_completion_queue *dedicated_cq, grpc_completion_queue *dedicated_cq,
callback_funcptr callback, callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len) { const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer, GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
send_buffer_len) == GRPC_CALL_OK); send_buffer_len,
initial_metadata) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */ /* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) == GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
@ -403,17 +488,19 @@ grpcsharp_call_blocking_unary(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call, grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback) { callback_funcptr callback,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[4]; grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -435,21 +522,20 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
} }
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpcsharp_call_start_server_streaming(grpc_call *call, grpc_call *call, callback_funcptr callback, const char *send_buffer,
callback_funcptr callback, size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
const char *send_buffer,
size_t send_buffer_len) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[5]; grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_SEND_MESSAGE; ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len); ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
@ -476,17 +562,19 @@ grpcsharp_call_start_server_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback) { callback_funcptr callback,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */ /* TODO: don't use magic number */
grpc_op ops[3]; grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create(); grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback; ctx->callback = callback;
/* TODO: implement sending the metadata... */
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
/* ctx->send_initial_metadata is already zeroed out. */ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
ops[0].data.send_initial_metadata.count = 0; initial_metadata);
ops[0].data.send_initial_metadata.metadata = NULL; ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);

Loading…
Cancel
Save