From 988ca514b9cd87c7010bfafe329b9495603e9b7b Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 29 Jul 2019 18:53:00 -0700 Subject: [PATCH] serialization to IBufferWriter attempt 1 --- .../Grpc.Core.Api/SerializationContext.cs | 18 +++++ .../Internal/DefaultSerializationContext.cs | 74 +++++++++++++++++-- .../Internal/NativeMethods.Generated.cs | 33 +++++++++ src/csharp/ext/grpc_csharp_ext.c | 61 +++++++++++++++ .../runtimes/grpc_csharp_ext_dummy_stubs.c | 12 +++ .../Grpc.Core/Internal/native_methods.include | 3 + 6 files changed, 196 insertions(+), 5 deletions(-) diff --git a/src/csharp/Grpc.Core.Api/SerializationContext.cs b/src/csharp/Grpc.Core.Api/SerializationContext.cs index 9aef2adbcd5..79107040411 100644 --- a/src/csharp/Grpc.Core.Api/SerializationContext.cs +++ b/src/csharp/Grpc.Core.Api/SerializationContext.cs @@ -17,6 +17,7 @@ #endregion using System; +using System.Buffers; namespace Grpc.Core { @@ -35,5 +36,22 @@ namespace Grpc.Core { throw new NotImplementedException(); } + + /// + /// Expose serializer as buffer writer + /// + public virtual IBufferWriter GetBufferWriter() + { + throw new NotImplementedException(); + } + + /// + /// Complete the payload written so far. + /// + public virtual void Complete() + { + throw new NotImplementedException(); + + } } } diff --git a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs index cceb194879e..9098850a52e 100644 --- a/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs +++ b/src/csharp/Grpc.Core/Internal/DefaultSerializationContext.cs @@ -17,6 +17,8 @@ #endregion using Grpc.Core.Utils; +using System; +using System.Buffers; using System.Threading; namespace Grpc.Core.Internal @@ -27,7 +29,8 @@ namespace Grpc.Core.Internal new ThreadLocal(() => new DefaultSerializationContext(), false); bool isComplete; - byte[] payload; + //byte[] payload; + NativeBufferWriter bufferWriter; public DefaultSerializationContext() { @@ -38,18 +41,48 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckState(!isComplete); this.isComplete = true; - this.payload = payload; + + GetBufferWriter(); + var destSpan = bufferWriter.GetSpan(payload.Length); + payload.AsSpan().CopyTo(destSpan); + bufferWriter.Advance(payload.Length); + bufferWriter.Complete(); + //this.payload = payload; + } + + /// + /// Expose serializer as buffer writer + /// + public override IBufferWriter GetBufferWriter() + { + if (bufferWriter == null) + { + // TODO: avoid allocation.. + bufferWriter = new NativeBufferWriter(); + } + return bufferWriter; + } + + /// + /// Complete the payload written so far. + /// + public override void Complete() + { + GrpcPreconditions.CheckState(!isComplete); + bufferWriter.Complete(); + this.isComplete = true; } - internal byte[] GetPayload() + internal SliceBufferSafeHandle GetPayload() { - return this.payload; + return bufferWriter.GetSliceBuffer(); } public void Reset() { this.isComplete = false; - this.payload = null; + //this.payload = null; + this.bufferWriter = null; } public static DefaultSerializationContext GetInitializedThreadLocal() @@ -58,5 +91,36 @@ namespace Grpc.Core.Internal instance.Reset(); return instance; } + + private class NativeBufferWriter : IBufferWriter + { + private SliceBufferSafeHandle sliceBuffer = SliceBufferSafeHandle.Create(); + + public void Advance(int count) + { + sliceBuffer.Advance(count); + } + + public Memory GetMemory(int sizeHint = 0) + { + // TODO: implement + throw new NotImplementedException(); + } + + public Span GetSpan(int sizeHint = 0) + { + return sliceBuffer.GetSpan(sizeHint); + } + + public void Complete() + { + sliceBuffer.Complete(); + } + + public SliceBufferSafeHandle GetSliceBuffer() + { + return sliceBuffer; + } + } } } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs index ca3dfe97ce1..b23170376d4 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs @@ -122,6 +122,9 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_auth_context_property_iterator_delegate grpcsharp_auth_context_property_iterator; public readonly Delegates.grpcsharp_auth_property_iterator_next_delegate grpcsharp_auth_property_iterator_next; public readonly Delegates.grpcsharp_auth_context_release_delegate grpcsharp_auth_context_release; + public readonly Delegates.grpcsharp_slice_buffer_create_delegate grpcsharp_slice_buffer_create; + public readonly Delegates.grpcsharp_slice_buffer_adjust_tail_space_delegate grpcsharp_slice_buffer_adjust_tail_space; + public readonly Delegates.grpcsharp_slice_buffer_destroy_delegate grpcsharp_slice_buffer_destroy; public readonly Delegates.gprsharp_now_delegate gprsharp_now; public readonly Delegates.gprsharp_inf_future_delegate gprsharp_inf_future; public readonly Delegates.gprsharp_inf_past_delegate gprsharp_inf_past; @@ -224,6 +227,9 @@ namespace Grpc.Core.Internal this.grpcsharp_auth_context_property_iterator = GetMethodDelegate(library); this.grpcsharp_auth_property_iterator_next = GetMethodDelegate(library); this.grpcsharp_auth_context_release = GetMethodDelegate(library); + this.grpcsharp_slice_buffer_create = GetMethodDelegate(library); + this.grpcsharp_slice_buffer_adjust_tail_space = GetMethodDelegate(library); + this.grpcsharp_slice_buffer_destroy = GetMethodDelegate(library); this.gprsharp_now = GetMethodDelegate(library); this.gprsharp_inf_future = GetMethodDelegate(library); this.gprsharp_inf_past = GetMethodDelegate(library); @@ -325,6 +331,9 @@ namespace Grpc.Core.Internal this.grpcsharp_auth_context_property_iterator = DllImportsFromStaticLib.grpcsharp_auth_context_property_iterator; this.grpcsharp_auth_property_iterator_next = DllImportsFromStaticLib.grpcsharp_auth_property_iterator_next; this.grpcsharp_auth_context_release = DllImportsFromStaticLib.grpcsharp_auth_context_release; + this.grpcsharp_slice_buffer_create = DllImportsFromStaticLib.grpcsharp_slice_buffer_create; + this.grpcsharp_slice_buffer_adjust_tail_space = DllImportsFromStaticLib.grpcsharp_slice_buffer_adjust_tail_space; + this.grpcsharp_slice_buffer_destroy = DllImportsFromStaticLib.grpcsharp_slice_buffer_destroy; this.gprsharp_now = DllImportsFromStaticLib.gprsharp_now; this.gprsharp_inf_future = DllImportsFromStaticLib.gprsharp_inf_future; this.gprsharp_inf_past = DllImportsFromStaticLib.gprsharp_inf_past; @@ -426,6 +435,9 @@ namespace Grpc.Core.Internal this.grpcsharp_auth_context_property_iterator = DllImportsFromSharedLib.grpcsharp_auth_context_property_iterator; this.grpcsharp_auth_property_iterator_next = DllImportsFromSharedLib.grpcsharp_auth_property_iterator_next; this.grpcsharp_auth_context_release = DllImportsFromSharedLib.grpcsharp_auth_context_release; + this.grpcsharp_slice_buffer_create = DllImportsFromSharedLib.grpcsharp_slice_buffer_create; + this.grpcsharp_slice_buffer_adjust_tail_space = DllImportsFromSharedLib.grpcsharp_slice_buffer_adjust_tail_space; + this.grpcsharp_slice_buffer_destroy = DllImportsFromSharedLib.grpcsharp_slice_buffer_destroy; this.gprsharp_now = DllImportsFromSharedLib.gprsharp_now; this.gprsharp_inf_future = DllImportsFromSharedLib.gprsharp_inf_future; this.gprsharp_inf_past = DllImportsFromSharedLib.gprsharp_inf_past; @@ -530,6 +542,9 @@ namespace Grpc.Core.Internal public delegate AuthContextSafeHandle.NativeAuthPropertyIterator grpcsharp_auth_context_property_iterator_delegate(AuthContextSafeHandle authContext); public delegate IntPtr grpcsharp_auth_property_iterator_next_delegate(ref AuthContextSafeHandle.NativeAuthPropertyIterator iterator); // returns const auth_property* public delegate void grpcsharp_auth_context_release_delegate(IntPtr authContext); + public delegate SliceBufferSafeHandle grpcsharp_slice_buffer_create_delegate(); + public delegate IntPtr grpcsharp_slice_buffer_adjust_tail_space_delegate(SliceBufferSafeHandle sliceBuffer, UIntPtr availableTailSpace, UIntPtr requestedTailSpace); + public delegate void grpcsharp_slice_buffer_destroy_delegate(IntPtr sliceBuffer); public delegate Timespec gprsharp_now_delegate(ClockType clockType); public delegate Timespec gprsharp_inf_future_delegate(ClockType clockType); public delegate Timespec gprsharp_inf_past_delegate(ClockType clockType); @@ -812,6 +827,15 @@ namespace Grpc.Core.Internal [DllImport(ImportName)] public static extern void grpcsharp_auth_context_release(IntPtr authContext); + [DllImport(ImportName)] + public static extern SliceBufferSafeHandle grpcsharp_slice_buffer_create(); + + [DllImport(ImportName)] + public static extern IntPtr grpcsharp_slice_buffer_adjust_tail_space(SliceBufferSafeHandle sliceBuffer, UIntPtr availableTailSpace, UIntPtr requestedTailSpace); + + [DllImport(ImportName)] + public static extern void grpcsharp_slice_buffer_destroy(IntPtr sliceBuffer); + [DllImport(ImportName)] public static extern Timespec gprsharp_now(ClockType clockType); @@ -1111,6 +1135,15 @@ namespace Grpc.Core.Internal [DllImport(ImportName)] public static extern void grpcsharp_auth_context_release(IntPtr authContext); + [DllImport(ImportName)] + public static extern SliceBufferSafeHandle grpcsharp_slice_buffer_create(); + + [DllImport(ImportName)] + public static extern IntPtr grpcsharp_slice_buffer_adjust_tail_space(SliceBufferSafeHandle sliceBuffer, UIntPtr availableTailSpace, UIntPtr requestedTailSpace); + + [DllImport(ImportName)] + public static extern void grpcsharp_slice_buffer_destroy(IntPtr sliceBuffer); + [DllImport(ImportName)] public static extern Timespec gprsharp_now(ClockType clockType); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 02e219ead87..e271494c942 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -1182,6 +1182,67 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_redirect_log(grpcsharp_log_func func) { typedef void(GPR_CALLTYPE* test_callback_funcptr)(int32_t success); +/* Slice buffer functionality */ +GPR_EXPORT grpc_slice_buffer* GPR_CALLTYPE +grpcsharp_slice_buffer_create() { + grpc_slice_buffer* slice_buffer = (grpc_slice_buffer*)gpr_malloc(sizeof(grpc_slice_buffer)); + grpc_slice_buffer_init(slice_buffer); + return slice_buffer; +} + +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_slice_buffer_destroy(grpc_slice_buffer* buffer) { + grpc_slice_buffer_destroy(buffer); + gpr_free(buffer); +} + +GPR_EXPORT grpc_byte_buffer* GPR_CALLTYPE +grpcsharp_create_byte_buffer_from_stolen_slices(grpc_slice_buffer* slice_buffer) { + grpc_byte_buffer* bb = + (grpc_byte_buffer*)gpr_malloc(sizeof(grpc_byte_buffer)); + memset(bb, 0, sizeof(grpc_byte_buffer)); + bb->type = GRPC_BB_RAW; + bb->data.raw.compression = GRPC_COMPRESS_NONE; + bb->data.raw.slice_buffer = *slice_buffer; + // TODO: just use move slice_buffer... + + // we transferred the ownership of members from the slice buffer to the + // the internals of byte buffer, so we just overwrite the original slice buffer with + // default values. + grpc_slice_buffer_init(slice_buffer); // TODO: need to reset available_tail_space after this.. + return bb; +} + +GPR_EXPORT void* GPR_CALLTYPE +grpcsharp_slice_buffer_adjust_tail_space(grpc_slice_buffer* buffer, size_t available_tail_space, + size_t requested_tail_space) { + + // TODO: what if available_tail_space == requested_tail_space == 0 + + if (available_tail_space >= requested_tail_space) + { + // TODO: should this be allowed at all? + grpc_slice_buffer garbage; + grpc_slice_buffer_trim_end(buffer, available_tail_space - requested_tail_space, &garbage); + grpc_slice_buffer_reset_and_unref(&garbage); + } + else + { + if (available_tail_space > 0) + { + grpc_slice_buffer garbage; + grpc_slice_buffer_trim_end(buffer, available_tail_space, &garbage); + grpc_slice_buffer_reset_and_unref(&garbage); + } + + grpc_slice new_slice = grpc_slice_malloc(requested_tail_space); + grpc_slice_buffer_add(buffer, new_slice); + } + + grpc_slice* last_slice = &(buffer->slices[buffer->count - 1]); + return GRPC_SLICE_END_PTR(*last_slice) - requested_tail_space; +} + /* Version info */ GPR_EXPORT const char* GPR_CALLTYPE grpcsharp_version_string() { return grpc_version_string(); diff --git a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c index 8ff26e4ca4d..bc312bc6daf 100644 --- a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c +++ b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c @@ -374,6 +374,18 @@ void grpcsharp_auth_context_release() { fprintf(stderr, "Should never reach here"); abort(); } +void grpcsharp_slice_buffer_create() { + fprintf(stderr, "Should never reach here"); + abort(); +} +void grpcsharp_slice_buffer_adjust_tail_space() { + fprintf(stderr, "Should never reach here"); + abort(); +} +void grpcsharp_slice_buffer_destroy() { + fprintf(stderr, "Should never reach here"); + abort(); +} void gprsharp_now() { fprintf(stderr, "Should never reach here"); abort(); diff --git a/templates/src/csharp/Grpc.Core/Internal/native_methods.include b/templates/src/csharp/Grpc.Core/Internal/native_methods.include index ed7c93a7479..d8f94744723 100644 --- a/templates/src/csharp/Grpc.Core/Internal/native_methods.include +++ b/templates/src/csharp/Grpc.Core/Internal/native_methods.include @@ -88,6 +88,9 @@ native_method_signatures = [ 'AuthContextSafeHandle.NativeAuthPropertyIterator grpcsharp_auth_context_property_iterator(AuthContextSafeHandle authContext)', 'IntPtr grpcsharp_auth_property_iterator_next(ref AuthContextSafeHandle.NativeAuthPropertyIterator iterator) // returns const auth_property*', 'void grpcsharp_auth_context_release(IntPtr authContext)', + 'SliceBufferSafeHandle grpcsharp_slice_buffer_create()', + 'IntPtr grpcsharp_slice_buffer_adjust_tail_space(SliceBufferSafeHandle sliceBuffer, UIntPtr availableTailSpace, UIntPtr requestedTailSpace)', + 'void grpcsharp_slice_buffer_destroy(IntPtr sliceBuffer)', 'Timespec gprsharp_now(ClockType clockType)', 'Timespec gprsharp_inf_future(ClockType clockType)', 'Timespec gprsharp_inf_past(ClockType clockType)',