Merge github.com:grpc/grpc into break-break-break-the-locks

pull/1993/head
Craig Tiller 10 years ago
commit 0d911aad69
  1. 4
      BUILD
  2. 2
      Makefile
  3. 2
      build.json
  4. 1
      include/grpc++/byte_buffer.h
  5. 68
      include/grpc/byte_buffer.h
  6. 11
      include/grpc/byte_buffer_reader.h
  7. 8
      include/grpc/census.h
  8. 6
      include/grpc/compression.h
  9. 29
      include/grpc/grpc.h
  10. 6
      include/grpc/grpc_security.h
  11. 8
      include/grpc/support/tls_pthread.h
  12. 2
      src/core/compression/algorithm.c
  13. 2
      src/core/compression/message_compress.h
  14. 32
      src/core/surface/byte_buffer.c
  15. 56
      src/core/surface/byte_buffer_reader.c
  16. 8
      src/core/surface/call.c
  17. 66
      src/core/transport/chttp2_transport.c
  18. 4
      src/cpp/proto/proto_utils.cc
  19. 2
      src/cpp/util/byte_buffer.cc
  20. 2
      src/csharp/Grpc.Core.Tests/ClientServerTest.cs
  21. 4
      src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
  22. 52
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueEventTest.cs
  23. 64
      src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
  24. 8
      src/csharp/Grpc.Core.Tests/PInvokeTest.cs
  25. 36
      src/csharp/Grpc.Core/Channel.cs
  26. 4
      src/csharp/Grpc.Core/Grpc.Core.csproj
  27. 31
      src/csharp/Grpc.Core/GrpcEnvironment.cs
  28. 35
      src/csharp/Grpc.Core/Internal/AsyncCall.cs
  29. 43
      src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
  30. 8
      src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
  31. 44
      src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
  32. 104
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  33. 60
      src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
  34. 14
      src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
  35. 88
      src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
  36. 2
      src/csharp/Grpc.Core/Internal/DebugStats.cs
  37. 40
      src/csharp/Grpc.Core/Internal/Enums.cs
  38. 20
      src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
  39. 25
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  40. 2
      src/csharp/Grpc.Core/Internal/Timespec.cs
  41. 55
      src/csharp/Grpc.Core/Server.cs
  42. 2
      src/csharp/Grpc.Examples.MathClient/MathClient.cs
  43. 2
      src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
  44. 4
      src/csharp/Grpc.IntegrationTesting/InteropClient.cs
  45. 2
      src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
  46. 114
      src/csharp/ext/grpc_csharp_ext.c
  47. 2
      src/node/ext/byte_buffer.cc
  48. 2
      src/node/package.json
  49. 4
      src/node/src/common.js
  50. 90
      src/node/test/common_test.js
  51. 38
      src/node/test/test_messages.proto
  52. 4
      src/objective-c/GRPCClient/private/NSData+GRPC.m
  53. 2
      src/php/ext/grpc/byte_buffer.c
  54. 2
      src/python/src/grpc/_adapter/_c/utility.c
  55. 2
      src/ruby/ext/grpc/rb_byte_buffer.c
  56. 3
      test/core/end2end/cq_verifier.c
  57. 4
      test/core/end2end/tests/cancel_after_accept.c
  58. 4
      test/core/end2end/tests/cancel_after_accept_and_writes_closed.c
  59. 2
      test/core/end2end/tests/cancel_after_invoke.c
  60. 2
      test/core/end2end/tests/cancel_before_invoke.c
  61. 4
      test/core/end2end/tests/invoke_large_request.c
  62. 2
      test/core/end2end/tests/max_message_length.c
  63. 4
      test/core/end2end/tests/ping_pong_streaming.c
  64. 4
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  65. 4
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  66. 4
      test/core/end2end/tests/request_response_with_payload.c
  67. 4
      test/core/end2end/tests/request_response_with_payload_and_call_creds.c
  68. 4
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  69. 2
      test/core/end2end/tests/request_with_large_metadata.c
  70. 2
      test/core/end2end/tests/request_with_payload.c
  71. 2
      test/core/fling/client.c
  72. 79
      test/core/surface/byte_buffer_reader_test.c
  73. 6
      test/cpp/qps/client_async.cc
  74. 5
      test/cpp/qps/qps_driver.cc
  75. 2
      test/cpp/qps/qps_test.cc
  76. 4
      test/cpp/qps/qps_test_openloop.cc
  77. 4
      test/cpp/qps/qps_worker.cc
  78. 114
      test/cpp/qps/qpstest.proto
  79. 3
      test/cpp/qps/server_async.cc
  80. 2
      tools/doxygen/Doxyfile.core
  81. 2
      tools/doxygen/Doxyfile.core.internal
  82. 2
      vsprojects/grpc/grpc.vcxproj
  83. 6
      vsprojects/grpc/grpc.vcxproj.filters
  84. 2
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj
  85. 6
      vsprojects/grpc_unsecure/grpc_unsecure.vcxproj.filters

@ -154,7 +154,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",
@ -347,6 +346,7 @@ cc_library(
"include/grpc/grpc_security.h",
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
"include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/census.h",
@ -375,7 +375,6 @@ cc_library(
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",
@ -545,6 +544,7 @@ cc_library(
hdrs = [
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
"include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h",
"include/grpc/census.h",

@ -3044,6 +3044,7 @@ PUBLIC_HEADERS_C += \
include/grpc/grpc_security.h \
include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/census.h \
@ -3286,6 +3287,7 @@ LIBGRPC_UNSECURE_SRC = \
PUBLIC_HEADERS_C += \
include/grpc/byte_buffer.h \
include/grpc/byte_buffer_reader.h \
include/grpc/compression.h \
include/grpc/grpc.h \
include/grpc/status.h \
include/grpc/census.h \

@ -101,6 +101,7 @@
"public_headers": [
"include/grpc/byte_buffer.h",
"include/grpc/byte_buffer_reader.h",
"include/grpc/compression.h",
"include/grpc/grpc.h",
"include/grpc/status.h"
],
@ -115,7 +116,6 @@
"src/core/channel/http_client_filter.h",
"src/core/channel/http_server_filter.h",
"src/core/channel/noop_filter.h",
"src/core/compression/algorithm.h",
"src/core/compression/message_compress.h",
"src/core/debug/trace.h",
"src/core/iomgr/alarm.h",

@ -35,6 +35,7 @@
#define GRPCXX_BYTE_BUFFER_H
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>
#include <grpc++/config.h>
#include <grpc++/slice.h>

@ -34,17 +34,77 @@
#ifndef GRPC_BYTE_BUFFER_H
#define GRPC_BYTE_BUFFER_H
#include <grpc/grpc.h>
#include <grpc/compression.h>
#include <grpc/support/slice_buffer.h>
typedef enum { GRPC_BB_SLICE_BUFFER } grpc_byte_buffer_type;
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
GRPC_BB_RAW
/* Future types may include GRPC_BB_PROTOBUF, etc. */
} grpc_byte_buffer_type;
/* byte buffers are containers for messages passed in from the public api's */
struct grpc_byte_buffer {
grpc_byte_buffer_type type;
union {
gpr_slice_buffer slice_buffer;
struct {
grpc_compression_algorithm compression;
gpr_slice_buffer slice_buffer;
} raw;
} data;
};
typedef struct grpc_byte_buffer grpc_byte_buffer;
/** Returns a RAW byte buffer instance over the given slices (up to \a nslices).
*
* Increases the reference count for all \a slices processed. The user is
* responsible for invoking grpc_byte_buffer_destroy on the returned instance.*/
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
size_t nslices);
/** Returns a *compressed* RAW byte buffer instance over the given slices (up to
* \a nslices). The \a compression argument defines the compression algorithm
* used to generate the data in \a slices.
*
* Increases the reference count for all \a slices processed. The user is
* responsible for invoking grpc_byte_buffer_destroy on the returned instance.*/
grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression);
/** Copies input byte buffer \a bb.
*
* Increases the reference count of all the source slices. The user is
* responsible for calling grpc_byte_buffer_destroy over the returned copy. */
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
/** Returns the size of the given byte buffer, in bytes. */
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
/** Destroys \a byte_buffer deallocating all its memory. */
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
/** Reader for byte buffers. Iterates over slices in the byte buffer */
struct grpc_byte_buffer_reader;
typedef struct grpc_byte_buffer_reader grpc_byte_buffer_reader;
/** Initialize \a reader to read over \a buffer */
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer);
/** Cleanup and destroy \a reader */
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
/** Updates \a slice with the next piece of data from from \a reader and returns
* 1. Returns 0 at the end of the stream. Caller is responsible for calling
* gpr_slice_unref on the result. */
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice);
#ifdef __cplusplus
}
#endif
#endif /* GRPC_BYTE_BUFFER_H */

@ -37,8 +37,13 @@
#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#ifdef __cplusplus
extern "C" {
#endif
struct grpc_byte_buffer_reader {
grpc_byte_buffer *buffer;
grpc_byte_buffer *buffer_in;
grpc_byte_buffer *buffer_out;
/* Different current objects correspond to different types of byte buffers */
union {
/* Index into a slice buffer's array of slices */
@ -46,4 +51,8 @@ struct grpc_byte_buffer_reader {
} current;
};
#ifdef __cplusplus
}
#endif
#endif /* GRPC_BYTE_BUFFER_READER_H */

@ -40,6 +40,10 @@
#include <grpc/grpc.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Identify census functionality that can be enabled via census_initialize(). */
enum census_functions {
CENSUS_NONE = 0, /* Do not enable census. */
@ -92,4 +96,8 @@ int census_context_deserialize(const char *buffer, census_context **context);
* future census calls will result in undefined behavior. */
void census_context_destroy(census_context *context);
#ifdef __cplusplus
}
#endif
#endif /* CENSUS_CENSUS_H */

@ -31,8 +31,8 @@
*
*/
#ifndef GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
#define GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H
#ifndef GRPC_COMPRESSION_H
#define GRPC_COMPRESSION_H
/* The various compression algorithms supported by GRPC */
typedef enum {
@ -46,4 +46,4 @@ typedef enum {
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm);
#endif /* GRPC_INTERNAL_CORE_COMPRESSION_ALGORITHM_H */
#endif /* GRPC_COMPRESSION_H */

@ -37,6 +37,7 @@
#include <grpc/status.h>
#include <stddef.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/slice.h>
#include <grpc/support/time.h>
@ -155,34 +156,6 @@ typedef enum grpc_call_error {
(start_write/add_metadata). Illegal on invoke/accept. */
#define GRPC_WRITE_NO_COMPRESS (0x00000002u)
/* A buffer of bytes */
struct grpc_byte_buffer;
typedef struct grpc_byte_buffer grpc_byte_buffer;
/* Sample helpers to obtain byte buffers (these will certainly move
someplace else) */
grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices);
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb);
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb);
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer);
/* Reader for byte buffers. Iterates over slices in the byte buffer */
struct grpc_byte_buffer_reader;
typedef struct grpc_byte_buffer_reader grpc_byte_buffer_reader;
/** Initialize \a reader to read over \a buffer */
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer);
/** Cleanup and destroy \a reader */
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
/* At the end of the stream, returns 0. Otherwise, returns 1 and sets slice to
be the returned slice. Caller is responsible for calling gpr_slice_unref on
the result. */
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice);
/* A single metadata element */
typedef struct grpc_metadata {
const char *key;

@ -34,8 +34,8 @@
#ifndef GRPC_GRPC_SECURITY_H
#define GRPC_GRPC_SECURITY_H
#include "grpc.h"
#include "status.h"
#include <grpc/grpc.h>
#include <grpc/status.h>
#ifdef __cplusplus
extern "C" {
@ -117,7 +117,7 @@ grpc_credentials *grpc_service_account_credentials_create(
grpc_credentials *grpc_jwt_credentials_create(const char *json_key,
gpr_timespec token_lifetime);
/* Creates an Oauth2 Refresh Token crednetials object. May return NULL if the
/* Creates an Oauth2 Refresh Token credentials object. May return NULL if the
input is invalid.
WARNING: Do NOT use this credentials to connect to a non-google service as
this could result in an oauth2 token leak.

@ -49,7 +49,13 @@ struct gpr_pthread_thread_local {
#define gpr_tls_init(tls) GPR_ASSERT(0 == pthread_key_create(&(tls)->key, NULL))
#define gpr_tls_destroy(tls) pthread_key_delete((tls)->key)
gpr_intptr gpr_tls_set(struct gpr_pthread_thread_local *tls, gpr_intptr value);
#define gpr_tls_get(tls) ((gpr_intptr)pthread_getspecific((tls)->key))
#ifdef __cplusplus
extern "C" {
#endif
gpr_intptr gpr_tls_set(struct gpr_pthread_thread_local *tls, gpr_intptr value);
#ifdef __cplusplus
}
#endif
#endif

@ -31,7 +31,7 @@
*
*/
#include "src/core/compression/algorithm.h"
#include <grpc/compression.h>
const char *grpc_compression_algorithm_name(
grpc_compression_algorithm algorithm) {

@ -34,7 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
#define GRPC_INTERNAL_CORE_COMPRESSION_MESSAGE_COMPRESS_H
#include "src/core/compression/algorithm.h"
#include <grpc/compression.h>
#include <grpc/support/slice_buffer.h>
/* compress 'input' to 'output' using 'algorithm'.

@ -35,25 +35,31 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices) {
grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
size_t nslices) {
return grpc_raw_compressed_byte_buffer_create(slices, nslices,
GRPC_COMPRESS_NONE);
}
grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
gpr_slice *slices, size_t nslices, grpc_compression_algorithm compression) {
size_t i;
grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
bb->type = GRPC_BB_SLICE_BUFFER;
gpr_slice_buffer_init(&bb->data.slice_buffer);
bb->type = GRPC_BB_RAW;
bb->data.raw.compression = compression;
gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
for (i = 0; i < nslices; i++) {
gpr_slice_ref(slices[i]);
gpr_slice_buffer_add(&bb->data.slice_buffer, slices[i]);
gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slices[i]);
}
return bb;
}
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
return grpc_byte_buffer_create(bb->data.slice_buffer.slices,
bb->data.slice_buffer.count);
case GRPC_BB_RAW:
return grpc_raw_byte_buffer_create(bb->data.raw.slice_buffer.slices,
bb->data.raw.slice_buffer.count);
}
gpr_log(GPR_INFO, "should never get here");
abort();
@ -63,8 +69,8 @@ grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
if (!bb) return;
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
gpr_slice_buffer_destroy(&bb->data.slice_buffer);
case GRPC_BB_RAW:
gpr_slice_buffer_destroy(&bb->data.raw.slice_buffer);
break;
}
free(bb);
@ -72,8 +78,8 @@ void grpc_byte_buffer_destroy(grpc_byte_buffer *bb) {
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_SLICE_BUFFER:
return bb->data.slice_buffer.length;
case GRPC_BB_RAW:
return bb->data.raw.slice_buffer.length;
}
gpr_log(GPR_ERROR, "should never reach here");
abort();

@ -33,41 +33,73 @@
#include <grpc/byte_buffer_reader.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/slice_buffer.h>
#include <grpc/byte_buffer.h>
#include "src/core/compression/message_compress.h"
static int is_compressed(grpc_byte_buffer *buffer) {
switch (buffer->type) {
case GRPC_BB_RAW:
if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
return 0 /* GPR_FALSE */;
}
break;
}
return 1 /* GPR_TRUE */;
}
void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) {
reader->buffer = buffer;
switch (buffer->type) {
case GRPC_BB_SLICE_BUFFER:
gpr_slice_buffer decompressed_slices_buffer;
reader->buffer_in = buffer;
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
gpr_slice_buffer_init(&decompressed_slices_buffer);
if (is_compressed(reader->buffer_in)) {
grpc_msg_decompress(reader->buffer_in->data.raw.compression,
&reader->buffer_in->data.raw.slice_buffer,
&decompressed_slices_buffer);
reader->buffer_out = grpc_raw_byte_buffer_create(
decompressed_slices_buffer.slices,
decompressed_slices_buffer.count);
gpr_slice_buffer_destroy(&decompressed_slices_buffer);
} else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
}
reader->current.index = 0;
break;
}
}
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
/* no-op: the user is responsible for memory deallocation.
* Other cleanup operations would go here if needed. */
switch (reader->buffer_in->type) {
case GRPC_BB_RAW:
/* keeping the same if-else structure as in the init function */
if (is_compressed(reader->buffer_in)) {
grpc_byte_buffer_destroy(reader->buffer_out);
}
break;
}
}
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice) {
grpc_byte_buffer *buffer = reader->buffer;
gpr_slice_buffer *slice_buffer;
switch (buffer->type) {
case GRPC_BB_SLICE_BUFFER:
slice_buffer = &buffer->data.slice_buffer;
switch (reader->buffer_in->type) {
case GRPC_BB_RAW: {
gpr_slice_buffer *slice_buffer;
slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
if (reader->current.index < slice_buffer->count) {
*slice = gpr_slice_ref(slice_buffer->slices[reader->current.index]);
reader->current.index += 1;
return 1;
} else {
return 0;
}
break;
}
}
return 0;
}

@ -636,7 +636,7 @@ static void call_on_done_send(void *pc, int success) {
static void finish_message(grpc_call *call) {
/* TODO(ctiller): this could be a lot faster if coded directly */
grpc_byte_buffer *byte_buffer = grpc_byte_buffer_create(
grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
call->incoming_message.slices, call->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
@ -806,9 +806,9 @@ static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
size_t i;
switch (byte_buffer->type) {
case GRPC_BB_SLICE_BUFFER:
for (i = 0; i < byte_buffer->data.slice_buffer.count; i++) {
gpr_slice slice = byte_buffer->data.slice_buffer.slices[i];
case GRPC_BB_RAW:
for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
gpr_slice_ref(slice);
grpc_sopb_add_slice(sopb, slice);
}

@ -230,7 +230,10 @@ struct transport {
/* basic state management - what are we doing at the moment? */
gpr_uint8 reading;
gpr_uint8 writing;
gpr_uint8 calling_back;
/** are we calling back (via cb) with a channel-level event */
gpr_uint8 calling_back_channel;
/** are we calling back any grpc_transport_op completion events */
gpr_uint8 calling_back_ops;
gpr_uint8 destroying;
gpr_uint8 closed;
error_state error_state;
@ -357,7 +360,7 @@ static void push_setting(transport *t, grpc_chttp2_setting_id id,
gpr_uint32 value);
static int prepare_callbacks(transport *t);
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb);
static void run_callbacks(transport *t);
static void call_cb_closed(transport *t, const grpc_transport_callbacks *cb);
static int prepare_write(transport *t);
@ -565,7 +568,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
}
gpr_mu_lock(&t->mu);
t->calling_back = 1;
t->calling_back_channel = 1;
ref_transport(t); /* matches unref at end of this function */
gpr_mu_unlock(&t->mu);
@ -574,7 +577,7 @@ static void init_transport(transport *t, grpc_transport_setup_callback setup,
lock(t);
t->cb = sr.callbacks;
t->cb_user_data = sr.user_data;
t->calling_back = 0;
t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
@ -595,7 +598,7 @@ static void destroy_transport(grpc_transport *gt) {
We need to be not writing as cancellation finalization may produce some
callbacks that NEED to be made to close out some streams when t->writing
becomes 0. */
while (t->calling_back || t->writing) {
while (t->calling_back_channel || t->writing) {
gpr_cv_wait(&t->cv, &t->mu, gpr_inf_future);
}
drop_connection(t);
@ -830,28 +833,29 @@ static void unlock(transport *t) {
finish_reads(t);
/* gather any callbacks that need to be made */
if (!t->calling_back) {
t->calling_back = perform_callbacks = prepare_callbacks(t);
if (cb) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back = 1;
}
}
if (!t->calling_back_ops) {
t->calling_back_ops = perform_callbacks = prepare_callbacks(t);
if (perform_callbacks) ref_transport(t);
}
if (perform_callbacks || call_closed || num_goaways) {
ref_transport(t);
if (!t->calling_back_channel && cb) {
if (t->error_state == ERROR_STATE_SEEN && !t->writing) {
call_closed = 1;
t->calling_back_channel = 1;
t->cb = NULL; /* no more callbacks */
t->error_state = ERROR_STATE_NOTIFIED;
}
if (t->num_pending_goaways) {
goaways = t->pending_goaways;
num_goaways = t->num_pending_goaways;
t->pending_goaways = NULL;
t->num_pending_goaways = 0;
t->cap_pending_goaways = 0;
t->calling_back_channel = 1;
}
if (call_closed || num_goaways) {
ref_transport(t);
}
}
/* finally unlock */
@ -865,7 +869,11 @@ static void unlock(transport *t) {
}
if (perform_callbacks) {
run_callbacks(t, cb);
run_callbacks(t);
lock(t);
t->calling_back_ops = 0;
unlock(t);
unref_transport(t);
}
if (call_closed) {
@ -878,9 +886,9 @@ static void unlock(transport *t) {
perform_write(t, ep);
}
if (perform_callbacks || call_closed || num_goaways) {
if (call_closed || num_goaways) {
lock(t);
t->calling_back = 0;
t->calling_back_channel = 0;
if (t->destroying) gpr_cv_signal(&t->cv);
unlock(t);
unref_transport(t);
@ -2101,7 +2109,7 @@ static int prepare_callbacks(transport *t) {
return t->executing_callbacks.count > 0;
}
static void run_callbacks(transport *t, const grpc_transport_callbacks *cb) {
static void run_callbacks(transport *t) {
size_t i;
for (i = 0; i < t->executing_callbacks.count; i++) {
op_closure c = t->executing_callbacks.callbacks[i];

@ -49,8 +49,8 @@ class GrpcBufferWriter GRPC_FINAL
explicit GrpcBufferWriter(grpc_byte_buffer** bp,
int block_size = kMaxBufferLength)
: block_size_(block_size), byte_count_(0), have_backup_(false) {
*bp = grpc_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.slice_buffer;
*bp = grpc_raw_byte_buffer_create(NULL, 0);
slice_buffer_ = &(*bp)->data.raw.slice_buffer;
}
~GrpcBufferWriter() GRPC_OVERRIDE {

@ -42,7 +42,7 @@ ByteBuffer::ByteBuffer(Slice* slices, size_t nslices) {
for (size_t i = 0; i < nslices; i++) {
c_slices[i] = slices[i].slice_;
}
buffer_ = grpc_byte_buffer_create(c_slices.data(), nslices);
buffer_ = grpc_raw_byte_buffer_create(c_slices.data(), nslices);
}
void ByteBuffer::Clear() {

@ -86,7 +86,7 @@ namespace Grpc.Core.Tests
server.AddServiceDefinition(ServiceDefinition);
int port = server.AddListeningPort(Host, Server.PickUnusedPort);
server.Start();
channel = new Channel(Host + ":" + port);
channel = new Channel(Host, port);
}
[TearDown]

@ -3,7 +3,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>10.0.0</ProductVersion>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{86EC5CB4-4EA2-40A2-8057-86542A0353BB}</ProjectGuid>
<OutputType>Library</OutputType>
@ -46,6 +46,8 @@
<Compile Include="TimespecTest.cs" />
<Compile Include="PInvokeTest.cs" />
<Compile Include="Internal\MetadataArraySafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueSafeHandleTest.cs" />
<Compile Include="Internal\CompletionQueueEventTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>

@ -0,0 +1,52 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class CompletionQueueEventTest
{
[Test]
public void CreateAndDestroy()
{
Assert.AreEqual(CompletionQueueEvent.NativeSize, Marshal.SizeOf(typeof(CompletionQueueEvent)));
}
}
}

@ -0,0 +1,64 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
public class CompletionQueueSafeHandleTest
{
[Test]
public void CreateAndDestroy()
{
var cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
}
[Test]
public void CreateAndShutdown()
{
var cq = CompletionQueueSafeHandle.Create();
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type);
Assert.AreNotEqual(IntPtr.Zero, ev.success);
Assert.AreEqual(IntPtr.Zero, ev.tag);
}
}
}

@ -48,7 +48,7 @@ namespace Grpc.Core.Tests
int counter;
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
@ -88,7 +88,7 @@ namespace Grpc.Core.Tests
[Test]
public void NativeCallbackBenchmark()
{
CompletionCallbackDelegate handler = Handler;
OpCompletionDelegate handler = Handler;
counter = 0;
BenchmarkUtil.RunBenchmark(
@ -114,7 +114,7 @@ namespace Grpc.Core.Tests
10000, 10000,
() =>
{
grpcsharp_test_callback(new CompletionCallbackDelegate(Handler));
grpcsharp_test_callback(new OpCompletionDelegate(Handler));
});
Assert.AreNotEqual(0, counter);
}
@ -134,7 +134,7 @@ namespace Grpc.Core.Tests
});
}
private void Handler(bool success, IntPtr ptr)
private void Handler(bool success)
{
counter++;
}

@ -45,9 +45,13 @@ namespace Grpc.Core
readonly string target;
/// <summary>
/// Creates a channel.
/// Creates a channel that connects to a specific host.
/// Port will default to 80 for an unsecure channel and to 443 a secure channel.
/// </summary>
public Channel(string target, Credentials credentials = null, ChannelArgs channelArgs = null)
/// <param name="host">The DNS name of IP address of the host.</param>
/// <param name="credentials">Optional credentials to create a secure channel.</param>
/// <param name="channelArgs">Optional channel arguments.</param>
public Channel(string host, Credentials credentials = null, ChannelArgs channelArgs = null)
{
using (ChannelArgsSafeHandle nativeChannelArgs = CreateNativeChannelArgs(channelArgs))
{
@ -55,23 +59,27 @@ namespace Grpc.Core
{
using (CredentialsSafeHandle nativeCredentials = credentials.ToNativeCredentials())
{
this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, target, nativeChannelArgs);
this.handle = ChannelSafeHandle.CreateSecure(nativeCredentials, host, nativeChannelArgs);
}
}
else
{
this.handle = ChannelSafeHandle.Create(target, nativeChannelArgs);
this.handle = ChannelSafeHandle.Create(host, nativeChannelArgs);
}
}
this.target = GetOverridenTarget(target, channelArgs);
this.target = GetOverridenTarget(host, channelArgs);
}
public string Target
/// <summary>
/// Creates a channel that connects to a specific host and port.
/// </summary>
/// <param name="host">DNS name or IP address</param>
/// <param name="port">the port</param>
/// <param name="credentials">Optional credentials to create a secure channel.</param>
/// <param name="channelArgs">Optional channel arguments.</param>
public Channel(string host, int port, Credentials credentials = null, ChannelArgs channelArgs = null) :
this(string.Format("{0}:{1}", host, port), credentials, channelArgs)
{
get
{
return this.target;
}
}
public void Dispose()
@ -80,6 +88,14 @@ namespace Grpc.Core
GC.SuppressFinalize(this);
}
internal string Target
{
get
{
return target;
}
}
internal ChannelSafeHandle Handle
{
get

@ -73,7 +73,6 @@
<Compile Include="Marshaller.cs" />
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\AsyncStreamExtensions.cs" />
<Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" />
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="Utils\ExceptionHelper.cs" />
<Compile Include="Internal\CredentialsSafeHandle.cs" />
@ -101,6 +100,9 @@
<Compile Include="Internal\AtomicCounter.cs" />
<Compile Include="Internal\DebugStats.cs" />
<Compile Include="ServerCallContext.cs" />
<Compile Include="Internal\CompletionQueueEvent.cs" />
<Compile Include="Internal\CompletionRegistry.cs" />
<Compile Include="Internal\BatchContextSafeHandle.cs" />
</ItemGroup>
<ItemGroup>
<None Include="packages.config" />

@ -54,6 +54,7 @@ namespace Grpc.Core
static volatile GrpcEnvironment instance;
readonly GrpcThreadPool threadPool;
readonly CompletionRegistry completionRegistry;
bool isClosed;
/// <summary>
@ -105,6 +106,19 @@ namespace Grpc.Core
}
}
internal static CompletionRegistry CompletionRegistry
{
get
{
var inst = instance;
if (inst == null)
{
throw new InvalidOperationException("GRPC environment not initialized");
}
return inst.completionRegistry;
}
}
/// <summary>
/// Creates gRPC environment.
/// </summary>
@ -112,6 +126,7 @@ namespace Grpc.Core
{
GrpcLog.RedirectNativeLogs(Console.Error);
grpcsharp_init();
completionRegistry = new CompletionRegistry();
threadPool = new GrpcThreadPool(THREAD_POOL_SIZE);
threadPool.Start();
// TODO: use proper logging here
@ -139,14 +154,24 @@ namespace Grpc.Core
{
var remainingClientCalls = DebugStats.ActiveClientCalls.Count;
if (remainingClientCalls != 0)
{
Console.WriteLine("Warning: Detected {0} client calls that weren't disposed properly.", remainingClientCalls);
{
DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
}
var remainingServerCalls = DebugStats.ActiveServerCalls.Count;
if (remainingServerCalls != 0)
{
Console.WriteLine("Warning: Detected {0} server calls that weren't disposed properly.", remainingServerCalls);
DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
}
var pendingBatchCompletions = DebugStats.PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
DebugWarning(string.Format("Detected {0} pending batch completions.", pendingBatchCompletions));
}
}
private static void DebugWarning(string message)
{
throw new Exception("Shutdown check: " + message);
}
}
}

@ -47,9 +47,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
readonly CompletionCallbackDelegate unaryResponseHandler;
readonly CompletionCallbackDelegate finishedHandler;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@ -60,8 +57,6 @@ namespace Grpc.Core.Internal
public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
{
this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse);
this.finishedHandler = CreateBatchCompletionCallback(HandleFinished);
}
public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
@ -96,7 +91,21 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.BlockingUnary(cq, payload, unaryResponseHandler, metadataArray);
using (var ctx = BatchContextSafeHandle.Create())
{
call.StartUnary(payload, ctx, metadataArray);
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
try
{
HandleUnaryResponse(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
}
}
try
@ -129,7 +138,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartUnary(payload, unaryResponseHandler, metadataArray);
call.StartUnary(payload, HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
}
@ -151,7 +160,7 @@ namespace Grpc.Core.Internal
unaryResponseTcs = new TaskCompletionSource<TResponse>();
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartClientStreaming(unaryResponseHandler, metadataArray);
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
return unaryResponseTcs.Task;
@ -175,7 +184,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartServerStreaming(payload, finishedHandler, metadataArray);
call.StartServerStreaming(payload, HandleFinished, metadataArray);
}
}
}
@ -194,7 +203,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartDuplexStreaming(finishedHandler, metadataArray);
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
}
}
@ -229,7 +238,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendCloseFromClient(halfclosedHandler);
call.StartSendCloseFromClient(HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
@ -274,7 +283,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
{
lock (myLock)
{
@ -307,7 +316,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
private void HandleFinished(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleFinished(bool success, BatchContextSafeHandle ctx)
{
var status = ctx.GetReceivedStatus();

@ -51,13 +51,8 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
protected readonly CompletionCallbackDelegate sendFinishedHandler;
protected readonly CompletionCallbackDelegate readFinishedHandler;
protected readonly CompletionCallbackDelegate halfclosedHandler;
protected readonly object myLock = new object();
protected GCHandle gchandle;
protected CallSafeHandle call;
protected bool disposed;
@ -77,10 +72,6 @@ namespace Grpc.Core.Internal
{
this.serializer = Preconditions.CheckNotNull(serializer);
this.deserializer = Preconditions.CheckNotNull(deserializer);
this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished);
this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished);
this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed);
}
/// <summary>
@ -121,9 +112,6 @@ namespace Grpc.Core.Internal
{
lock (myLock)
{
// Make sure this object and the delegated held by it will not be garbage collected
// before we release this handle.
gchandle = GCHandle.Alloc(this);
this.call = call;
}
}
@ -141,7 +129,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendMessage(payload, sendFinishedHandler);
call.StartSendMessage(payload, HandleSendFinished);
sendCompletionDelegate = completionDelegate;
}
}
@ -157,7 +145,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckReadingAllowed();
call.StartReceiveMessage(readFinishedHandler);
call.StartReceiveMessage(HandleReadFinished);
readCompletionDelegate = completionDelegate;
}
}
@ -197,7 +185,6 @@ namespace Grpc.Core.Internal
{
call.Dispose();
}
gchandle.Free();
disposed = true;
}
@ -281,30 +268,10 @@ namespace Grpc.Core.Internal
}
}
/// <summary>
/// Creates completion callback delegate that wraps the batch completion handler in a try catch block to
/// prevent propagating exceptions accross managed/unmanaged boundary.
/// </summary>
protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler)
{
return new CompletionCallbackDelegate((success, batchContextPtr) =>
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
handler(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
});
}
/// <summary>
/// Handles send completion.
/// </summary>
private void HandleSendFinished(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@ -328,7 +295,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles halfclose completion.
/// </summary>
private void HandleHalfclosed(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx)
{
AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
@ -353,7 +320,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles streaming read completion.
/// </summary>
private void HandleReadFinished(bool success, BatchContextSafeHandleNotOwned ctx)
protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx)
{
var payload = ctx.GetReceivedMessage();

@ -47,12 +47,10 @@ namespace Grpc.Core.Internal
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
readonly CompletionCallbackDelegate finishedServersideHandler;
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer)
{
this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside);
}
public void Initialize(CallSafeHandle call)
@ -72,7 +70,7 @@ namespace Grpc.Core.Internal
started = true;
call.StartServerSide(finishedServersideHandler);
call.StartServerSide(HandleFinishedServerside);
return finishedServersideTcs.Task;
}
}
@ -107,7 +105,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
call.StartSendStatusFromServer(status, halfclosedHandler);
call.StartSendStatusFromServer(status, HandleHalfclosed);
halfcloseRequested = true;
sendCompletionDelegate = completionDelegate;
}
@ -121,7 +119,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Handles the server side close completion.
/// </summary>
private void HandleFinishedServerside(bool success, BatchContextSafeHandleNotOwned ctx)
private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx)
{
bool cancelled = ctx.GetReceivedCloseOnServerCancelled();

@ -41,32 +41,50 @@ namespace Grpc.Core.Internal
/// Not owned version of
/// grpcsharp_batch_context
/// </summary>
internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid
internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx);
static extern BatchContextSafeHandle grpcsharp_batch_context_create();
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen);
static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx);
static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char*
static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx);
static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx); // returns const char*
[DllImport("grpc_csharp_ext.dll")]
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char*
static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandleNotOwned ctx);
static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandle ctx); // returns const char*
public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false)
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_batch_context_recv_close_on_server_cancelled(BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_batch_context_destroy(IntPtr ctx);
private BatchContextSafeHandle()
{
}
public static BatchContextSafeHandle Create()
{
SetHandle(handle);
return grpcsharp_batch_context_create();
}
public IntPtr Handle
{
get
{
return handle;
}
}
public Status GetReceivedStatus()
@ -102,5 +120,11 @@ namespace Grpc.Core.Internal
{
return grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
protected override bool ReleaseHandle()
{
grpcsharp_batch_context_destroy(handle);
return true;
}
}
}

@ -37,8 +37,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void CompletionCallbackDelegate(bool success, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
@ -57,49 +55,40 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
@ -113,64 +102,84 @@ namespace Grpc.Core.Internal
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
.CheckOk();
}
public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
}
public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
}
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
}
public void StartReceiveMessage(CompletionCallbackDelegate callback)
public void StartReceiveMessage(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_recv_message(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
public void StartServerSide(CompletionCallbackDelegate callback)
public void StartServerSide(BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
public void Cancel()
{
AssertCallOk(grpcsharp_call_cancel(this));
grpcsharp_call_cancel(this).CheckOk();
}
public void CancelWithStatus(Status status)
{
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail).CheckOk();
}
protected override bool ReleaseHandle()
@ -179,14 +188,11 @@ namespace Grpc.Core.Internal
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}

@ -0,0 +1,60 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Runtime.InteropServices;
namespace Grpc.Core.Internal
{
/// <summary>
/// grpc_event from grpc/grpc.h
/// </summary>
[StructLayout(LayoutKind.Sequential)]
internal struct CompletionQueueEvent
{
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_sizeof_grpc_event();
public GRPCCompletionType type;
public int success;
public IntPtr tag;
internal static int NativeSize
{
get
{
return grpcsharp_sizeof_grpc_event();
}
}
}
}

@ -46,7 +46,10 @@ namespace Grpc.Core.Internal
static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq);
static extern CompletionQueueEvent grpcsharp_completion_queue_next(CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
static extern CompletionQueueEvent grpcsharp_completion_queue_pluck(CompletionQueueSafeHandle cq, IntPtr tag);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_completion_queue_destroy(IntPtr cq);
@ -60,9 +63,14 @@ namespace Grpc.Core.Internal
return grpcsharp_completion_queue_create();
}
public GRPCCompletionType NextWithCallback()
public CompletionQueueEvent Next()
{
return grpcsharp_completion_queue_next(this);
}
public CompletionQueueEvent Pluck(IntPtr tag)
{
return grpcsharp_completion_queue_next_with_callback(this);
return grpcsharp_completion_queue_pluck(this, tag);
}
public void Shutdown()

@ -0,0 +1,88 @@
#region Copyright notice and license
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#endregion
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
internal delegate void OpCompletionDelegate(bool success);
internal delegate void BatchCompletionDelegate(bool success, BatchContextSafeHandle ctx);
internal class CompletionRegistry
{
readonly ConcurrentDictionary<IntPtr, OpCompletionDelegate> dict = new ConcurrentDictionary<IntPtr, OpCompletionDelegate>();
public void Register(IntPtr key, OpCompletionDelegate callback)
{
DebugStats.PendingBatchCompletions.Increment();
Preconditions.CheckState(dict.TryAdd(key, callback));
}
public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
OpCompletionDelegate opCallback = ((success) => HandleBatchCompletion(success, ctx, callback));
Register(ctx.Handle, opCallback);
}
public OpCompletionDelegate Extract(IntPtr key)
{
OpCompletionDelegate value;
Preconditions.CheckState(dict.TryRemove(key, out value));
DebugStats.PendingBatchCompletions.Decrement();
return value;
}
private static void HandleBatchCompletion(bool success, BatchContextSafeHandle ctx, BatchCompletionDelegate callback)
{
try
{
callback(success, ctx);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
finally
{
if (ctx != null)
{
ctx.Dispose();
}
}
}
}
}

@ -41,5 +41,7 @@ namespace Grpc.Core.Internal
public static readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
public static readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
public static readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
}
}

@ -33,35 +33,47 @@
using System;
using System.Runtime.InteropServices;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// from grpc/grpc.h
/// grpc_call_error from grpc/grpc.h
/// </summary>
internal enum GRPCCallError
{
/* everything went ok */
GRPC_CALL_OK = 0,
OK = 0,
/* something failed, we don't know what */
GRPC_CALL_ERROR,
Error,
/* this method is not available on the server */
GRPC_CALL_ERROR_NOT_ON_SERVER,
NotOnServer,
/* this method is not available on the client */
GRPC_CALL_ERROR_NOT_ON_CLIENT,
NotOnClient,
/* this method must be called before server_accept */
GRPC_CALL_ERROR_ALREADY_ACCEPTED,
AlreadyAccepted,
/* this method must be called before invoke */
GRPC_CALL_ERROR_ALREADY_INVOKED,
AlreadyInvoked,
/* this method must be called after invoke */
GRPC_CALL_ERROR_NOT_INVOKED,
NotInvoked,
/* this call is already finished
(writes_done or write_status has already been called) */
GRPC_CALL_ERROR_ALREADY_FINISHED,
AlreadyFinished,
/* there is already an outstanding read/write operation on the call */
GRPC_CALL_ERROR_TOO_MANY_OPERATIONS,
TooManyOperations,
/* the flags value was illegal for this call */
GRPC_CALL_ERROR_INVALID_FLAGS
InvalidFlags
}
internal static class CallErrorExtensions
{
/// <summary>
/// Checks the call API invocation's result is OK.
/// </summary>
public static void CheckOk(this GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
}
}
/// <summary>
@ -70,12 +82,12 @@ namespace Grpc.Core.Internal
internal enum GRPCCompletionType
{
/* Shutting down */
GRPC_QUEUE_SHUTDOWN,
Shutdown,
/* No event before timeout */
GRPC_QUEUE_TIMEOUT,
Timeout,
/* operation completion */
GRPC_OP_COMPLETE
OpComplete
}
}

@ -112,12 +112,26 @@ namespace Grpc.Core.Internal
/// </summary>
private void RunHandlerLoop()
{
GRPCCompletionType completionType;
CompletionQueueEvent ev;
do
{
completionType = cq.NextWithCallback();
ev = cq.Next();
if (ev.type == GRPCCompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
try
{
var callback = GrpcEnvironment.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
{
Console.WriteLine("Exception occured while invoking completion delegate: " + e);
}
}
}
while (completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN);
while (ev.type != GRPCCompletionType.Shutdown);
Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
}
}

@ -44,9 +44,6 @@ namespace Grpc.Core.Internal
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
@ -59,11 +56,14 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
static extern void grpcsharp_server_shutdown_and_notify_callback(ServerSafeHandle server, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
@ -97,14 +97,18 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown(this);
}
public void ShutdownAndNotify(CompletionCallbackDelegate callback)
public void ShutdownAndNotify(BatchCompletionDelegate callback)
{
grpcsharp_server_shutdown_and_notify_callback(this, callback);
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_shutdown_and_notify_callback(this, ctx);
}
public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
public void RequestCall(CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
var ctx = BatchContextSafeHandle.Create();
GrpcEnvironment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
grpcsharp_server_request_call(this, cq, ctx).CheckOk();
}
protected override bool ReleaseHandle()
@ -112,10 +116,5 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle);
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
}
}

@ -51,7 +51,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern int gprsharp_sizeof_timespec();
// TODO: revisit this.
// NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8
// so IntPtr seems to have the right size to work on both.
public System.IntPtr tv_sec;

@ -52,10 +52,8 @@ namespace Grpc.Core
/// </summary>
public const int PickUnusedPort = 0;
// TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly CompletionCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
//readonly OpCompletionDelegate serverShutdownHandler;
//readonly OpCompletionDelegate newServerRpcHandler;
readonly ServerSafeHandle handle;
readonly object myLock = new object();
@ -69,8 +67,8 @@ namespace Grpc.Core
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
//this.newServerRpcHandler = HandleNewServerRpc;
//this.serverShutdownHandler = HandleServerShutdown;
}
/// <summary>
@ -108,7 +106,7 @@ namespace Grpc.Core
/// </summary>
/// <returns>The port on which server will be listening.</returns>
/// <param name="host">the host</param>
/// <param name="port">the port. If zero, , an unused port is chosen automatically.</param>
/// <param name="port">the port. If zero, an unused port is chosen automatically.</param>
public int AddListeningPort(string host, int port, ServerCredentials credentials)
{
Preconditions.CheckNotNull(credentials);
@ -144,7 +142,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
handle.ShutdownAndNotify(serverShutdownHandler);
var ctx = BatchContextSafeHandle.Create();
handle.ShutdownAndNotify(HandleServerShutdown);
await shutdownTcs.Task;
handle.Dispose();
}
@ -194,7 +193,7 @@ namespace Grpc.Core
{
if (!shutdownRequested)
{
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
handle.RequestCall(GetCompletionQueue(), HandleNewServerRpc);
}
}
}
@ -222,44 +221,28 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
private void HandleNewServerRpc(bool success, IntPtr batchContextPtr)
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
// TODO: handle error
// TODO: handle error
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
CallSafeHandle call = ctx.GetServerRpcNewCall();
string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
{
Task.Run(async () => await InvokeCallHandler(call, method));
}
AllowOneRpc();
}
catch (Exception e)
// after server shutdown, the callback returns with null call
if (!call.IsInvalid)
{
Console.WriteLine("Caught exception in a native handler: " + e);
Task.Run(async () => await InvokeCallHandler(call, method));
}
AllowOneRpc();
}
/// <summary>
/// Handles native callback.
/// </summary>
private void HandleServerShutdown(bool success, IntPtr batchContextPtr)
private void HandleServerShutdown(bool success, BatchContextSafeHandle ctx)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
shutdownTcs.SetResult(null);
}
private static CompletionQueueSafeHandle GetCompletionQueue()

@ -41,7 +41,7 @@ namespace math
{
GrpcEnvironment.Initialize();
using (Channel channel = new Channel("127.0.0.1:23456"))
using (Channel channel = new Channel("127.0.0.1", 23456))
{
Math.IMathClient stub = new Math.MathClient(channel);
MathExamples.DivExample(stub);

@ -60,7 +60,7 @@ namespace math.Tests
server.AddServiceDefinition(Math.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host, Server.PickUnusedPort);
server.Start();
channel = new Channel(host + ":" + port);
channel = new Channel(host, port);
// TODO(jtattermusch): get rid of the custom header here once we have dedicated tests
// for header support.

@ -104,8 +104,6 @@ namespace Grpc.IntegrationTesting
{
GrpcEnvironment.Initialize();
string addr = string.Format("{0}:{1}", options.serverHost, options.serverPort);
Credentials credentials = null;
if (options.useTls)
{
@ -119,7 +117,7 @@ namespace Grpc.IntegrationTesting
.AddString(ChannelArgs.SslTargetNameOverrideKey, options.serverHostOverride).Build();
}
using (Channel channel = new Channel(addr, credentials, channelArgs))
using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelArgs))
{
var stubConfig = StubConfiguration.Default;
if (options.testCase == "service_account_creds" || options.testCase == "compute_engine_creds")

@ -65,7 +65,7 @@ namespace Grpc.IntegrationTesting
var channelArgs = ChannelArgs.CreateBuilder()
.AddString(ChannelArgs.SslTargetNameOverrideKey, TestCredentials.DefaultHostOverride).Build();
channel = new Channel(host + ":" + port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
channel = new Channel(host, port, TestCredentials.CreateTestClientCredentials(true), channelArgs);
client = TestService.NewStub(channel);
}

@ -60,7 +60,7 @@
grpc_byte_buffer *string_to_byte_buffer(const char *buffer, size_t len) {
gpr_slice slice = gpr_slice_from_copied_buffer(buffer, len);
grpc_byte_buffer *bb = grpc_byte_buffer_create(&slice, 1);
grpc_byte_buffer *bb = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return bb;
}
@ -91,13 +91,9 @@ typedef struct gprcsharp_batch_context {
grpc_call_details call_details;
grpc_metadata_array request_metadata;
} server_rpc_new;
/* callback will be called upon completion */
callback_funcptr callback;
} grpcsharp_batch_context;
grpcsharp_batch_context *grpcsharp_batch_context_create() {
GPR_EXPORT grpcsharp_batch_context *GPR_CALLTYPE grpcsharp_batch_context_create() {
grpcsharp_batch_context *ctx = gpr_malloc(sizeof(grpcsharp_batch_context));
memset(ctx, 0, sizeof(grpcsharp_batch_context));
return ctx;
@ -192,7 +188,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array *dest,
src->metadata = NULL;
}
void grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
GPR_EXPORT void GPR_CALLTYPE grpcsharp_batch_context_destroy(grpcsharp_batch_context *ctx) {
if (!ctx) {
return;
}
@ -306,25 +302,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
grpc_completion_queue_destroy(cq);
}
GPR_EXPORT grpc_completion_type GPR_CALLTYPE
grpcsharp_completion_queue_next_with_callback(grpc_completion_queue *cq) {
grpc_event ev;
grpcsharp_batch_context *batch_context;
grpc_completion_type t;
ev = grpc_completion_queue_next(cq, gpr_inf_future);
t = ev.type;
if (t == GRPC_OP_COMPLETE && ev.tag) {
/* NEW API handler */
batch_context = (grpcsharp_batch_context *)ev.tag;
batch_context->callback((gpr_int32) ev.success, batch_context);
grpcsharp_batch_context_destroy(batch_context);
}
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_next(grpc_completion_queue *cq) {
return grpc_completion_queue_next(cq, gpr_inf_future);
}
/* return completion type to allow some handling for events that have no
* tag - such as GRPC_QUEUE_SHUTDOWN
*/
return t;
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
return grpc_completion_queue_pluck(cq, tag, gpr_inf_future);
}
/* Channel */
@ -413,14 +398,11 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) {
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[6];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -454,34 +436,12 @@ grpcsharp_call_start_unary(grpc_call *call, callback_funcptr callback,
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
/* Synchronous unary call */
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_call_blocking_unary(grpc_call *call,
grpc_completion_queue *dedicated_cq,
callback_funcptr callback,
const char *send_buffer, size_t send_buffer_len,
grpc_metadata_array *initial_metadata) {
GPR_ASSERT(grpcsharp_call_start_unary(call, callback, send_buffer,
send_buffer_len,
initial_metadata) == GRPC_CALL_OK);
/* TODO: we would like to use pluck, but we don't know the tag */
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
GRPC_OP_COMPLETE);
grpc_completion_queue_shutdown(dedicated_cq);
GPR_ASSERT(grpcsharp_completion_queue_next_with_callback(dedicated_cq) ==
GRPC_QUEUE_SHUTDOWN);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_client_streaming(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[4];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -510,13 +470,10 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, callback_funcptr callback, const char *send_buffer,
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[5];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -549,13 +506,10 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_duplex_streaming(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
grpc_op ops[3];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@ -581,13 +535,10 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
@ -597,12 +548,9 @@ grpcsharp_call_send_message(grpc_call *call, callback_funcptr callback,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_close_from_client(grpc_call *call,
callback_funcptr callback) {
grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
@ -610,14 +558,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call,
callback_funcptr callback,
grpcsharp_batch_context *ctx,
grpc_status_code status_code,
const char *status_details) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@ -629,25 +574,18 @@ grpcsharp_call_send_status_from_server(grpc_call *call,
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_recv_message(grpc_call *call, callback_funcptr callback) {
grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[1];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_start_serverside(grpc_call *call, callback_funcptr callback) {
grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
/* TODO: don't use magic number */
grpc_op ops[2];
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
ops[0].data.send_initial_metadata.count = 0;
ops[0].data.send_initial_metadata.metadata = NULL;
@ -684,9 +622,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_shutdown(grpc_server *server) {
GPR_EXPORT void GPR_CALLTYPE
grpcsharp_server_shutdown_and_notify_callback(grpc_server *server,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
grpcsharp_batch_context *ctx) {
grpc_server_shutdown_and_notify(server, ctx);
}
@ -696,10 +632,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_server_destroy(grpc_server *server) {
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_server_request_call(grpc_server *server, grpc_completion_queue *cq,
callback_funcptr callback) {
grpcsharp_batch_context *ctx = grpcsharp_batch_context_create();
ctx->callback = callback;
grpcsharp_batch_context *ctx) {
return grpc_server_request_call(
server, &(ctx->server_rpc_new.call), &(ctx->server_rpc_new.call_details),
&(ctx->server_rpc_new.request_metadata), cq, cq, ctx);
@ -796,3 +729,8 @@ grpcsharp_test_callback(callback_funcptr callback) {
/* For testing */
GPR_EXPORT void *GPR_CALLTYPE grpcsharp_test_nop(void *ptr) { return ptr; }
/* For testing */
GPR_EXPORT gpr_int32 GPR_CALLTYPE grpcsharp_sizeof_grpc_event(void) {
return sizeof(grpc_event);
}

@ -57,7 +57,7 @@ grpc_byte_buffer *BufferToByteBuffer(Handle<Value> buffer) {
char *data = ::node::Buffer::Data(buffer);
gpr_slice slice = gpr_slice_malloc(length);
memcpy(GPR_SLICE_START_PTR(slice), data, length);
grpc_byte_buffer *byte_buffer(grpc_byte_buffer_create(&slice, 1));
grpc_byte_buffer *byte_buffer(grpc_raw_byte_buffer_create(&slice, 1));
gpr_slice_unref(slice);
return byte_buffer;
}

@ -1,6 +1,6 @@
{
"name": "grpc",
"version": "0.9.0",
"version": "0.9.1",
"author": "Google Inc.",
"description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/",

@ -47,7 +47,9 @@ function deserializeCls(cls) {
* @return {cls} The resulting object
*/
return function deserialize(arg_buf) {
return cls.decode(arg_buf).toRaw();
// Convert to a native object with binary fields as Buffers (first argument)
// and longs as strings (second argument)
return cls.decode(arg_buf).toRaw(false, true);
};
}

@ -0,0 +1,90 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
'use strict';
var assert = require('assert');
var common = require('../src/common.js');
var ProtoBuf = require('protobufjs');
var messages_proto = ProtoBuf.loadProtoFile(
__dirname + '/test_messages.proto').build();
describe('Proto message serialize and deserialize', function() {
var longSerialize = common.serializeCls(messages_proto.LongValues);
var longDeserialize = common.deserializeCls(messages_proto.LongValues);
var pos_value = '314159265358979';
var neg_value = '-27182818284590';
it('should preserve positive int64 values', function() {
var serialized = longSerialize({int_64: pos_value});
assert.strictEqual(longDeserialize(serialized).int_64.toString(),
pos_value);
});
it('should preserve negative int64 values', function() {
var serialized = longSerialize({int_64: neg_value});
assert.strictEqual(longDeserialize(serialized).int_64.toString(),
neg_value);
});
it('should preserve uint64 values', function() {
var serialized = longSerialize({uint_64: pos_value});
assert.strictEqual(longDeserialize(serialized).uint_64.toString(),
pos_value);
});
it('should preserve positive sint64 values', function() {
var serialized = longSerialize({sint_64: pos_value});
assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
pos_value);
});
it('should preserve negative sint64 values', function() {
var serialized = longSerialize({sint_64: neg_value});
assert.strictEqual(longDeserialize(serialized).sint_64.toString(),
neg_value);
});
it('should preserve fixed64 values', function() {
var serialized = longSerialize({fixed_64: pos_value});
assert.strictEqual(longDeserialize(serialized).fixed_64.toString(),
pos_value);
});
it('should preserve positive sfixed64 values', function() {
var serialized = longSerialize({sfixed_64: pos_value});
assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
pos_value);
});
it('should preserve negative sfixed64 values', function() {
var serialized = longSerialize({sfixed_64: neg_value});
assert.strictEqual(longDeserialize(serialized).sfixed_64.toString(),
neg_value);
});
});

@ -0,0 +1,38 @@
// Copyright 2015, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
message LongValues {
int64 int_64 = 1;
uint64 uint_64 = 2;
sint64 sint_64 = 3;
fixed64 fixed_64 = 4;
sfixed64 sfixed_64 = 5;
}

@ -55,7 +55,7 @@ static void CopyByteBufferToCharArray(grpc_byte_buffer *buffer, char *array) {
static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(array, length);
grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}
@ -85,7 +85,7 @@ static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array,
// The following implementation is thus not optimal, sometimes requiring two
// copies (one by self.bytes and another by gpr_slice_from_copied_buffer).
// If it turns out to be an issue, we can use enumerateByteRangesUsingblock:
// to create an array of gpr_slice objects to pass to grpc_byte_buffer_create.
// to create an array of gpr_slice objects to pass to grpc_raw_byte_buffer_create.
// That would make it do exactly one copy, always.
return CopyCharArrayToNewByteBuffer((const char *)self.bytes, (size_t)self.length);
}

@ -51,7 +51,7 @@
grpc_byte_buffer *string_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}

@ -179,7 +179,7 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) {
PyString_AsStringAndSize(
PyTuple_GET_ITEM(op, MESSAGE_INDEX), &message, &message_size);
message_slice = gpr_slice_from_copied_buffer(message, message_size);
c_op.data.send_message = grpc_byte_buffer_create(&message_slice, 1);
c_op.data.send_message = grpc_raw_byte_buffer_create(&message_slice, 1);
gpr_slice_unref(message_slice);
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:

@ -42,7 +42,7 @@
grpc_byte_buffer* grpc_rb_s_to_byte_buffer(char *string, size_t length) {
gpr_slice slice = gpr_slice_from_copied_buffer(string, length);
grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
grpc_byte_buffer *buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
return buffer;
}

@ -133,7 +133,8 @@ static int byte_buffer_eq_slice(grpc_byte_buffer *bb, gpr_slice b) {
if (!bb) return 0;
a = merge_slices(bb->data.slice_buffer.slices, bb->data.slice_buffer.count);
a = merge_slices(bb->data.raw.slice_buffer.slices,
bb->data.raw.slice_buffer.count);
ok = GPR_SLICE_LENGTH(a) == GPR_SLICE_LENGTH(b) &&
0 == memcmp(GPR_SLICE_START_PTR(a), GPR_SLICE_START_PTR(b),
GPR_SLICE_LENGTH(a));

@ -121,9 +121,9 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",

@ -121,9 +121,9 @@ static void test_cancel_after_accept_and_writes_closed(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
int was_cancelled = 2;
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",

@ -119,7 +119,7 @@ static void test_cancel_after_invoke(grpc_end2end_test_config config,
grpc_byte_buffer *response_payload_recv = NULL;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",
"foo.test.google.fr", deadline);

@ -116,7 +116,7 @@ static void test_cancel_before_invoke(grpc_end2end_test_config config,
grpc_byte_buffer *response_payload_recv = NULL;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
c = grpc_channel_create_call(f.client, f.client_cq, "/foo",
"foo.test.google.fr", deadline);

@ -109,9 +109,9 @@ static void test_invoke_large_request(grpc_end2end_test_config config) {
grpc_call *c;
grpc_call *s;
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = n_seconds_time(30);
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -109,7 +109,7 @@ static void test_max_message_length(grpc_end2end_test_config config) {
grpc_op *op;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;

@ -165,8 +165,8 @@ static void test_pingpong_streaming(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(s, ops, op - ops, tag(101)));
for (i = 0; i < messages; i++) {
request_payload = grpc_byte_buffer_create(&request_payload_slice, 1);
response_payload = grpc_byte_buffer_create(&response_payload_slice, 1);
request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1);
response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
op = ops;
op->op = GRPC_OP_SEND_MESSAGE;

@ -105,9 +105,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {
{"key1-bin",

@ -105,9 +105,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}},
{"key2", "val2", 4, {{NULL, NULL, NULL}}}};

@ -103,9 +103,9 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
grpc_call *c;
grpc_call *s;
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
cq_verifier *v_client = cq_verifier_create(f.client_cq);
cq_verifier *v_server = cq_verifier_create(f.server_cq);

@ -156,9 +156,9 @@ static void request_response_with_payload_and_call_creds(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_end2end_test_fixture f = begin_test(config, test_name, NULL, NULL);

@ -105,9 +105,9 @@ static void test_request_response_with_metadata_and_payload(
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
gpr_slice response_payload_slice = gpr_slice_from_copied_string("hello you");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_byte_buffer *response_payload =
grpc_byte_buffer_create(&response_payload_slice, 1);
grpc_raw_byte_buffer_create(&response_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta_c[2] = {{"key1", "val1", 4, {{NULL, NULL, NULL}}}, {"key2", "val2", 4, {{NULL, NULL, NULL}}}};
grpc_metadata meta_s[2] = {{"key3", "val3", 4, {{NULL, NULL, NULL}}}, {"key4", "val4", 4, {{NULL, NULL, NULL}}}};

@ -103,7 +103,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
grpc_call *s;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_metadata meta;
grpc_end2end_test_fixture f = begin_test(config, "test_request_with_large_metadata", NULL, NULL);

@ -103,7 +103,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
grpc_call *s;
gpr_slice request_payload_slice = gpr_slice_from_copied_string("hello world");
grpc_byte_buffer *request_payload =
grpc_byte_buffer_create(&request_payload_slice, 1);
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
gpr_timespec deadline = five_seconds_time();
grpc_end2end_test_fixture f = begin_test(config, "test_invoke_request_with_payload", NULL, NULL);
cq_verifier *v_client = cq_verifier_create(f.client_cq);

@ -183,7 +183,7 @@ int main(int argc, char **argv) {
channel = grpc_channel_create(target, NULL);
cq = grpc_completion_queue_create();
the_buffer = grpc_byte_buffer_create(&slice, payload_size);
the_buffer = grpc_raw_byte_buffer_create(&slice, payload_size);
histogram = gpr_histogram_create(0.01, 60e9);
sc.init();

@ -42,6 +42,8 @@
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
#include "src/core/compression/message_compress.h"
#include <string.h>
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
@ -55,7 +57,7 @@ static void test_read_one_slice(void) {
LOG_TEST("test_read_one_slice");
slice = gpr_slice_from_copied_string("test");
buffer = grpc_byte_buffer_create(&slice, 1);
buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
grpc_byte_buffer_reader_init(&reader, buffer);
first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
@ -77,7 +79,28 @@ static void test_read_one_slice_malloc(void) {
LOG_TEST("test_read_one_slice_malloc");
slice = gpr_slice_malloc(4);
memcpy(GPR_SLICE_START_PTR(slice), "test", 4);
buffer = grpc_byte_buffer_create(&slice, 1);
buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
grpc_byte_buffer_reader_init(&reader, buffer);
first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
GPR_ASSERT(first_code != 0);
GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(first_slice), "test", 4) == 0);
gpr_slice_unref(first_slice);
second_code = grpc_byte_buffer_reader_next(&reader, &second_slice);
GPR_ASSERT(second_code == 0);
grpc_byte_buffer_destroy(buffer);
}
static void test_read_none_compressed_slice(void) {
gpr_slice slice;
grpc_byte_buffer *buffer;
grpc_byte_buffer_reader reader;
gpr_slice first_slice, second_slice;
int first_code, second_code;
LOG_TEST("test_read_none_compressed_slice");
slice = gpr_slice_from_copied_string("test");
buffer = grpc_raw_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
grpc_byte_buffer_reader_init(&reader, buffer);
first_code = grpc_byte_buffer_reader_next(&reader, &first_slice);
@ -89,9 +112,61 @@ static void test_read_one_slice_malloc(void) {
grpc_byte_buffer_destroy(buffer);
}
static void read_compressed_slice(grpc_compression_algorithm algorithm,
int input_size) {
gpr_slice input_slice;
gpr_slice_buffer sliceb_in;
gpr_slice_buffer sliceb_out;
grpc_byte_buffer *buffer;
grpc_byte_buffer_reader reader;
gpr_slice read_slice;
int read_count = 0;
gpr_slice_buffer_init(&sliceb_in);
gpr_slice_buffer_init(&sliceb_out);
input_slice = gpr_slice_malloc(input_size);
memset(GPR_SLICE_START_PTR(input_slice), 'a', input_size);
gpr_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */
GPR_ASSERT(grpc_msg_compress(algorithm, &sliceb_in, &sliceb_out));
buffer = grpc_raw_compressed_byte_buffer_create(
sliceb_out.slices, sliceb_out.count, algorithm);
grpc_byte_buffer_reader_init(&reader, buffer);
while (grpc_byte_buffer_reader_next(&reader, &read_slice)) {
GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(read_slice),
GPR_SLICE_START_PTR(input_slice) + read_count,
GPR_SLICE_LENGTH(read_slice)) == 0);
read_count += GPR_SLICE_LENGTH(read_slice);
gpr_slice_unref(read_slice);
}
GPR_ASSERT(read_count == input_size);
grpc_byte_buffer_reader_destroy(&reader);
grpc_byte_buffer_destroy(buffer);
gpr_slice_buffer_destroy(&sliceb_out);
gpr_slice_buffer_destroy(&sliceb_in);
}
static void test_read_gzip_compressed_slice(void) {
const int INPUT_SIZE = 2048;
LOG_TEST("test_read_gzip_compressed_slice");
read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE);
}
static void test_read_deflate_compressed_slice(void) {
const int INPUT_SIZE = 2048;
LOG_TEST("test_read_deflate_compressed_slice");
read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
}
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_read_one_slice();
test_read_one_slice_malloc();
test_read_none_compressed_slice();
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
return 0;
}

@ -236,9 +236,9 @@ class AsyncClient : public Client {
}
if ((closed_loop_ || !rpc_deadlines_[thread_idx].empty()) &&
grpc_time_source::now() > deadline) {
// we have missed some 1-second deadline, which is too much
gpr_log(GPR_INFO, "Missed an RPC deadline, giving up");
return false;
// we have missed some 1-second deadline, which is worth noting
gpr_log(GPR_INFO, "Missed an RPC deadline");
// Don't give up, as there might be some truly heavy tails
}
if (got_event) {
ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);

@ -132,6 +132,9 @@ static void QpsDriver() {
pareto->set_alpha(FLAGS_load_param_2);
break;
}
default:
GPR_ASSERT(false);
break;
}
ServerConfig server_config;
@ -153,7 +156,7 @@ static void QpsDriver() {
FLAGS_warmup_seconds, FLAGS_benchmark_seconds, FLAGS_local_workers);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportQPSPerCore(*result, server_config);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
GetReporter()->ReportTimes(*result);
}

@ -67,7 +67,7 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
GetReporter()->ReportQPSPerCore(*result, server_config);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
}

@ -61,7 +61,7 @@ static void RunQPS() {
client_config.set_load_type(POISSON);
client_config.mutable_load_params()->
mutable_poisson()->set_offered_load(10000.0);
ServerConfig server_config;
server_config.set_server_type(ASYNC_SERVER);
server_config.set_enable_ssl(false);
@ -70,7 +70,7 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
GetReporter()->ReportQPSPerCore(*result, server_config);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
}

@ -71,6 +71,8 @@ std::unique_ptr<Client> CreateClient(const ClientConfig& config) {
return (config.rpc_type() == RpcType::UNARY)
? CreateAsyncUnaryClient(config)
: CreateAsyncStreamingClient(config);
default:
abort();
}
abort();
}
@ -82,6 +84,8 @@ std::unique_ptr<Server> CreateServer(const ServerConfig& config,
return CreateSynchronousServer(config, server_port);
case ServerType::ASYNC_SERVER:
return CreateAsyncServer(config, server_port);
default:
abort();
}
abort();
}

@ -30,92 +30,92 @@
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto2";
syntax = "proto3";
package grpc.testing;
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 1;
COMPRESSABLE = 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 2;
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 3;
RANDOM = 2;
}
message StatsRequest {
// run number
optional int32 test_num = 1;
int32 test_num = 1;
}
message ServerStats {
// wall clock time
required double time_elapsed = 1;
double time_elapsed = 1;
// user time used by the server process and threads
required double time_user = 2;
double time_user = 2;
// server time used by the server process and all threads
required double time_system = 3;
double time_system = 3;
}
message Payload {
// The type of data in body.
optional PayloadType type = 1;
PayloadType type = 1;
// Primary contents of payload.
optional bytes body = 2;
bytes body = 2;
}
message HistogramData {
repeated uint32 bucket = 1;
required double min_seen = 2;
required double max_seen = 3;
required double sum = 4;
required double sum_of_squares = 5;
required double count = 6;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
enum ClientType {
SYNCHRONOUS_CLIENT = 1;
ASYNC_CLIENT = 2;
SYNCHRONOUS_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 1;
ASYNC_SERVER = 2;
SYNCHRONOUS_SERVER = 0;
ASYNC_SERVER = 1;
}
enum RpcType {
UNARY = 1;
STREAMING = 2;
UNARY = 0;
STREAMING = 1;
}
enum LoadType {
CLOSED_LOOP = 1;
POISSON = 2;
UNIFORM = 3;
DETERMINISTIC = 4;
PARETO = 5;
CLOSED_LOOP = 0;
POISSON = 1;
UNIFORM = 2;
DETERMINISTIC = 3;
PARETO = 4;
}
message PoissonParams {
optional double offered_load = 1;
double offered_load = 1;
}
message UniformParams {
optional double interarrival_lo = 1;
optional double interarrival_hi = 2;
double interarrival_lo = 1;
double interarrival_hi = 2;
}
message DeterministicParams {
optional double offered_load = 1;
double offered_load = 1;
}
message ParetoParams {
optional double interarrival_base = 1;
optional double alpha = 2;
double interarrival_base = 1;
double alpha = 2;
}
message LoadParams {
@ -129,17 +129,17 @@ message LoadParams {
message ClientConfig {
repeated string server_targets = 1;
required ClientType client_type = 2;
optional bool enable_ssl = 3 [default = false];
required int32 outstanding_rpcs_per_channel = 4;
required int32 client_channels = 5;
required int32 payload_size = 6;
ClientType client_type = 2;
bool enable_ssl = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
optional int32 async_client_threads = 7;
optional RpcType rpc_type = 8 [default = UNARY];
optional string host = 9;
optional LoadType load_type = 10 [default = CLOSED_LOOP];
optional LoadParams load_params = 11;
int32 async_client_threads = 7;
RpcType rpc_type = 8;
string host = 9;
LoadType load_type = 10;
LoadParams load_params = 11;
}
// Request current stats
@ -154,21 +154,21 @@ message ClientArgs {
}
message ClientStats {
required HistogramData latencies = 1;
required double time_elapsed = 3;
required double time_user = 4;
required double time_system = 5;
HistogramData latencies = 1;
double time_elapsed = 2;
double time_user = 3;
double time_system = 4;
}
message ClientStatus {
optional ClientStats stats = 1;
ClientStats stats = 1;
}
message ServerConfig {
required ServerType server_type = 1;
optional int32 threads = 2 [default = 1];
optional bool enable_ssl = 3 [default = false];
optional string host = 4;
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
string host = 4;
}
message ServerArgs {
@ -179,25 +179,25 @@ message ServerArgs {
}
message ServerStatus {
optional ServerStats stats = 1;
required int32 port = 2;
ServerStats stats = 1;
int32 port = 2;
}
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1 [default = COMPRESSABLE];
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2 [default = 0];
int32 response_size = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
Payload payload = 3;
}
message SimpleResponse {
optional Payload payload = 1;
Payload payload = 1;
}
service TestService {

@ -101,10 +101,11 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
std::lock_guard<std::mutex> g(shutdown_mutex_);
std::unique_lock<std::mutex> g(shutdown_mutex_);
if (!shutdown_) {
// this RPC context is done, so refresh it
if (!still_going) {
g.unlock();
ctx->Reset();
}
} else {

@ -760,7 +760,7 @@ WARN_LOGFILE =
# spaces.
# Note: If this tag is empty the current directory is searched.
INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h
INPUT = include/grpc/grpc_security.h include/grpc/byte_buffer.h include/grpc/byte_buffer_reader.h include/grpc/compression.h include/grpc/grpc.h include/grpc/status.h include/grpc/census.h include/grpc/support/alloc.h include/grpc/support/atm.h include/grpc/support/atm_gcc_atomic.h include/grpc/support/atm_gcc_sync.h include/grpc/support/atm_win32.h include/grpc/support/cancellable_platform.h include/grpc/support/cmdline.h include/grpc/support/cpu.h include/grpc/support/histogram.h include/grpc/support/host_port.h include/grpc/support/log.h include/grpc/support/log_win32.h include/grpc/support/port_platform.h include/grpc/support/slice.h include/grpc/support/slice_buffer.h include/grpc/support/string_util.h include/grpc/support/subprocess.h include/grpc/support/sync.h include/grpc/support/sync_generic.h include/grpc/support/sync_posix.h include/grpc/support/sync_win32.h include/grpc/support/thd.h include/grpc/support/time.h include/grpc/support/tls.h include/grpc/support/tls_gcc.h include/grpc/support/tls_msvc.h include/grpc/support/tls_pthread.h include/grpc/support/useful.h
# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding. Doxygen uses

File diff suppressed because one or more lines are too long

@ -150,6 +150,7 @@
<ClInclude Include="..\..\include\grpc\grpc_security.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h" />
<ClInclude Include="..\..\include\grpc\compression.h" />
<ClInclude Include="..\..\include\grpc\grpc.h" />
<ClInclude Include="..\..\include\grpc\status.h" />
<ClInclude Include="..\..\include\grpc\census.h" />
@ -181,7 +182,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
<ClInclude Include="..\..\src\core\debug\trace.h" />
<ClInclude Include="..\..\src\core\iomgr\alarm.h" />

@ -360,6 +360,9 @@
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h">
<Filter>include\grpc</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc\compression.h">
<Filter>include\grpc</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc\grpc.h">
<Filter>include\grpc</Filter>
</ClInclude>
@ -449,9 +452,6 @@
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\compression\algorithm.h">
<Filter>src\core\compression</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\compression\message_compress.h">
<Filter>src\core\compression</Filter>
</ClInclude>

@ -148,6 +148,7 @@
<ItemGroup>
<ClInclude Include="..\..\include\grpc\byte_buffer.h" />
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h" />
<ClInclude Include="..\..\include\grpc\compression.h" />
<ClInclude Include="..\..\include\grpc\grpc.h" />
<ClInclude Include="..\..\include\grpc\status.h" />
<ClInclude Include="..\..\include\grpc\census.h" />
@ -163,7 +164,6 @@
<ClInclude Include="..\..\src\core\channel\http_client_filter.h" />
<ClInclude Include="..\..\src\core\channel\http_server_filter.h" />
<ClInclude Include="..\..\src\core\channel\noop_filter.h" />
<ClInclude Include="..\..\src\core\compression\algorithm.h" />
<ClInclude Include="..\..\src\core\compression\message_compress.h" />
<ClInclude Include="..\..\src\core\debug\trace.h" />
<ClInclude Include="..\..\src\core\iomgr\alarm.h" />

@ -291,6 +291,9 @@
<ClInclude Include="..\..\include\grpc\byte_buffer_reader.h">
<Filter>include\grpc</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc\compression.h">
<Filter>include\grpc</Filter>
</ClInclude>
<ClInclude Include="..\..\include\grpc\grpc.h">
<Filter>include\grpc</Filter>
</ClInclude>
@ -332,9 +335,6 @@
<ClInclude Include="..\..\src\core\channel\noop_filter.h">
<Filter>src\core\channel</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\compression\algorithm.h">
<Filter>src\core\compression</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\compression\message_compress.h">
<Filter>src\core\compression</Filter>
</ClInclude>

Loading…
Cancel
Save