added WriteOptions support and enabled NoCompress flag to be used for all writes

pull/2860/head
Jan Tattermusch 10 years ago
parent c29812fc2c
commit bff90ac384
  1. 16
      src/csharp/Grpc.Core/CallOptions.cs
  2. 63
      src/csharp/Grpc.Core/CompressionLevel.cs
  3. 2
      src/csharp/Grpc.Core/Grpc.Core.csproj
  4. 12
      src/csharp/Grpc.Core/IAsyncStreamWriter.cs
  5. 67
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  6. 4
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  7. 8
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  8. 57
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  9. 24
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  10. 12
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  11. 25
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  12. 31
      src/csharp/Grpc.Core/ServerCallContext.cs
  13. 83
      src/csharp/Grpc.Core/WriteOptions.cs
  14. 65
      src/csharp/ext/grpc_csharp_ext.c

@ -47,6 +47,7 @@ namespace Grpc.Core
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
readonly WriteOptions writeOptions;
/// <summary>
/// Creates a new instance of <c>CallOptions</c>.
@ -54,10 +55,12 @@ namespace Grpc.Core
/// <param name="headers">Headers to be sent with the call.</param>
/// <param name="deadline">Deadline for the call to finish. null means no deadline.</param>
/// <param name="cancellationToken">Can be used to request cancellation of the call.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
/// <param name="writeOptions">Write options that will be used for this call.</param>
public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken), WriteOptions writeOptions = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers != null ? headers : new Metadata();
// TODO(jtattermusch): allow null value of deadline?
this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
this.cancellationToken = cancellationToken;
}
@ -85,5 +88,16 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
/// <summary>
/// Write options that will be used for this call.
/// </summary>
public WriteOptions WriteOptions
{
get
{
return this.writeOptions;
}
}
}
}

@ -0,0 +1,63 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Compression level based on grpc_compression_level from grpc/compression.h
/// </summary>
public enum CompressionLevel
{
/// <summary>
/// No compression.
/// </summary>
None = 0,
/// <summary>
/// Low compression.
/// </summary>
Low,
/// <summary>
/// Medium compression.
/// </summary>
Medium,
/// <summary>
/// High compression.
/// </summary>
High,
}
}

@ -115,6 +115,8 @@
<Compile Include="ChannelState.cs" />
<Compile Include="CallInvocationDetails.cs" />
<Compile Include="CallOptions.cs" />
<Compile Include="CompressionLevel.cs" />
<Compile Include="WriteOptions.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />

@ -50,5 +50,17 @@ namespace Grpc.Core
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task WriteAsync(T message);
/// <summary>
/// Write options that will be used for the next write.
/// If null, default options will be used.
/// Once set, this property maintains its value across subsequent
/// writes.
/// Internally, closing the stream is on client and sending
/// status from server is treated as a write, so write options
/// are also applied to these operations.
/// </summary>
/// <value>The write options.</value>
WriteOptions WriteOptions { get; set; }
}
}

@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
readonly CallInvocationDetails<TRequest, TResponse> callDetails;
readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.callDetails = callDetails;
this.details = callDetails;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@ -89,11 +89,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
call.StartUnary(payload, ctx, metadataArray);
call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@ -130,7 +130,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@ -138,9 +138,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@ -157,14 +157,14 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
call.StartClientStreaming(HandleUnaryResponse, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
@ -181,16 +181,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartServerStreaming(payload, HandleFinished, metadataArray);
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@ -206,11 +206,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
Initialize(callDetails.Channel.Environment.CompletionQueue);
Initialize(details.Channel.Environment.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
call.StartDuplexStreaming(HandleFinished, metadataArray, GetWriteFlagsForCall());
}
}
}
@ -219,9 +219,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@ -238,14 +238,14 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
public void StartSendCloseFromClient(WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendCloseFromClient(HandleHalfclosed);
call.StartSendCloseFromClient(HandleHalfclosed, writeFlags);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@ -278,6 +278,14 @@ namespace Grpc.Core.Internal
}
}
public CallInvocationDetails<TRequest, TResponse> Details
{
get
{
return this.details;
}
}
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@ -310,14 +318,14 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@ -325,13 +333,22 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
var token = callDetails.Options.CancellationToken;
var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
}
}
/// <summary>
/// Gets WriteFlags set in callDetails.Options.WriteOptions
/// </summary>
private WriteFlags GetWriteFlagsForCall()
{
var writeOptions = details.Options.WriteOptions;
return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>
/// Handler for unary response completion.
/// </summary>

@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@ -132,7 +132,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(payload, HandleSendFinished);
call.StartSendMessage(HandleSendFinished, payload, writeFlags);
sendCompletionDelegate = completionDelegate;
}
}

@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@ -102,7 +102,7 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendStatusFromServer(Status status, Metadata trailers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
@ -111,7 +111,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags);
}
halfcloseRequested = true;
readingDone = true;

@ -53,32 +53,32 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
BatchContextSafeHandle ctx);
BatchContextSafeHandle ctx, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@ -88,6 +88,10 @@ namespace Grpc.Core.Internal
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@ -103,60 +107,60 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
grpcsharp_call_start_client_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
}
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@ -173,6 +177,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray, writeFlags).CheckOk();
}
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();

@ -40,24 +40,44 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
call.StartSendCloseFromClient(GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public WriteOptions WriteOptions
{
get
{
return this.writeOptions;
}
set
{
writeOptions = value;
}
}
private WriteFlags GetWriteFlags()
{
var options = writeOptions;
return options != null ? options.Flags : default(WriteFlags);
}
}
}

@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@ -304,13 +304,13 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken);
newRpc.RequestMetadata, cancellationToken, writeOptionsHolder);
}
}
}

@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@ -52,15 +53,33 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendMessage(message, taskSource.CompletionDelegate);
call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
call.StartSendStatusFromServer(status, trailers, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public WriteOptions WriteOptions
{
get
{
return writeOptions;
}
set
{
writeOptions = value;
}
}
private WriteFlags GetWriteFlags()
{
var options = writeOptions;
return options != null ? options.Flags : default(WriteFlags);
}
}
}

@ -36,6 +36,8 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
@ -54,8 +56,9 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
private IHasWriteOptions writeOptionsHolder;
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, IHasWriteOptions writeOptionsHolder)
{
this.method = method;
this.host = host;
@ -63,6 +66,7 @@ namespace Grpc.Core
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.writeOptionsHolder = writeOptionsHolder;
}
/// <summary>Name of method called in this RPC.</summary>
@ -141,5 +145,30 @@ namespace Grpc.Core
status = value;
}
}
/// <summary>
/// Allows setting write options for the following write.
/// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
/// Both properties are backed by the same underlying value.
/// </summary>
public WriteOptions WriteOptions
{
get
{
return writeOptionsHolder.WriteOptions;
}
set
{
writeOptionsHolder.WriteOptions = value;
}
}
}
/// <summary>
/// Allows sharing write options between ServerCallContext and other objects.
/// </summary>
public interface IHasWriteOptions
{
WriteOptions WriteOptions { get; set; }
}
}

@ -0,0 +1,83 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
namespace Grpc.Core
{
/// <summary>
/// Flags for write operations.
/// </summary>
[Flags]
public enum WriteFlags
{
/// <summary>
/// Hint that the write may be buffered and need not go out on the wire immediately.
/// gRPC is free to buffer the message until the next non-buffered
/// write, or until write stream completion, but it need not buffer completely or at all.
/// </summary>
BufferHint = 0x1,
/// <summary>
/// Force compression to be disabled for a particular write.
/// </summary>
NoCompress = 0x2
}
/// <summary>
/// Options for write operations.
/// </summary>
public class WriteOptions
{
/// <summary>
/// Default write options.
/// </summary>
public static readonly WriteOptions Default = new WriteOptions();
private WriteFlags flags;
public WriteOptions(WriteFlags flags = default(WriteFlags))
{
this.flags = flags;
}
public WriteFlags Flags
{
get
{
return this.flags;
}
}
}
}

@ -497,7 +497,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[6];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -506,15 +506,15 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = 0;
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
ops[2].flags = write_flags;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -542,7 +542,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -551,7 +551,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -578,7 +578,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -587,15 +587,15 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
ops[1].flags = 0;
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = 0;
ops[2].flags = write_flags;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -619,7 +619,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[3];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -628,7 +628,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -651,31 +651,31 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
const char *send_buffer, size_t send_buffer_len, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].flags = 0;
ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
grpcsharp_batch_context *ctx) {
grpcsharp_batch_context *ctx, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[0].flags = 0;
ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata) {
const char *status_details, grpc_metadata_array *trailing_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
@ -688,7 +688,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.count;
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@ -706,16 +706,29 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[0].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops[1].data.recv_close_on_server.cancelled =
(&ctx->recv_close_on_server_cancelled);
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_initial_metadata(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata,
gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}

Loading…
Cancel
Save