implemented sending initial metadata

pull/2860/head
Jan Tattermusch 10 years ago
parent bff90ac384
commit 8368b2e4b9
  1. 1
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  2. 8
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  3. 30
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  4. 14
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  5. 1
      src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
  6. 6
      src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
  7. 8
      src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
  8. 17
      src/csharp/Grpc.Core/ServerCallContext.cs
  9. 25
      src/csharp/ext/grpc_csharp_ext.c

@ -64,6 +64,7 @@ namespace Grpc.Core.Internal
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails;
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but

@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
protected bool initialMetadataSent;
protected long streamingWritesCounter;
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(HandleSendFinished, payload, writeFlags);
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
}
}

@ -97,6 +97,34 @@ namespace Grpc.Core.Internal
StartReadMessageInternal(completionDelegate);
}
/// <summary>
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
/// completionDelegate is invoked upon completion.
/// </summary>
public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
Preconditions.CheckState(streamingWritesCounter > 0, "Response headers can only be sent before the first write starts.");
CheckSendingAllowed();
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags);
}
this.initialMetadataSent = true;
sendCompletionDelegate = completionDelegate;
}
}
/// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
@ -111,7 +139,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags);
call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;

@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@ -142,11 +142,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags)
public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk();
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
@ -156,11 +156,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
}
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk();
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@ -177,7 +177,7 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);

@ -68,6 +68,7 @@ namespace Grpc.Core.Internal
{
return this.writeOptions;
}
set
{
writeOptions = value;

@ -304,13 +304,15 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken)
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, writeOptionsHolder);
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}

@ -64,12 +64,20 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
public WriteOptions WriteOptions
{
get
{
return writeOptions;
}
set
{
writeOptions = value;

@ -43,10 +43,8 @@ namespace Grpc.Core
/// <summary>
/// Context for a server-side call.
/// </summary>
public sealed class ServerCallContext
public class ServerCallContext
{
// TODO(jtattermusch): expose method to send initial metadata back to client
private readonly string method;
private readonly string host;
private readonly string peer;
@ -56,9 +54,11 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
private Func<Metadata, Task> writeHeadersFunc;
private IHasWriteOptions writeOptionsHolder;
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, IHasWriteOptions writeOptionsHolder)
public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
this.method = method;
this.host = host;
@ -66,8 +66,14 @@ namespace Grpc.Core
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsHolder = writeOptionsHolder;
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
return writeHeadersFunc(responseHeaders);
}
/// <summary>Name of method called in this RPC.</summary>
public string Method
@ -114,7 +120,7 @@ namespace Grpc.Core
}
}
///<summary>Cancellation token signals when call is cancelled.</summary>
/// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@ -157,6 +163,7 @@ namespace Grpc.Core
{
return writeOptionsHolder.WriteOptions;
}
set
{
writeOptionsHolder.WriteOptions = value;

@ -651,15 +651,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len, gpr_uint32 write_flags) {
const char *send_buffer, size_t send_buffer_len,
gpr_uint32 write_flags,
gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpc_op ops[2];
size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@ -675,9 +682,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata, gpr_uint32 write_flags) {
const char *status_details, grpc_metadata_array *trailing_metadata,
gpr_uint32 write_flags, gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpc_op ops[2];
size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@ -689,8 +698,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = write_flags;
ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[1].data.send_initial_metadata.count = 0;
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE

Loading…
Cancel
Save