support for reading response headers on client side

pull/3029/head
Jan Tattermusch 10 years ago
parent 3af838a2d7
commit 4c25efa519
  1. 1
      src/csharp/.gitignore
  2. 11
      src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
  3. 58
      src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
  4. 16
      src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
  5. 16
      src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
  6. 4
      src/csharp/Grpc.Core/Calls.cs
  7. 1
      src/csharp/Grpc.Core/Channel.cs
  8. 10
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  9. 11
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  10. 4
      src/csharp/Grpc.Core/Internal/INativeCall.cs
  11. 55
      src/csharp/ext/grpc_csharp_ext.c

@ -5,4 +5,5 @@ test-results
packages
Grpc.v12.suo
TestResult.xml
/TestResults
*.nupkg

@ -137,6 +137,12 @@ namespace Grpc.Core.Internal.Tests
set;
}
public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
{
get;
set;
}
public SendCompletionHandler SendCompletionHandler
{
get;
@ -206,6 +212,11 @@ namespace Grpc.Core.Internal.Tests
ReceivedMessageHandler = callback;
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
ReceivedResponseHeadersHandler = callback;
}
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;

@ -32,13 +32,16 @@
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Tests
@ -92,6 +95,61 @@ namespace Grpc.Core.Tests
Assert.AreEqual("PASS", await call.ResponseAsync);
}
[Test]
public async Task ResponseHeadersAsync_ClientStreamingCall()
{
helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
{
await context.WriteResponseHeadersAsync(headers);
return "PASS";
});
var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
await call.RequestStream.CompleteAsync();
var responseHeaders = await call.ResponseHeadersAsync;
Assert.AreEqual("ascii-header", responseHeaders[0].Key);
Assert.AreEqual("PASS", await call.ResponseAsync);
}
[Test]
public async Task ResponseHeadersAsync_ServerStreamingCall()
{
helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
{
await context.WriteResponseHeadersAsync(headers);
await responseStream.WriteAsync("PASS");
});
var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
var responseHeaders = await call.ResponseHeadersAsync;
Assert.AreEqual("ascii-header", responseHeaders[0].Key);
CollectionAssert.AreEqual(new [] { "PASS" }, await call.ResponseStream.ToListAsync());
}
[Test]
public async Task ResponseHeadersAsync_DuplexStreamingCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
{
await context.WriteResponseHeadersAsync(headers);
while (await requestStream.MoveNext())
{
await responseStream.WriteAsync(requestStream.Current);
}
});
var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
var responseHeaders = await call.ResponseHeadersAsync;
var messages = new[] { "PASS" };
await call.RequestStream.WriteAllAsync(messages);
Assert.AreEqual("ascii-header", responseHeaders[0].Key);
CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync());
}
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{

@ -32,6 +32,7 @@
#endregion
using System;
using System.Threading.Tasks;
namespace Grpc.Core
{
@ -42,14 +43,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@ -77,6 +80,17 @@ namespace Grpc.Core
}
}
/// <summary>
/// Asynchronous access to response headers.
/// </summary>
public Task<Metadata> ResponseHeadersAsync
{
get
{
return this.responseHeadersAsync;
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.

@ -32,6 +32,7 @@
#endregion
using System;
using System.Threading.Tasks;
namespace Grpc.Core
{
@ -41,13 +42,15 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@ -64,6 +67,17 @@ namespace Grpc.Core
}
}
/// <summary>
/// Asynchronous access to response headers.
/// </summary>
public Task<Metadata> ResponseHeadersAsync
{
get
{
return this.responseHeadersAsync;
}
}
/// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.

@ -93,7 +93,7 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
@ -130,7 +130,7 @@ namespace Grpc.Core
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
}
}

@ -58,7 +58,6 @@ namespace Grpc.Core
readonly List<ChannelOption> options;
bool shutdownRequested;
bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.

@ -199,6 +199,7 @@ namespace Grpc.Core.Internal
{
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@ -219,6 +220,7 @@ namespace Grpc.Core.Internal
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@ -362,6 +364,14 @@ namespace Grpc.Core.Internal
return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
responseHeadersTcs.SetResult(responseHeaders);
}
/// <summary>
/// Handler for unary response completion.
/// </summary>

@ -86,6 +86,10 @@ namespace Grpc.Core.Internal
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
@ -172,6 +176,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
var ctx = BatchContextSafeHandle.Create();

@ -40,6 +40,8 @@ namespace Grpc.Core.Internal
internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
internal delegate void SendCompletionHandler(bool success);
internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
@ -67,6 +69,8 @@ namespace Grpc.Core.Internal
void StartReceiveMessage(ReceivedMessageHandler callback);
void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata);

@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
grpc_op ops[5];
grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -615,23 +615,18 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[2].flags = 0;
ops[2].reserved = NULL;
ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[3].flags = 0;
ops[3].reserved = NULL;
ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[4].data.recv_status_on_client.trailing_metadata =
ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[3].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[4].data.recv_status_on_client.status =
ops[3].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[4].data.recv_status_on_client.status_details =
ops[3].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[4].data.recv_status_on_client.status_details_capacity =
ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[4].flags = 0;
ops[4].reserved = NULL;
ops[3].flags = 0;
ops[3].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@ -642,7 +637,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
grpc_op ops[2];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -652,28 +647,36 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].flags = 0;
ops[0].reserved = NULL;
ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[1].flags = 0;
ops[1].reserved = NULL;
ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[2].data.recv_status_on_client.trailing_metadata =
ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
ops[1].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
ops[2].data.recv_status_on_client.status =
ops[1].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
ops[2].data.recv_status_on_client.status_details =
ops[1].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
ops[2].data.recv_status_on_client.status_details_capacity =
ops[1].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
ops[2].flags = 0;
ops[2].reserved = NULL;
ops[1].flags = 0;
ops[1].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
ops[0].flags = 0;
ops[0].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,

Loading…
Cancel
Save