clang-format

pull/1369/head
Craig Tiller 10 years ago
parent c52779ff61
commit 1a727fde47
  1. 29
      src/core/channel/client_channel.c
  2. 24
      src/core/security/auth.c
  3. 3
      src/core/security/server_secure_chttp2.c
  4. 3
      src/core/surface/call.c
  5. 2
      src/core/surface/secure_channel_create.c
  6. 37
      src/core/transport/chttp2_transport.c
  7. 4
      src/core/transport/transport.c
  8. 4
      src/core/transport/transport.h
  9. 3
      src/core/transport/transport_op_string.c
  10. 351
      src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
  11. 163
      src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
  12. 335
      src/csharp/Grpc.Core/Server.cs
  13. 36
      src/csharp/Grpc.Examples.MathServer/MathServer.cs
  14. 3
      src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
  15. 122
      src/ruby/ext/grpc/rb_call.c
  16. 11
      test/core/channel/channel_stack_test.c
  17. 9
      test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c

@ -139,7 +139,8 @@ static void remove_waiting_child(channel_data *chand, call_data *calld) {
chand->waiting_child_count = new_count;
}
static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_op *op) {
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
@ -149,10 +150,10 @@ static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport
char status[GPR_LTOA_MIN_BUFSIZE];
grpc_metadata_batch mdb;
gpr_ltoa(GRPC_STATUS_CANCELLED, status);
calld->s.cancelled.status.md = grpc_mdelem_from_strings(chand->mdctx,
"grpc-status", status);
calld->s.cancelled.details.md = grpc_mdelem_from_strings(chand->mdctx,
"grpc-message", "Cancelled");
calld->s.cancelled.status.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
calld->s.cancelled.details.md =
grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
calld->s.cancelled.status.prev = calld->s.cancelled.details.next = NULL;
calld->s.cancelled.status.next = &calld->s.cancelled.details;
calld->s.cancelled.details.prev = &calld->s.cancelled.status;
@ -199,8 +200,10 @@ static void cc_start_transport_op(grpc_call_element *elem,
gpr_mu_unlock(&chand->mu);
}
} else {
/* check to see if we should initiate a connection (if we're not already),
but don't do so until outside the lock to avoid re-entrancy problems if
/* check to see if we should initiate a connection (if we're not
already),
but don't do so until outside the lock to avoid re-entrancy
problems if
the callback is immediate */
int initiate_transport_setup = 0;
if (!chand->transport_setup_initiated) {
@ -212,9 +215,9 @@ static void cc_start_transport_op(grpc_call_element *elem,
if (chand->waiting_child_count == chand->waiting_child_capacity) {
chand->waiting_child_capacity =
GPR_MAX(chand->waiting_child_capacity * 2, 8);
chand->waiting_children =
gpr_realloc(chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
chand->waiting_children = gpr_realloc(
chand->waiting_children,
chand->waiting_child_capacity * sizeof(call_data *));
}
calld->s.waiting_op = *op;
chand->waiting_children[chand->waiting_child_count++] = calld;
@ -236,8 +239,10 @@ static void cc_start_transport_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, &waiting_op);
handle_op_after_cancellation(elem, op);
} else {
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) != (op->send_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) != (op->recv_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.send_ops == NULL) !=
(op->send_ops == NULL));
GPR_ASSERT((calld->s.waiting_op.recv_ops == NULL) !=
(op->recv_ops == NULL));
if (op->send_ops) {
calld->s.waiting_op.send_ops = op->send_ops;
calld->s.waiting_op.is_last_send = op->is_last_send;

@ -76,7 +76,8 @@ static void on_credentials_metadata(void *user_data, grpc_mdelem **md_elems,
grpc_metadata_batch *mdb;
size_t i;
GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT);
GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx && op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
GPR_ASSERT(op->send_ops && op->send_ops->nops > calld->op_md_idx &&
op->send_ops->ops[calld->op_md_idx].type == GRPC_OP_METADATA);
mdb = &op->send_ops->ops[calld->op_md_idx].data.metadata;
for (i = 0; i < num_md; i++) {
grpc_metadata_batch_add_tail(mdb, &calld->md_links[i],
@ -105,7 +106,8 @@ static char *build_service_url(const char *url_scheme, call_data *calld) {
return service_url;
}
static void send_security_metadata(grpc_call_element *elem, grpc_transport_op *op) {
static void send_security_metadata(grpc_call_element *elem,
grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -144,7 +146,9 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
char *error_msg;
gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.",
grpc_mdstr_as_c_string(calld->host));
grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(chand->md_ctx, error_msg));
grpc_transport_op_add_cancellation(
&calld->op, GRPC_STATUS_UNAUTHENTICATED,
grpc_mdstr_from_string(chand->md_ctx, error_msg));
gpr_free(error_msg);
grpc_call_next_op(elem, &calld->op);
}
@ -156,7 +160,7 @@ static void on_host_checked(void *user_data, grpc_security_status status) {
op contains type and call direction information, in addition to the data
that is being sent or received. */
static void auth_start_transport_op(grpc_call_element *elem,
grpc_transport_op *op) {
grpc_transport_op *op) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *channeld = elem->channel_data;
@ -195,7 +199,9 @@ static void auth_start_transport_op(grpc_call_element *elem,
gpr_asprintf(&error_msg,
"Invalid host %s set in :authority metadata.",
call_host);
grpc_transport_op_add_cancellation(&calld->op, GRPC_STATUS_UNAUTHENTICATED, grpc_mdstr_from_string(channeld->md_ctx, error_msg));
grpc_transport_op_add_cancellation(
&calld->op, GRPC_STATUS_UNAUTHENTICATED,
grpc_mdstr_from_string(channeld->md_ctx, error_msg));
gpr_free(error_msg);
grpc_call_next_op(elem, &calld->op);
}
@ -220,7 +226,8 @@ static void channel_op(grpc_channel_element *elem,
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
const void *server_transport_data, grpc_transport_op *initial_op) {
const void *server_transport_data,
grpc_transport_op *initial_op) {
/* TODO(jboeuf):
Find a way to pass-in the credentials from the caller here. */
call_data *calld = elem->call_data;
@ -297,5 +304,6 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
}
const grpc_channel_filter grpc_client_auth_filter = {
auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem, destroy_call_elem,
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "auth"};
auth_start_transport_op, channel_op, sizeof(call_data), init_call_elem,
destroy_call_elem, sizeof(channel_data), init_channel_elem,
destroy_channel_elem, "auth"};

@ -72,7 +72,8 @@ static void state_unref(grpc_server_secure_state *state) {
static grpc_transport_setup_result setup_transport(void *server,
grpc_transport *transport,
grpc_mdctx *mdctx) {
static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
return grpc_server_setup_transport(server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}

@ -423,8 +423,7 @@ static void unlock(grpc_call *call) {
memset(&op, 0, sizeof(op));
if (!call->receiving &&
need_more_data(call)) {
if (!call->receiving && need_more_data(call)) {
op.recv_ops = &call->recv_ops;
op.recv_state = &call->recv_state;
op.on_done_recv = call_on_done_recv;

@ -210,7 +210,7 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
grpc_arg connector_arg;
grpc_channel_args *args_copy;
grpc_channel_args *new_args_from_connector;
grpc_channel_security_connector* connector;
grpc_channel_security_connector *connector;
grpc_mdctx *mdctx;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];

@ -731,8 +731,8 @@ static void stream_list_join(transport *t, stream *s, stream_list_id id) {
static void remove_from_stream_map(transport *t, stream *s) {
if (s->id == 0) return;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d", t->is_client? "CLI" : "SVR",
s->id));
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Removing stream %d",
t->is_client ? "CLI" : "SVR", s->id));
if (grpc_chttp2_stream_map_delete(&t->stream_map, s->id)) {
maybe_start_some_streams(t);
}
@ -1001,7 +1001,8 @@ static void maybe_start_some_streams(transport *t) {
stream *s = stream_list_remove_head(t, WAITING_FOR_CONCURRENCY);
if (!s) break;
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d", t->is_client? "CLI" : "SVR", s, t->next_stream_id));
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: Allocating new stream %p to id %d",
t->is_client ? "CLI" : "SVR", s, t->next_stream_id));
GPR_ASSERT(s->id == 0);
s->id = t->next_stream_id;
@ -1015,7 +1016,8 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->cancel_with_status != GRPC_STATUS_OK) {
cancel_stream(
t, s, op->cancel_with_status,
grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status), op->cancel_message, 1);
grpc_chttp2_grpc_status_to_http2_error(op->cancel_with_status),
op->cancel_message, 1);
}
if (op->send_ops) {
@ -1028,7 +1030,9 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
s->write_state = WRITE_STATE_QUEUED_CLOSE;
}
if (s->id == 0) {
IF_TRACING(gpr_log(GPR_DEBUG, "HTTP:%s: New stream %p waiting for concurrency", t->is_client? "CLI" : "SVR", s));
IF_TRACING(gpr_log(GPR_DEBUG,
"HTTP:%s: New stream %p waiting for concurrency",
t->is_client ? "CLI" : "SVR", s));
stream_list_join(t, s, WAITING_FOR_CONCURRENCY);
maybe_start_some_streams(t);
} else if (s->outgoing_window > 0) {
@ -1120,8 +1124,7 @@ static void add_incoming_metadata(transport *t, stream *s, grpc_mdelem *elem) {
static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
grpc_status_code local_status,
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message,
int send_rst) {
grpc_mdstr *optional_message, int send_rst) {
int had_outgoing;
char buffer[GPR_LTOA_MIN_BUFSIZE];
@ -1157,7 +1160,12 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
break;
}
} else {
add_incoming_metadata(t, s, grpc_mdelem_from_metadata_strings(t->metadata_context, grpc_mdstr_from_string(t->metadata_context, "grpc-message"), grpc_mdstr_ref(optional_message)));
add_incoming_metadata(
t, s,
grpc_mdelem_from_metadata_strings(
t->metadata_context,
grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
grpc_mdstr_ref(optional_message)));
}
add_metadata_batch(t, s);
maybe_finish_read(t, s);
@ -1182,8 +1190,10 @@ static void cancel_stream_id(transport *t, gpr_uint32 id,
static void cancel_stream(transport *t, stream *s,
grpc_status_code local_status,
grpc_chttp2_error_code error_code, grpc_mdstr *optional_message, int send_rst) {
cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message, send_rst);
grpc_chttp2_error_code error_code,
grpc_mdstr *optional_message, int send_rst) {
cancel_stream_inner(t, s, s->id, local_status, error_code, optional_message,
send_rst);
}
static void cancel_stream_cb(void *user_data, gpr_uint32 id, void *stream) {
@ -1310,10 +1320,9 @@ static void on_header(void *tp, grpc_mdelem *md) {
GPR_ASSERT(s);
IF_TRACING(gpr_log(GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id,
t->is_client? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key),
grpc_mdstr_as_c_string(md->value)));
IF_TRACING(gpr_log(
GPR_INFO, "HTTP:%d:%s:HDR: %s: %s", s->id, t->is_client ? "CLI" : "SVR",
grpc_mdstr_as_c_string(md->key), grpc_mdstr_as_c_string(md->value)));
if (md->key == t->str_grpc_timeout) {
gpr_timespec *cached_timeout = grpc_mdelem_get_user_data(md, free_timeout);

@ -95,7 +95,9 @@ void grpc_transport_op_finish_with_failure(grpc_transport_op *op) {
}
}
void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message) {
void grpc_transport_op_add_cancellation(grpc_transport_op *op,
grpc_status_code status,
grpc_mdstr *message) {
if (op->cancel_with_status == GRPC_STATUS_OK) {
op->cancel_with_status = status;
op->cancel_message = message;

@ -135,7 +135,9 @@ void grpc_transport_destroy_stream(grpc_transport *transport,
void grpc_transport_op_finish_with_failure(grpc_transport_op *op);
void grpc_transport_op_add_cancellation(grpc_transport_op *op, grpc_status_code status, grpc_mdstr *message);
void grpc_transport_op_add_cancellation(grpc_transport_op *op,
grpc_status_code status,
grpc_mdstr *message);
/* TODO(ctiller): remove this */
void grpc_transport_add_to_pollset(grpc_transport *transport,

@ -140,7 +140,8 @@ char *grpc_transport_op_string(grpc_transport_op *op) {
gpr_asprintf(&tmp, "CANCEL:%d", op->cancel_with_status);
gpr_strvec_add(&b, tmp);
if (op->cancel_message) {
gpr_asprintf(&tmp, ";msg='%s'", grpc_mdstr_as_c_string(op->cancel_message));
gpr_asprintf(&tmp, ";msg='%s'",
grpc_mdstr_as_c_string(op->cancel_message));
gpr_strvec_add(&b, tmp);
}
}

@ -34,158 +34,203 @@ using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core;
namespace Grpc.Core.Internal
{
internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
const uint GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")]
static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_blocking_unary(CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_call_destroy(IntPtr call);
private CallSafeHandle()
{
}
public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public void StartUnary(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
}
public void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray);
}
public void StartClientStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_client_streaming(this, callback, metadataArray));
}
public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong)payload.Length), metadataArray));
}
public void StartDuplexStreaming(CompletionCallbackDelegate callback, MetadataArraySafeHandle metadataArray)
{
AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
}
public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public void StartSendCloseFromClient(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail));
}
public void StartReceiveMessage(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
public void StartServerSide(CompletionCallbackDelegate callback)
{
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public void Cancel()
{
AssertCallOk(grpcsharp_call_cancel(this));
}
public void CancelWithStatus(Status status)
{
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail));
}
protected override bool ReleaseHandle()
{
grpcsharp_call_destroy(handle);
return true;
}
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)
{
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
namespace Grpc.Core.Internal {
internal delegate void CompletionCallbackDelegate(GRPCOpError error,
IntPtr batchContextPtr);
/// <summary>
/// grpc_call from <grpc/grpc.h>
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid {
const uint GRPC_WRITE_BUFFER_HINT = 1;
[DllImport("grpc_csharp_ext.dll")] static extern CallSafeHandle
grpcsharp_channel_create_call(ChannelSafeHandle channel,
CompletionQueueSafeHandle cq, string method,
string host, Timespec deadline);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status,
string description);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_start_unary(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_call_blocking_unary(
CallSafeHandle call, CompletionQueueSafeHandle dedicatedCq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_start_client_streaming(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_start_server_streaming(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
byte[] send_buffer, UIntPtr send_buffer_len,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_start_duplex_streaming(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_send_message(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
byte[] send_buffer, UIntPtr send_buffer_len);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_send_close_from_client(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_send_status_from_server(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback,
StatusCode statusCode, string statusMessage);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_recv_message(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback);
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_call_start_serverside(
CallSafeHandle call,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_call_destroy(IntPtr call);
private
CallSafeHandle() {}
public
static CallSafeHandle Create(ChannelSafeHandle channel,
CompletionQueueSafeHandle cq, string method,
string host, Timespec deadline) {
return grpcsharp_channel_create_call(channel, cq, method, host, deadline);
}
public
void StartUnary(byte[] payload, CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) {
AssertCallOk(grpcsharp_call_start_unary(
this, callback, payload, new UIntPtr((ulong)payload.Length),
metadataArray));
}
public
void BlockingUnary(CompletionQueueSafeHandle dedicatedCq, byte[] payload,
CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) {
grpcsharp_call_blocking_unary(this, dedicatedCq, callback, payload,
new UIntPtr((ulong)payload.Length),
metadataArray);
}
public
void StartClientStreaming(CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) {
AssertCallOk(
grpcsharp_call_start_client_streaming(this, callback, metadataArray));
}
public
void StartServerStreaming(byte[] payload,
CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) {
AssertCallOk(grpcsharp_call_start_server_streaming(
this, callback, payload, new UIntPtr((ulong)payload.Length),
metadataArray));
}
public
void StartDuplexStreaming(CompletionCallbackDelegate callback,
MetadataArraySafeHandle metadataArray) {
AssertCallOk(
grpcsharp_call_start_duplex_streaming(this, callback, metadataArray));
}
public
void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_send_message(
this, callback, payload, new UIntPtr((ulong)payload.Length)));
}
public
void StartSendCloseFromClient(CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_send_close_from_client(this, callback));
}
public
void StartSendStatusFromServer(Status status,
CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_send_status_from_server(
this, callback, status.StatusCode, status.Detail));
}
public
void StartReceiveMessage(CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_recv_message(this, callback));
}
public
void StartServerSide(CompletionCallbackDelegate callback) {
AssertCallOk(grpcsharp_call_start_serverside(this, callback));
}
public
void Cancel() { AssertCallOk(grpcsharp_call_cancel(this)); }
public
void CancelWithStatus(Status status) {
AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode,
status.Detail));
}
protected
override bool ReleaseHandle() {
grpcsharp_call_destroy(handle);
return true;
}
private
static void AssertCallOk(GRPCCallError callError) {
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK,
"Status not GRPC_CALL_OK");
}
private
static uint GetFlags(bool buffered) {
return buffered ? 0 : GRPC_WRITE_BUFFER_HINT;
}
}
}

@ -36,84 +36,89 @@ using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
namespace Grpc.Core.Internal
{
// TODO: we need to make sure that the delegates are not collected before invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")]
static extern int grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_shutdown(ServerSafeHandle server);
// TODO: get rid of the old callback style
[DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")]
static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_destroy(IntPtr server);
private ServerSafeHandle()
{
}
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args)
{
return grpcsharp_server_create(cq, args);
}
public int AddListeningPort(string addr)
{
return grpcsharp_server_add_http2_port(this, addr);
}
public int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials)
{
return grpcsharp_server_add_secure_http2_port(this, addr, credentials);
}
public void Start()
{
grpcsharp_server_start(this);
}
public void Shutdown()
{
grpcsharp_server_shutdown(this);
}
public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback)
{
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
return grpcsharp_server_request_call(this, cq, callback);
}
protected override bool ReleaseHandle()
{
grpcsharp_server_destroy(handle);
return true;
}
namespace Grpc.Core.Internal {
// TODO: we need to make sure that the delegates are not collected before
// invoked.
internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr);
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid {
[DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError
grpcsharp_server_request_call(
ServerSafeHandle server, CompletionQueueSafeHandle cq,
[MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate
callback);
[DllImport("grpc_csharp_ext.dll")] static extern ServerSafeHandle
grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args);
[DllImport("grpc_csharp_ext.dll")] static extern int
grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr);
[DllImport("grpc_csharp_ext.dll")] static extern int
grpcsharp_server_add_secure_http2_port(ServerSafeHandle server, string addr,
ServerCredentialsSafeHandle creds);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_server_shutdown(ServerSafeHandle server);
// TODO: get rid of the old callback style
[DllImport(
"grpc_csharp_ext.dll",
EntryPoint = "grpcsharp_server_shutdown_and_notify")] static extern void
grpcsharp_server_shutdown_and_notify_CALLBACK(
ServerSafeHandle server,
[MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate
callback);
[DllImport("grpc_csharp_ext.dll")] static extern void
grpcsharp_server_destroy(IntPtr server);
private
ServerSafeHandle() {}
public
static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq,
IntPtr args) {
return grpcsharp_server_create(cq, args);
}
public
int AddListeningPort(string addr) {
return grpcsharp_server_add_http2_port(this, addr);
}
public
int AddListeningPort(string addr, ServerCredentialsSafeHandle credentials) {
return grpcsharp_server_add_secure_http2_port(this, addr, credentials);
}
public
void Start() { grpcsharp_server_start(this); }
public
void Shutdown() { grpcsharp_server_shutdown(this); }
public
void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) {
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
public
GRPCCallError RequestCall(CompletionQueueSafeHandle cq,
CompletionCallbackDelegate callback) {
return grpcsharp_server_request_call(this, cq, callback);
}
protected
override bool ReleaseHandle() {
grpcsharp_server_destroy(handle);
return true;
}
}
}

@ -39,205 +39,186 @@ using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
namespace Grpc.Core
{
/// <summary>
/// Server is implemented only to be able to do
/// in-process testing.
/// </summary>
public class Server
{
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
namespace Grpc.Core {
/// <summary>
/// Server is implemented only to be able to do
/// in-process testing.
/// </summary>
public
class Server {
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue =
new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
readonly Dictionary<string, IServerCallHandler> callHandlers =
new Dictionary<string, IServerCallHandler>();
readonly TaskCompletionSource<object> shutdownTcs =
new TaskCompletionSource<object>();
public
Server() {
this.handle =
ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
// only call this before Start()
public
void AddServiceDefinition(ServerServiceDefinition serviceDefinition) {
foreach (var entry in serviceDefinition.CallHandlers) {
callHandlers.Add(entry.Key, entry.Value);
}
}
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
// only call before Start()
public
int AddListeningPort(string addr) { return handle.AddListeningPort(addr); }
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
// only call before Start()
public
int AddListeningPort(string addr, ServerCredentials credentials) {
using(var nativeCredentials = credentials.ToNativeCredentials()) {
return handle.AddListeningPort(addr, nativeCredentials);
}
}
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newServerRpcHandler = HandleNewServerRpc;
this.serverShutdownHandler = HandleServerShutdown;
}
public
void Start() {
handle.Start();
// only call this before Start()
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
{
foreach (var entry in serviceDefinition.CallHandlers)
{
callHandlers.Add(entry.Key, entry.Value);
}
}
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
}
// only call before Start()
public int AddListeningPort(string addr)
{
return handle.AddListeningPort(addr);
}
/// <summary>
/// Requests and handles single RPC call.
/// </summary>
internal void RunRpc() {
AllowOneRpc();
// only call before Start()
public int AddListeningPort(string addr, ServerCredentials credentials)
{
using (var nativeCredentials = credentials.ToNativeCredentials())
{
return handle.AddListeningPort(addr, nativeCredentials);
}
}
try {
var rpcInfo = newRpcQueue.Take();
public void Start()
{
handle.Start();
// Console.WriteLine("Server received RPC " + rpcInfo.Method);
// TODO: this basically means the server is single threaded....
StartHandlingRpcs();
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) {
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call,
GetCompletionQueue());
} catch (Exception e) {
Console.WriteLine("Exception while handling RPC: " + e);
}
}
/// <summary>
/// Requests and handles single RPC call.
/// </summary>
internal void RunRpc()
{
AllowOneRpc();
try
{
var rpcInfo = newRpcQueue.Take();
// Console.WriteLine("Server received RPC " + rpcInfo.Method);
IServerCallHandler callHandler;
if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
{
callHandler = new NoSuchMethodCallHandler();
}
callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
catch (Exception e)
{
Console.WriteLine("Exception while handling RPC: " + e);
}
}
/// <summary>
/// Requests server shutdown and when there are no more calls being
/// serviced,
/// cleans up used resources.
/// </summary>
/// <returns>The async.</returns>
public
async Task ShutdownAsync() {
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
}
/// <summary>
/// Requests server shutdown and when there are no more calls being serviced,
/// cleans up used resources.
/// </summary>
/// <returns>The async.</returns>
public async Task ShutdownAsync()
{
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
}
/// <summary>
/// To allow awaiting termination of the server.
/// </summary>
public
Task ShutdownTask {
get { return shutdownTcs.Task; }
}
/// <summary>
/// To allow awaiting termination of the server.
/// </summary>
public Task ShutdownTask
{
get
{
return shutdownTcs.Task;
}
}
public
void Kill() { handle.Dispose(); }
public void Kill()
{
handle.Dispose();
}
private
async Task StartHandlingRpcs() {
while (true) {
await Task.Factory.StartNew(RunRpc);
}
}
private async Task StartHandlingRpcs()
{
while (true)
{
await Task.Factory.StartNew(RunRpc);
}
}
private
void AllowOneRpc() {
AssertCallOk(
handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
}
private void AllowOneRpc()
{
AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
}
private
void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) {
try {
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
{
try
{
var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr);
if (error != GRPCOpError.GRPC_OP_OK)
{
// TODO: handle error
}
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid)
{
newRpcQueue.Add(rpcInfo);
}
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
if (error != GRPCOpError.GRPC_OP_OK) {
// TODO: handle error
}
private void HandleServerShutdown(IntPtr eventPtr)
{
try
{
shutdownTcs.SetResult(null);
}
catch (Exception e)
{
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(),
ctx.GetServerRpcNewMethod());
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
// after server shutdown, the callback returns with null call
if (!rpcInfo.Call.IsInvalid) {
newRpcQueue.Add(rpcInfo);
}
} catch (Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
private
void HandleServerShutdown(IntPtr eventPtr) {
try {
shutdownTcs.SetResult(null);
} catch (Exception e) {
Console.WriteLine("Caught exception in a native handler: " + e);
}
}
private struct NewRpcInfo
{
private CallSafeHandle call;
private string method;
public NewRpcInfo(CallSafeHandle call, string method)
{
this.call = call;
this.method = method;
}
public CallSafeHandle Call
{
get
{
return this.call;
}
}
public string Method
{
get
{
return this.method;
}
}
}
private
static void AssertCallOk(GRPCCallError callError) {
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK,
"Status not GRPC_CALL_OK");
}
private
static CompletionQueueSafeHandle GetCompletionQueue() {
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
private
struct NewRpcInfo {
private
CallSafeHandle call;
private
string method;
public
NewRpcInfo(CallSafeHandle call, string method) {
this.call = call;
this.method = method;
}
public
CallSafeHandle Call {
get { return this.call; }
}
public
string Method {
get { return this.method; }
}
}
}
}

@ -34,28 +34,26 @@ using System.Runtime.InteropServices;
using System.Threading;
using Grpc.Core;
namespace math
{
class MainClass
{
public static void Main(string[] args)
{
String host = "0.0.0.0";
namespace math {
class MainClass {
public
static void Main(string[] args) {
String host = "0.0.0.0";
GrpcEnvironment.Initialize();
GrpcEnvironment.Initialize();
Server server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host + ":23456");
server.Start();
Server server = new Server();
server.AddServiceDefinition(MathGrpc.BindService(new MathServiceImpl()));
int port = server.AddListeningPort(host + ":23456");
server.Start();
Console.WriteLine("MathServer listening on port " + port);
Console.WriteLine("MathServer listening on port " + port);
Console.WriteLine("Press any key to stop the server...");
Console.ReadKey();
Console.WriteLine("Press any key to stop the server...");
Console.ReadKey();
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
}
}

@ -16,7 +16,8 @@
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug</OutputPath>
<DefineConstants>DEBUG;</DefineConstants>
<DefineConstants>DEBUG;
</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<Externalconsole>true</Externalconsole>

@ -118,35 +118,36 @@ static void grpc_rb_call_destroy(void *p) {
}
static size_t md_ary_datasize(const void *p) {
const grpc_metadata_array* const ary = (grpc_metadata_array*)p;
size_t i, datasize = sizeof(grpc_metadata_array);
for (i = 0; i < ary->count; ++i) {
const grpc_metadata* const md = &ary->metadata[i];
datasize += strlen(md->key);
datasize += md->value_length;
}
datasize += ary->capacity * sizeof(grpc_metadata);
return datasize;
const grpc_metadata_array *const ary = (grpc_metadata_array *)p;
size_t i, datasize = sizeof(grpc_metadata_array);
for (i = 0; i < ary->count; ++i) {
const grpc_metadata *const md = &ary->metadata[i];
datasize += strlen(md->key);
datasize += md->value_length;
}
datasize += ary->capacity * sizeof(grpc_metadata);
return datasize;
}
static const rb_data_type_t grpc_rb_md_ary_data_type = {
"grpc_metadata_array",
{GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, md_ary_datasize},
NULL, NULL,
0
};
NULL,
NULL,
0};
/* Describes grpc_call struct for RTypedData */
static const rb_data_type_t grpc_call_data_type = {
"grpc_call",
{GRPC_RB_GC_NOT_MARKED, grpc_rb_call_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE},
NULL, NULL,
/* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because grpc_rb_call_destroy
NULL,
NULL,
/* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because
* grpc_rb_call_destroy
* touches a hash object.
* TODO(yugui) Directly use st_table and call the free function earlier?
*/
0
};
0};
/* Error code details is a hash containing text strings describing errors */
VALUE rb_error_code_details;
@ -250,7 +251,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
}
md_ary->metadata[md_ary->count].value = RSTRING_PTR(rb_ary_entry(val, i));
md_ary->metadata[md_ary->count].value_length =
RSTRING_LEN(rb_ary_entry(val, i));
RSTRING_LEN(rb_ary_entry(val, i));
md_ary->count += 1;
}
} else {
@ -290,10 +291,11 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
/* grpc_rb_md_ary_convert converts a ruby metadata hash into
a grpc_metadata_array.
*/
static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
static void grpc_rb_md_ary_convert(VALUE md_ary_hash,
grpc_metadata_array *md_ary) {
VALUE md_ary_obj = Qnil;
if (md_ary_hash == Qnil) {
return; /* Do nothing if the expected has value is nil */
return; /* Do nothing if the expected has value is nil */
}
if (TYPE(md_ary_hash) != T_HASH) {
rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
@ -303,8 +305,8 @@ static void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ar
/* Initialize the array, compute it's capacity, then fill it. */
grpc_metadata_array_init(md_ary);
md_ary_obj = TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type,
md_ary);
md_ary_obj =
TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
@ -327,16 +329,14 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary) {
rb_hash_aset(result, key, value);
} else if (TYPE(value) == T_ARRAY) {
/* Add the string to the returned array */
rb_ary_push(value,
rb_str_new(md_ary->metadata[i].value,
md_ary->metadata[i].value_length));
rb_ary_push(value, rb_str_new(md_ary->metadata[i].value,
md_ary->metadata[i].value_length));
} else {
/* Add the current value with this key and the new one to an array */
new_ary = rb_ary_new();
rb_ary_push(new_ary, value);
rb_ary_push(new_ary,
rb_str_new(md_ary->metadata[i].value,
md_ary->metadata[i].value_length));
rb_ary_push(new_ary, rb_str_new(md_ary->metadata[i].value,
md_ary->metadata[i].value_length));
rb_hash_aset(result, key, new_ary);
}
}
@ -355,7 +355,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_obj_classname(key));
return ST_STOP;
}
switch(NUM2INT(key)) {
switch (NUM2INT(key)) {
case GRPC_OP_SEND_INITIAL_METADATA:
case GRPC_OP_SEND_MESSAGE:
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
@ -367,8 +367,7 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
rb_ary_push(ops_ary, key);
return ST_CONTINUE;
default:
rb_raise(rb_eTypeError, "invalid operation : bad value %d",
NUM2INT(key));
rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
};
return ST_STOP;
}
@ -377,8 +376,8 @@ static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
struct to the 'send_status_from_server' portion of an op.
*/
static void grpc_rb_op_update_status_from_server(grpc_op *op,
grpc_metadata_array* md_ary,
VALUE status) {
grpc_metadata_array *md_ary,
VALUE status) {
VALUE code = rb_struct_aref(status, sym_code);
VALUE details = rb_struct_aref(status, sym_details);
VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
@ -405,8 +404,8 @@ static void grpc_rb_op_update_status_from_server(grpc_op *op,
* grpc_rb_call_run_batch function */
typedef struct run_batch_stack {
/* The batch ops */
grpc_op ops[8]; /* 8 is the maximum number of operations */
size_t op_num; /* tracks the last added operation */
grpc_op ops[8]; /* 8 is the maximum number of operations */
size_t op_num; /* tracks the last added operation */
/* Data being sent */
grpc_metadata_array send_metadata;
@ -424,7 +423,7 @@ typedef struct run_batch_stack {
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
static void grpc_run_batch_stack_init(run_batch_stack* st) {
static void grpc_run_batch_stack_init(run_batch_stack *st) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
@ -435,7 +434,7 @@ static void grpc_run_batch_stack_init(run_batch_stack* st) {
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
* cleaned up */
static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
grpc_metadata_array_destroy(&st->send_metadata);
grpc_metadata_array_destroy(&st->send_trailing_metadata);
grpc_metadata_array_destroy(&st->recv_metadata);
@ -447,7 +446,7 @@ static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
/* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
* ops_hash */
static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
VALUE this_op = Qnil;
VALUE this_value = Qnil;
VALUE ops_ary = rb_ary_new();
@ -460,7 +459,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
switch(NUM2INT(this_op)) {
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
@ -471,18 +470,16 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
st->send_metadata.metadata;
break;
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message =
grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
RSTRING_LEN(this_value));
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
/* N.B. later there is no need to explicitly delete the metadata keys
* and values, they are references to data in ruby objects. */
grpc_rb_op_update_status_from_server(&st->ops[st->op_num],
&st->send_trailing_metadata,
this_value);
grpc_rb_op_update_status_from_server(
&st->ops[st->op_num], &st->send_trailing_metadata, this_value);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
st->ops[st->op_num].data.recv_initial_metadata = &st->recv_metadata;
@ -516,12 +513,12 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
/* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
after the results have run */
static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
size_t i = 0;
VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
Qnil, Qnil, Qnil, Qnil, NULL);
for (i = 0; i < st->op_num; i++) {
switch(st->ops[i].op) {
switch (st->ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
rb_struct_aset(result, sym_send_metadata, Qtrue);
break;
@ -544,13 +541,11 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
break;
case GRPC_OP_RECV_STATUS_ON_CLIENT:
rb_struct_aset(
result,
sym_status,
rb_struct_new(grpc_rb_sStatus,
UINT2NUM(st->recv_status),
result, sym_status,
rb_struct_new(grpc_rb_sStatus, UINT2NUM(st->recv_status),
(st->recv_status_details == NULL
? Qnil
: rb_str_new2(st->recv_status_details)),
? Qnil
: rb_str_new2(st->recv_status_details)),
grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
NULL));
break;
@ -682,8 +677,7 @@ static void Init_grpc_error_codes() {
static void Init_grpc_op_codes() {
/* Constants representing operation type codes in grpc.h */
VALUE grpc_rb_mCallOps =
rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
@ -709,14 +703,14 @@ void Init_grpc_call() {
grpc_rb_eOutOfTime =
rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
grpc_rb_cMdAry = rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray",
rb_cObject);
grpc_rb_cMdAry =
rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
/* Prevent allocation or inialization of the Call class */
rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
rb_define_method(grpc_rb_cCall, "initialize_copy",
grpc_rb_cannot_init_copy, 1);
rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
1);
/* Add ruby analogues of the Call methods. */
rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
@ -746,16 +740,8 @@ void Init_grpc_call() {
/* The Struct used to return the run_batch result. */
grpc_rb_sBatchResult = rb_struct_define(
"BatchResult",
"send_message",
"send_metadata",
"send_close",
"send_status",
"message",
"metadata",
"status",
"cancelled",
NULL);
"BatchResult", "send_message", "send_metadata", "send_close",
"send_status", "message", "metadata", "status", "cancelled", NULL);
/* The hash for reference counting calls, to ensure they can't be destroyed
* more than once */

@ -55,7 +55,8 @@ static void channel_init_func(grpc_channel_element *elem,
}
static void call_init_func(grpc_call_element *elem,
const void *server_transport_data, grpc_transport_op *initial_op) {
const void *server_transport_data,
grpc_transport_op *initial_op) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
}
@ -66,8 +67,7 @@ static void call_destroy_func(grpc_call_element *elem) {
++*(int *)(elem->channel_data);
}
static void call_func(grpc_call_element *elem,
grpc_transport_op *op) {
static void call_func(grpc_call_element *elem, grpc_transport_op *op) {
++*(int *)(elem->call_data);
}
@ -78,9 +78,8 @@ static void channel_func(grpc_channel_element *elem,
static void test_create_channel_stack(void) {
const grpc_channel_filter filter = {
call_func, channel_func, sizeof(int),
call_init_func, call_destroy_func, sizeof(int),
channel_init_func, channel_destroy_func, "some_test_filter"};
call_func, channel_func, sizeof(int), call_init_func, call_destroy_func,
sizeof(int), channel_init_func, channel_destroy_func, "some_test_filter"};
const grpc_channel_filter *filters = &filter;
grpc_channel_stack *channel_stack;
grpc_call_stack *call_stack;

@ -59,7 +59,8 @@
static grpc_transport_setup_result server_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {&grpc_http_server_filter};
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
return grpc_server_setup_transport(f->server, transport, extra_filters,
GPR_ARRAY_SIZE(extra_filters), mdctx);
}
@ -73,9 +74,9 @@ static grpc_transport_setup_result client_setup_transport(
void *ts, grpc_transport *transport, grpc_mdctx *mdctx) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {
&grpc_client_surface_filter, &grpc_http_client_filter,
&grpc_connected_channel_filter};
const grpc_channel_filter *filters[] = {&grpc_client_surface_filter,
&grpc_http_client_filter,
&grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
grpc_channel *channel = grpc_channel_create_from_filters(
filters, nfilters, cs->client_args, mdctx, 1);

Loading…
Cancel
Save