diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 8e9739335f0..e8d0b0647fd 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -47,6 +47,7 @@ namespace Grpc.Core
readonly Metadata headers;
readonly DateTime deadline;
readonly CancellationToken cancellationToken;
+ readonly WriteOptions writeOptions;
///
/// Creates a new instance of CallOptions .
@@ -54,10 +55,12 @@ namespace Grpc.Core
/// Headers to be sent with the call.
/// Deadline for the call to finish. null means no deadline.
/// Can be used to request cancellation of the call.
- public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
+ /// Write options that will be used for this call.
+ public CallOptions(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken), WriteOptions writeOptions = null)
{
// TODO(jtattermusch): consider only creating metadata object once it's really needed.
this.headers = headers != null ? headers : new Metadata();
+ // TODO(jtattermusch): allow null value of deadline?
this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
this.cancellationToken = cancellationToken;
}
@@ -85,5 +88,16 @@ namespace Grpc.Core
{
get { return cancellationToken; }
}
+
+ ///
+ /// Write options that will be used for this call.
+ ///
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/CompressionLevel.cs b/src/csharp/Grpc.Core/CompressionLevel.cs
new file mode 100644
index 00000000000..399652b85e5
--- /dev/null
+++ b/src/csharp/Grpc.Core/CompressionLevel.cs
@@ -0,0 +1,63 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ ///
+ /// Compression level based on grpc_compression_level from grpc/compression.h
+ ///
+ public enum CompressionLevel
+ {
+ ///
+ /// No compression.
+ ///
+ None = 0,
+
+ ///
+ /// Low compression.
+ ///
+ Low,
+
+ ///
+ /// Medium compression.
+ ///
+ Medium,
+
+ ///
+ /// High compression.
+ ///
+ High,
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 52defd1965c..0616ed9f3ec 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -115,6 +115,8 @@
+
+
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 2000210252d..b554b6e2660 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -50,5 +50,17 @@ namespace Grpc.Core
///
/// the message to be written. Cannot be null.
Task WriteAsync(T message);
+
+ ///
+ /// Write options that will be used for the next write.
+ /// If null, default options will be used.
+ /// Once set, this property maintains its value across subsequent
+ /// writes.
+ /// Internally, closing the stream is on client and sending
+ /// status from server is treated as a write, so write options
+ /// are also applied to these operations.
+ ///
+ /// The write options.
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 414b5c42820..c26b0773baf 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType>();
- readonly CallInvocationDetails callDetails;
+ readonly CallInvocationDetails details;
// Completion of a pending unary response if not null.
TaskCompletionSource unaryResponseTcs;
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
public AsyncCall(CallInvocationDetails callDetails)
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- this.callDetails = callDetails;
+ this.details = callDetails;
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
@@ -89,11 +89,11 @@ namespace Grpc.Core.Internal
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -130,7 +130,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -138,9 +138,9 @@ namespace Grpc.Core.Internal
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -157,14 +157,14 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
readingDone = true;
unaryResponseTcs = new TaskCompletionSource();
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartClientStreaming(HandleUnaryResponse, metadataArray);
+ call.StartClientStreaming(HandleUnaryResponse, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
@@ -181,16 +181,16 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -206,11 +206,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(!started);
started = true;
- Initialize(callDetails.Channel.Environment.CompletionQueue);
+ Initialize(details.Channel.Environment.CompletionQueue);
- using (var metadataArray = MetadataArraySafeHandle.Create(callDetails.Options.Headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartDuplexStreaming(HandleFinished, metadataArray);
+ call.StartDuplexStreaming(HandleFinished, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -219,9 +219,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
///
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
///
@@ -238,14 +238,14 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
///
- public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
+ public void StartSendCloseFromClient(WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendCloseFromClient(HandleHalfclosed);
+ call.StartSendCloseFromClient(HandleHalfclosed, writeFlags);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@@ -278,6 +278,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
///
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -310,14 +318,14 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
}
private void Initialize(CompletionQueueSafeHandle cq)
{
- var call = callDetails.Channel.Handle.CreateCall(callDetails.Channel.Environment.CompletionRegistry, cq,
- callDetails.Method, callDetails.Host, Timespec.FromDateTime(callDetails.Options.Deadline));
- callDetails.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
InitializeInternal(call);
RegisterCancellationCallback();
}
@@ -325,13 +333,22 @@ namespace Grpc.Core.Internal
// Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
private void RegisterCancellationCallback()
{
- var token = callDetails.Options.CancellationToken;
+ var token = details.Options.CancellationToken;
if (token.CanBeCanceled)
{
token.Register(() => this.Cancel());
}
}
+ ///
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ ///
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
///
/// Handler for unary response completion.
///
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baebd..0c7694e9a59 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
///
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,7 +132,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags);
sendCompletionDelegate = completionDelegate;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee364..8d7bdf65aac 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
///
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
///
@@ -102,7 +102,7 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
///
- public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate completionDelegate)
+ public void StartSendStatusFromServer(Status status, Metadata trailers, WriteFlags writeFlags, AsyncCompletionDelegate completionDelegate)
{
lock (myLock)
{
@@ -111,7 +111,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171f..be1426feb47 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -53,32 +53,32 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
- BatchContextSafeHandle ctx);
+ BatchContextSafeHandle ctx, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -88,6 +88,10 @@ namespace Grpc.Core.Internal
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+
[DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@@ -103,60 +107,60 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
+ grpcsharp_call_start_client_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
- public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
+ grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk();
}
- public void StartSendCloseFromClient(BatchCompletionDelegate callback)
+ public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
+ grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -173,6 +177,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray, writeFlags).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463be..4a146949772 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,24 +40,44 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream : IClientStreamWriter
{
readonly AsyncCall call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource();
- call.StartSendCloseFromClient(taskSource.CompletionDelegate);
+ call.StartSendCloseFromClient(GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f6..34db03cc383 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -304,13 +304,13 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken)
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ newRpc.RequestMetadata, cancellationToken, writeOptionsHolder);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f6..1d79241f776 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
///
/// Writes responses asynchronously to an underlying AsyncCallServer object.
///
- internal class ServerResponseStream : IServerStreamWriter
+ internal class ServerResponseStream : IServerStreamWriter, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer call)
{
@@ -52,15 +53,33 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource();
- call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
+ call.StartSendStatusFromServer(status, trailers, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 032b1390db3..5657eb8513e 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -36,6 +36,8 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
+using Grpc.Core.Internal;
+
namespace Grpc.Core
{
///
@@ -54,8 +56,9 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken)
+ public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, IHasWriteOptions writeOptionsHolder)
{
this.method = method;
this.host = host;
@@ -63,6 +66,7 @@ namespace Grpc.Core
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeOptionsHolder = writeOptionsHolder;
}
/// Name of method called in this RPC.
@@ -141,5 +145,30 @@ namespace Grpc.Core
status = value;
}
}
+
+ ///
+ /// Allows setting write options for the following write.
+ /// For streaming response calls, this property is also exposed as on IServerStreamWriter for convenience.
+ /// Both properties are backed by the same underlying value.
+ ///
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptionsHolder.WriteOptions;
+ }
+ set
+ {
+ writeOptionsHolder.WriteOptions = value;
+ }
+ }
+ }
+
+ ///
+ /// Allows sharing write options between ServerCallContext and other objects.
+ ///
+ public interface IHasWriteOptions
+ {
+ WriteOptions WriteOptions { get; set; }
}
}
diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs
new file mode 100644
index 00000000000..ec4a7dd8cdd
--- /dev/null
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -0,0 +1,83 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+
+namespace Grpc.Core
+{
+ ///
+ /// Flags for write operations.
+ ///
+ [Flags]
+ public enum WriteFlags
+ {
+ ///
+ /// Hint that the write may be buffered and need not go out on the wire immediately.
+ /// gRPC is free to buffer the message until the next non-buffered
+ /// write, or until write stream completion, but it need not buffer completely or at all.
+ ///
+ BufferHint = 0x1,
+
+ ///
+ /// Force compression to be disabled for a particular write.
+ ///
+ NoCompress = 0x2
+ }
+
+
+ ///
+ /// Options for write operations.
+ ///
+ public class WriteOptions
+ {
+ ///
+ /// Default write options.
+ ///
+ public static readonly WriteOptions Default = new WriteOptions();
+
+ private WriteFlags flags;
+
+ public WriteOptions(WriteFlags flags = default(WriteFlags))
+ {
+ this.flags = flags;
+ }
+
+ public WriteFlags Flags
+ {
+ get
+ {
+ return this.flags;
+ }
+ }
+ }
+}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 048887bc12a..165459371b8 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -497,7 +497,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
- grpc_metadata_array *initial_metadata) {
+ grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[6];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -506,15 +506,15 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- ops[2].flags = 0;
+ ops[2].flags = write_flags;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@@ -542,7 +542,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
- grpc_metadata_array *initial_metadata) {
+ grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -551,7 +551,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@@ -578,7 +578,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
- size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
+ size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -587,15 +587,15 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[1].data.send_message = ctx->send_message;
- ops[1].flags = 0;
+ ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- ops[2].flags = 0;
+ ops[2].flags = write_flags;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@@ -619,7 +619,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
- grpc_metadata_array *initial_metadata) {
+ grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[3];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -628,7 +628,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@@ -651,31 +651,31 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
- const char *send_buffer, size_t send_buffer_len) {
+ const char *send_buffer, size_t send_buffer_len, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
- grpcsharp_batch_context *ctx) {
+ grpcsharp_batch_context *ctx, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
- const char *status_details, grpc_metadata_array *trailing_metadata) {
+ const char *status_details, grpc_metadata_array *trailing_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
@@ -688,7 +688,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.count;
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
- ops[0].flags = 0;
+ ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@@ -706,16 +706,29 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[0].data.send_initial_metadata.count = 0;
- ops[0].data.send_initial_metadata.metadata = NULL;
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ ops[0].data.recv_close_on_server.cancelled =
+ (&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
- ops[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- ops[1].data.recv_close_on_server.cancelled =
- (&ctx->recv_close_on_server_cancelled);
- ops[1].flags = 0;
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+}
+
+GPR_EXPORT grpc_call_error GPR_CALLTYPE
+grpcsharp_call_send_initial_metadata(grpc_call *call,
+ grpcsharp_batch_context *ctx,
+ grpc_metadata_array *initial_metadata,
+ gpr_uint32 write_flags) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
+ initial_metadata);
+ ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
+ ops[0].data.send_initial_metadata.metadata =
+ ctx->send_initial_metadata.metadata;
+ ops[0].flags = write_flags;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}