fixed writeOptions and added test

pull/2860/head
Jan Tattermusch 9 years ago
parent c75c57c5af
commit 5321d49b51
  1. 128
      src/csharp/Grpc.Core.Tests/CompressionTest.cs
  2. 1
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  3. 1
      src/csharp/Grpc.Core/CallOptions.cs
  4. 4
      src/csharp/Grpc.Core/IAsyncStreamWriter.cs
  5. 8
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  6. 8
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  7. 30
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  8. 2
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  9. 4
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  10. 29
      src/csharp/ext/grpc_csharp_ext.c

@ -0,0 +1,128 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
{
public class CompressionTest
{
MockServiceHelper helper;
Server server;
Channel channel;
[SetUp]
public void Init()
{
helper = new MockServiceHelper();
server = helper.GetServer();
server.Start();
channel = helper.GetChannel();
}
[TearDown]
public void Cleanup()
{
channel.Dispose();
server.ShutdownAsync().Wait();
}
[TestFixtureTearDown]
public void CleanupClass()
{
GrpcEnvironment.Shutdown();
}
[Test]
public void WriteOptions_Unary()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
{
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
return request;
});
var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
Calls.BlockingUnaryCall(helper.CreateUnaryCall(callOptions), "abc");
}
[Test]
public async Task WriteOptions_DuplexStreaming()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await requestStream.ToList();
context.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await context.WriteResponseHeadersAsync(new Metadata { new Metadata.Entry("ascii-header", "abcdefg") });
await responseStream.WriteAsync("X");
responseStream.WriteOptions = null;
await responseStream.WriteAsync("Y");
responseStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await responseStream.WriteAsync("Z");
});
var callOptions = new CallOptions(writeOptions: new WriteOptions(WriteFlags.NoCompress));
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(callOptions));
// check that write options from call options are propagated to request stream.
Assert.IsTrue((call.RequestStream.WriteOptions.Flags & WriteFlags.NoCompress) != 0);
call.RequestStream.WriteOptions = new WriteOptions();
await call.RequestStream.WriteAsync("A");
call.RequestStream.WriteOptions = null;
await call.RequestStream.WriteAsync("B");
call.RequestStream.WriteOptions = new WriteOptions(WriteFlags.NoCompress);
await call.RequestStream.WriteAsync("C");
await call.RequestStream.CompleteAsync();
await call.ResponseStream.ToList();
}
}
}

@ -79,6 +79,7 @@
<Compile Include="ChannelTest.cs" />
<Compile Include="MockServiceHelper.cs" />
<Compile Include="ResponseHeadersTest.cs" />
<Compile Include="CompressionTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -63,6 +63,7 @@ namespace Grpc.Core
// TODO(jtattermusch): allow null value of deadline?
this.deadline = deadline.HasValue ? deadline.Value : DateTime.MaxValue;
this.cancellationToken = cancellationToken;
this.writeOptions = writeOptions;
}
/// <summary>

@ -56,10 +56,6 @@ namespace Grpc.Core
/// If null, default options will be used.
/// Once set, this property maintains its value across subsequent
/// writes.
/// Internally, closing the stream is on client and sending
/// status from server is treated as a write, so write options
/// are also applied to these operations.
/// </summary>
/// <value>The write options.</value>
WriteOptions WriteOptions { get; set; }
}

@ -165,7 +165,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray, GetWriteFlagsForCall());
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@ -211,7 +211,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray, GetWriteFlagsForCall());
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@ -239,14 +239,14 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendCloseFromClient(WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendCloseFromClient(HandleHalfclosed, writeFlags);
call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;

@ -103,7 +103,7 @@ namespace Grpc.Core.Internal
/// to make things simpler.
/// completionDelegate is invoked upon completion.
/// </summary>
public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
@ -118,7 +118,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags);
call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
}
this.initialMetadataSent = true;
@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
public void StartSendStatusFromServer(Status status, Metadata trailers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
@ -140,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent);
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;

@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
@ -66,7 +66,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
@ -74,11 +74,11 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
BatchContextSafeHandle ctx, WriteFlags writeFlags);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@ -90,7 +90,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@ -121,11 +121,11 @@ namespace Grpc.Core.Internal
.CheckOk();
}
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
@ -135,11 +135,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
@ -149,18 +149,18 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk();
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@ -177,11 +177,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray, writeFlags).CheckOk();
grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
public void Cancel()

@ -58,7 +58,7 @@ namespace Grpc.Core.Internal
public Task CompleteAsync()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(GetWriteFlags(), taskSource.CompletionDelegate);
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}

@ -60,14 +60,14 @@ namespace Grpc.Core.Internal
public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendStatusFromServer(status, trailers, GetWriteFlags(), taskSource.CompletionDelegate);
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate);
call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
return taskSource.Task;
}

@ -506,7 +506,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
@ -514,7 +514,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = write_flags;
ops[2].flags = 0;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -542,7 +542,7 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -551,7 +551,7 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -587,7 +587,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[1].flags = write_flags;
ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[2].flags = write_flags;
ops[2].flags = 0;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -619,7 +619,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -628,7 +628,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
@ -671,11 +671,11 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
grpcsharp_batch_context *ctx, gpr_uint32 write_flags) {
grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[0].flags = write_flags;
ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
@ -683,7 +683,7 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata,
gpr_uint32 write_flags, gpr_int32 send_empty_initial_metadata) {
gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[2];
size_t nops = send_empty_initial_metadata ? 2 : 1;
@ -697,7 +697,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.count;
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
@ -731,8 +731,7 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_initial_metadata(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata,
gpr_uint32 write_flags) {
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
@ -741,7 +740,7 @@ grpcsharp_call_send_initial_metadata(grpc_call *call,
ops[0].data.send_initial_metadata.count = ctx->send_initial_metadata.count;
ops[0].data.send_initial_metadata.metadata =
ctx->send_initial_metadata.metadata;
ops[0].flags = write_flags;
ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}

Loading…
Cancel
Save