Merge pull request #357 from ctiller/async-api

Back-end for new core API
Yang Gao 10 years ago
commit 82a45834ec
34 changed files (changed line counts in parentheses):

    Makefile (7)
    build.json (2)
    include/grpc/grpc.h (70)
    src/core/channel/channel_stack.c (16)
    src/core/channel/connected_channel.c (8)
    src/core/iomgr/pollset_posix.c (4)
    src/core/surface/byte_buffer_queue.c (78)
    src/core/surface/byte_buffer_queue.h (59)
    src/core/surface/call.c (1668)
    src/core/surface/call.h (72)
    src/core/surface/channel.c (18)
    src/core/surface/client.c (12)
    src/core/surface/completion_queue.c (25)
    src/core/surface/completion_queue.h (4)
    src/core/surface/event_string.c (6)
    src/core/surface/lame_client.c (16)
    src/core/surface/server.c (261)
    src/core/transport/chttp2/stream_encoder.c (2)
    src/core/transport/chttp2_transport.c (2)
    src/cpp/client/channel.cc (1)
    templates/Makefile.template (2)
    test/core/end2end/cq_verifier.c (16)
    test/core/end2end/dualstack_socket_test.c (1)
    test/core/end2end/tests/census_simple_request.c (1)
    test/core/end2end/tests/max_concurrent_streams.c (3)
    test/core/end2end/tests/simple_request.c (5)
    test/core/surface/completion_queue_test.c (27)
    tools/dockerfile/grpc_node_base/Dockerfile (3)
    tools/dockerfile/grpc_php_base/Dockerfile (3)
    tools/dockerfile/grpc_ruby_base/Dockerfile (3)
    vsprojects/vs2013/grpc.vcxproj (3)
    vsprojects/vs2013/grpc.vcxproj.filters (6)
    vsprojects/vs2013/grpc_unsecure.vcxproj (3)
    vsprojects/vs2013/grpc_unsecure.vcxproj.filters (6)

@ -189,11 +189,13 @@ OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/ope
ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
PERFTOOLS_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/perftools.c -lprofiler $(LDFLAGS)
ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
HAS_SYSTEM_PERFTOOLS = $(shell $(PERFTOOLS_CHECK_CMD) 2> /dev/null && echo true || echo false)
ifeq ($(HAS_SYSTEM_PERFTOOLS),true)
DEFINES += GRPC_HAVE_PERFTOOLS
LIBS += profiler
endif
endif
ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)
@ -1449,6 +1451,7 @@ LIBGRPC_SRC = \
src/core/statistics/hash_table.c \
src/core/statistics/window_stats.c \
src/core/surface/byte_buffer.c \
src/core/surface/byte_buffer_queue.c \
src/core/surface/byte_buffer_reader.c \
src/core/surface/call.c \
src/core/surface/channel.c \
@ -1575,6 +1578,7 @@ src/core/statistics/census_tracing.c: $(OPENSSL_DEP)
src/core/statistics/hash_table.c: $(OPENSSL_DEP)
src/core/statistics/window_stats.c: $(OPENSSL_DEP)
src/core/surface/byte_buffer.c: $(OPENSSL_DEP)
src/core/surface/byte_buffer_queue.c: $(OPENSSL_DEP)
src/core/surface/byte_buffer_reader.c: $(OPENSSL_DEP)
src/core/surface/call.c: $(OPENSSL_DEP)
src/core/surface/channel.c: $(OPENSSL_DEP)
@ -1723,6 +1727,7 @@ objs/$(CONFIG)/src/core/statistics/census_tracing.o:
objs/$(CONFIG)/src/core/statistics/hash_table.o:
objs/$(CONFIG)/src/core/statistics/window_stats.o:
objs/$(CONFIG)/src/core/surface/byte_buffer.o:
objs/$(CONFIG)/src/core/surface/byte_buffer_queue.o:
objs/$(CONFIG)/src/core/surface/byte_buffer_reader.o:
objs/$(CONFIG)/src/core/surface/call.o:
objs/$(CONFIG)/src/core/surface/channel.o:
@ -1890,6 +1895,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/statistics/hash_table.c \
src/core/statistics/window_stats.c \
src/core/surface/byte_buffer.c \
src/core/surface/byte_buffer_queue.c \
src/core/surface/byte_buffer_reader.c \
src/core/surface/call.c \
src/core/surface/channel.c \
@ -2021,6 +2027,7 @@ objs/$(CONFIG)/src/core/statistics/census_tracing.o:
objs/$(CONFIG)/src/core/statistics/hash_table.o:
objs/$(CONFIG)/src/core/statistics/window_stats.o:
objs/$(CONFIG)/src/core/surface/byte_buffer.o:
objs/$(CONFIG)/src/core/surface/byte_buffer_queue.o:
objs/$(CONFIG)/src/core/surface/byte_buffer_reader.o:
objs/$(CONFIG)/src/core/surface/call.o:
objs/$(CONFIG)/src/core/surface/channel.o:

@ -73,6 +73,7 @@
"src/core/statistics/census_tracing.h",
"src/core/statistics/hash_table.h",
"src/core/statistics/window_stats.h",
"src/core/surface/byte_buffer_queue.h",
"src/core/surface/call.h",
"src/core/surface/channel.h",
"src/core/surface/client.h",
@ -159,6 +160,7 @@
"src/core/statistics/hash_table.c",
"src/core/statistics/window_stats.c",
"src/core/surface/byte_buffer.c",
"src/core/surface/byte_buffer_queue.c",
"src/core/surface/byte_buffer_reader.c",
"src/core/surface/call.c",
"src/core/surface/channel.c",

@ -183,17 +183,16 @@ typedef struct grpc_metadata {
} grpc_metadata;
typedef enum grpc_completion_type {
GRPC_QUEUE_SHUTDOWN, /* Shutting down */
GRPC_READ, /* A read has completed */
GRPC_INVOKE_ACCEPTED, /* An invoke call has been accepted by flow
control */
GRPC_WRITE_ACCEPTED, /* A write has been accepted by
flow control */
GRPC_QUEUE_SHUTDOWN, /* Shutting down */
GRPC_IOREQ, /* grpc_call_ioreq completion */
GRPC_READ, /* A read has completed */
GRPC_WRITE_ACCEPTED, /* A write has been accepted by
flow control */
GRPC_FINISH_ACCEPTED, /* writes_done or write_status has been accepted */
GRPC_CLIENT_METADATA_READ, /* The metadata array sent by server received at
client */
GRPC_FINISHED, /* An RPC has finished. The event contains status.
On the server this will be OK or Cancelled. */
GRPC_FINISHED, /* An RPC has finished. The event contains status.
On the server this will be OK or Cancelled. */
GRPC_SERVER_RPC_NEW, /* A new RPC has arrived at the server */
GRPC_SERVER_SHUTDOWN, /* The server has finished shutting down */
GRPC_COMPLETION_DO_NOT_USE /* must be last, forces users to include
@ -213,6 +212,7 @@ typedef struct grpc_event {
grpc_op_error write_accepted;
grpc_op_error finish_accepted;
grpc_op_error invoke_accepted;
grpc_op_error ioreq;
struct {
size_t count;
grpc_metadata *elements;
@ -233,6 +233,57 @@ typedef struct grpc_event {
} data;
} grpc_event;
typedef struct {
size_t count;
size_t capacity;
grpc_metadata *metadata;
} grpc_metadata_array;
typedef struct {
const char *method;
const char *host;
gpr_timespec deadline;
} grpc_call_details;
typedef enum {
GRPC_OP_SEND_INITIAL_METADATA = 0,
GRPC_OP_SEND_MESSAGE,
GRPC_OP_SEND_CLOSE_FROM_CLIENT,
GRPC_OP_SEND_STATUS_FROM_SERVER,
GRPC_OP_RECV_INITIAL_METADATA,
GRPC_OP_RECV_MESSAGES,
GRPC_OP_RECV_STATUS_ON_CLIENT,
GRPC_OP_RECV_CLOSE_ON_SERVER
} grpc_op_type;
typedef struct grpc_op {
grpc_op_type op;
union {
struct {
size_t count;
const grpc_metadata *metadata;
} send_initial_metadata;
grpc_byte_buffer *send_message;
struct {
size_t trailing_metadata_count;
grpc_metadata *trailing_metadata;
grpc_status_code status;
const char *status_details;
} send_status_from_server;
grpc_metadata_array *recv_initial_metadata;
grpc_byte_buffer **recv_message;
struct {
grpc_metadata_array *trailing_metadata;
grpc_status_code *status;
char **status_details;
size_t *status_details_capacity;
} recv_status_on_client;
struct {
int *cancelled;
} recv_close_on_server;
} data;
} grpc_op;
/* Initialize the grpc library */
void grpc_init(void);
@ -279,6 +330,9 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
const char *method, const char *host,
gpr_timespec deadline);
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t nops, void *tag);
/* Create a client channel */
grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_args *args);
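
To make the new surface concrete: a minimal client-side sketch (not part of this diff) that fills an ops array and hands it to grpc_call_start_batch. Names follow the declarations above; the helper name and its arguments are illustrative, and error handling is elided.

#include <grpc/grpc.h>
#include <grpc/byte_buffer.h>
#include <grpc/support/log.h>

static void start_unary_send(grpc_call *call, grpc_byte_buffer *payload,
                             void *tag) {
  grpc_op ops[3];
  ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
  ops[0].data.send_initial_metadata.count = 0; /* no custom metadata */
  ops[0].data.send_initial_metadata.metadata = NULL;
  ops[1].op = GRPC_OP_SEND_MESSAGE;
  ops[1].data.send_message = payload; /* queue the request body */
  ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; /* client half-close */
  /* the whole batch completes as a single GRPC_IOREQ event carrying tag */
  GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, 3, tag));
}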

@ -210,6 +210,7 @@ void grpc_call_element_recv_metadata(grpc_call_element *cur_elem,
metadata_op.dir = GRPC_CALL_UP;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
@ -221,6 +222,7 @@ void grpc_call_element_send_metadata(grpc_call_element *cur_elem,
metadata_op.dir = GRPC_CALL_DOWN;
metadata_op.done_cb = do_nothing;
metadata_op.user_data = NULL;
metadata_op.flags = 0;
metadata_op.data.metadata = mdelem;
grpc_call_next_op(cur_elem, &metadata_op);
}
@ -231,14 +233,16 @@ void grpc_call_element_send_cancel(grpc_call_element *cur_elem) {
cancel_op.dir = GRPC_CALL_DOWN;
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
cancel_op.flags = 0;
grpc_call_next_op(cur_elem, &cancel_op);
}
void grpc_call_element_send_finish(grpc_call_element *cur_elem) {
grpc_call_op cancel_op;
cancel_op.type = GRPC_SEND_FINISH;
cancel_op.dir = GRPC_CALL_DOWN;
cancel_op.done_cb = do_nothing;
cancel_op.user_data = NULL;
grpc_call_next_op(cur_elem, &cancel_op);
grpc_call_op finish_op;
finish_op.type = GRPC_SEND_FINISH;
finish_op.dir = GRPC_CALL_DOWN;
finish_op.done_cb = do_nothing;
finish_op.user_data = NULL;
finish_op.flags = 0;
grpc_call_next_op(cur_elem, &finish_op);
}

@ -298,10 +298,6 @@ static void recv_error(channel_data *chand, call_data *calld, int line,
static void do_nothing(void *calldata, grpc_op_error error) {}
static void done_message(void *user_data, grpc_op_error error) {
grpc_byte_buffer_destroy(user_data);
}
static void finish_message(channel_data *chand, call_data *calld) {
grpc_call_element *elem = calld->elem;
grpc_call_op call_op;
@ -309,9 +305,9 @@ static void finish_message(channel_data *chand, call_data *calld) {
call_op.flags = 0;
/* if we got all the bytes for this message, call up the stack */
call_op.type = GRPC_RECV_MESSAGE;
call_op.done_cb = done_message;
call_op.done_cb = do_nothing;
/* TODO(ctiller): this could be a lot faster if coded directly */
call_op.user_data = call_op.data.message = grpc_byte_buffer_create(
call_op.data.message = grpc_byte_buffer_create(
calld->incoming_message.slices, calld->incoming_message.count);
gpr_slice_buffer_reset_and_unref(&calld->incoming_message);

@ -80,9 +80,7 @@ void grpc_pollset_kick(grpc_pollset *p) {
}
}
void grpc_pollset_force_kick(grpc_pollset *p) {
grpc_pollset_kick_kick(&p->kick_state);
}
void grpc_pollset_force_kick(grpc_pollset *p) { grpc_pollset_kick_kick(&p->kick_state); }
/* global state management */

@ -0,0 +1,78 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "src/core/surface/byte_buffer_queue.h"
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
static void bba_destroy(grpc_bbq_array *array) { gpr_free(array->data); }
/* Append an operation to an array, expanding as needed */
static void bba_push(grpc_bbq_array *a, grpc_byte_buffer *buffer) {
if (a->count == a->capacity) {
a->capacity = GPR_MAX(a->capacity * 2, 8);
a->data = gpr_realloc(a->data, sizeof(grpc_byte_buffer *) * a->capacity);
}
a->data[a->count++] = buffer;
}
void grpc_bbq_destroy(grpc_byte_buffer_queue *q) {
bba_destroy(&q->filling);
bba_destroy(&q->draining);
}
int grpc_bbq_empty(grpc_byte_buffer_queue *q) {
return (q->drain_pos == q->draining.count && q->filling.count == 0);
}
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *buffer) {
bba_push(&q->filling, buffer);
}
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q) {
grpc_bbq_array temp_array;
if (q->drain_pos == q->draining.count) {
if (q->filling.count == 0) {
return NULL;
}
q->draining.count = 0;
q->drain_pos = 0;
/* swap arrays */
temp_array = q->filling;
q->filling = q->draining;
q->draining = temp_array;
}
return q->draining.data[q->drain_pos++];
}

@ -0,0 +1,59 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
#define __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__
#include <grpc/byte_buffer.h>
/* TODO(ctiller): inline an element or two into this struct to avoid per-call
allocations */
typedef struct {
grpc_byte_buffer **data;
size_t count;
size_t capacity;
} grpc_bbq_array;
/* should be initialized by zeroing memory */
typedef struct {
size_t drain_pos;
grpc_bbq_array filling;
grpc_bbq_array draining;
} grpc_byte_buffer_queue;
void grpc_bbq_destroy(grpc_byte_buffer_queue *q);
grpc_byte_buffer *grpc_bbq_pop(grpc_byte_buffer_queue *q);
int grpc_bbq_empty(grpc_byte_buffer_queue *q);
void grpc_bbq_push(grpc_byte_buffer_queue *q, grpc_byte_buffer *bb);
#endif /* __GRPC_INTERNAL_SURFACE_BYTE_BUFFER_QUEUE_H__ */
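
For context, a hypothetical usage sketch of this queue (the caller in call.c is in the suppressed diff): zero-initialize, push buffers as they arrive, pop them in FIFO order. grpc_bbq_pop swaps the filling and draining arrays lazily, so pushes stay O(1) amortized.

#include <string.h>
#include <grpc/grpc.h>
#include "src/core/surface/byte_buffer_queue.h"

static void bbq_demo(grpc_byte_buffer *incoming) {
  grpc_byte_buffer_queue q;
  memset(&q, 0, sizeof(q)); /* "should be initialized by zeroing memory" */
  grpc_bbq_push(&q, incoming); /* appended to the filling array */
  while (!grpc_bbq_empty(&q)) {
    /* pop swaps filling/draining only when the draining array runs dry */
    grpc_byte_buffer_destroy(grpc_bbq_pop(&q));
  }
  grpc_bbq_destroy(&q); /* frees the arrays; buffers must already be consumed */
}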

src/core/surface/call.c: file diff suppressed because it is too large (1668 changed lines).

@ -38,27 +38,73 @@
#include "src/core/channel/metadata_buffer.h"
#include <grpc/grpc.h>
/* Primitive operation types - grpc_op's get rewritten into these */
typedef enum {
GRPC_IOREQ_RECV_INITIAL_METADATA,
GRPC_IOREQ_RECV_MESSAGE,
GRPC_IOREQ_RECV_TRAILING_METADATA,
GRPC_IOREQ_RECV_STATUS,
GRPC_IOREQ_RECV_CLOSE,
GRPC_IOREQ_SEND_INITIAL_METADATA,
GRPC_IOREQ_SEND_MESSAGE,
GRPC_IOREQ_SEND_TRAILING_METADATA,
GRPC_IOREQ_SEND_STATUS,
GRPC_IOREQ_SEND_CLOSE,
GRPC_IOREQ_OP_COUNT
} grpc_ioreq_op;
typedef struct {
grpc_status_code *code;
char **details;
size_t *details_capacity;
} grpc_recv_status_args;
typedef union {
grpc_metadata_array *recv_metadata;
grpc_byte_buffer **recv_message;
grpc_recv_status_args recv_status;
struct {
size_t count;
grpc_metadata *metadata;
} send_metadata;
grpc_byte_buffer *send_message;
struct {
grpc_status_code code;
char *details;
} send_status;
} grpc_ioreq_data;
typedef struct {
grpc_ioreq_op op;
grpc_ioreq_data data;
} grpc_ioreq;
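
An illustrative mapping (assumed, not from this diff): a surface-level grpc_op such as GRPC_OP_SEND_MESSAGE would be rewritten into the primitive form above before being handed to grpc_call_start_ioreq_and_call_back (declared further down); the real translation lives in the suppressed call.c, and the helper name here is hypothetical.

static grpc_ioreq send_message_as_ioreq(const grpc_op *op) {
  grpc_ioreq req;
  req.op = GRPC_IOREQ_SEND_MESSAGE;
  /* both sides of the assignment are grpc_byte_buffer pointers */
  req.data.send_message = op->data.send_message;
  return req;
}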
typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
grpc_op_error status,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel,
const void *server_transport_data);
void grpc_call_internal_ref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call);
void grpc_call_internal_unref(grpc_call *call, int allow_immediate_deletion);
/* Helpers for grpc_client, grpc_server filters to publish received data to
the completion queue/surface layer */
void grpc_call_recv_metadata(grpc_call_element *surface_element,
grpc_call_op *op);
void grpc_call_recv_message(
grpc_call_element *surface_element, grpc_byte_buffer *message,
void (*on_finish)(void *user_data, grpc_op_error error), void *user_data);
void grpc_call_recv_finish(grpc_call_element *surface_element,
int is_full_close);
grpc_mdelem *md);
void grpc_call_recv_message(grpc_call_element *surface_element,
grpc_byte_buffer *message);
void grpc_call_read_closed(grpc_call_element *surface_element);
void grpc_call_stream_closed(grpc_call_element *surface_element);
void grpc_call_execute_op(grpc_call *call, grpc_call_op *op);
grpc_call_error grpc_call_start_ioreq_and_call_back(
grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
grpc_ioreq_completion_func on_complete, void *user_data);
/* Called when it's known that the initial batch of metadata is complete on the
client side (must not be called on the server) */
void grpc_call_client_initial_metadata_complete(
/* Called when it's known that the initial batch of metadata is complete */
void grpc_call_initial_metadata_complete(
grpc_call_element *surface_element);
void grpc_call_set_deadline(grpc_call_element *surface_element,
@ -69,10 +115,4 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call);
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
/* Get the metadata buffer. */
grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call);
void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem,
gpr_uint32 flags);
#endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */

@ -51,7 +51,7 @@ struct grpc_channel {
grpc_mdstr *authority_string;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1))
grpc_channel *grpc_channel_create_from_filters(
const grpc_channel_filter **filters, size_t num_filters,
@ -80,6 +80,7 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
grpc_call *call;
grpc_mdelem *path_mdelem;
grpc_mdelem *authority_mdelem;
grpc_call_op op;
if (!channel->is_client) {
gpr_log(GPR_ERROR, "Cannot create a call on the server.");
@ -91,20 +92,25 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel,
/* Add :path and :authority headers. */
/* TODO(klempner): Consider optimizing this by stashing mdelems for common
values of method and host. */
grpc_mdstr_ref(channel->path_string);
path_mdelem = grpc_mdelem_from_metadata_strings(
channel->metadata_context, channel->path_string,
channel->metadata_context, grpc_mdstr_ref(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method));
grpc_call_add_mdelem(call, path_mdelem, 0);
op.type = GRPC_SEND_METADATA;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;
op.data.metadata = path_mdelem;
op.done_cb = do_nothing;
op.user_data = NULL;
grpc_call_execute_op(call, &op);
grpc_mdstr_ref(channel->authority_string);
authority_mdelem = grpc_mdelem_from_metadata_strings(
channel->metadata_context, channel->authority_string,
grpc_mdstr_from_string(channel->metadata_context, host));
grpc_call_add_mdelem(call, authority_mdelem, 0);
op.data.metadata = authority_mdelem;
grpc_call_execute_op(call, &op);
if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) {
grpc_call_op op;
op.type = GRPC_SEND_DEADLINE;
op.dir = GRPC_CALL_DOWN;
op.flags = 0;

@ -56,23 +56,23 @@ static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_next_op(elem, op);
break;
case GRPC_RECV_METADATA:
grpc_call_recv_metadata(elem, op);
grpc_call_recv_metadata(elem, op->data.metadata);
break;
case GRPC_RECV_DEADLINE:
gpr_log(GPR_ERROR, "Deadline received by client (ignored)");
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message, op->done_cb,
op->user_data);
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_RECV_HALF_CLOSE:
grpc_call_recv_finish(elem, 0);
grpc_call_read_closed(elem);
break;
case GRPC_RECV_FINISH:
grpc_call_recv_finish(elem, 1);
grpc_call_stream_closed(elem);
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
grpc_call_client_initial_metadata_complete(elem);
grpc_call_initial_metadata_complete(elem);
break;
default:
GPR_ASSERT(op->dir == GRPC_CALL_DOWN);

@ -173,18 +173,6 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
@ -197,6 +185,17 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
ev = add_locked(cc, GRPC_IOREQ, tag, call, on_finish, user_data);
ev->base.data.ioreq = error;
end_op_locked(cc, GRPC_IOREQ);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_call *call,
grpc_event_finish_func on_finish,
@ -389,7 +388,7 @@ void grpc_event_finish(grpc_event *base) {
event *ev = (event *)base;
ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
if (ev->base.call) {
grpc_call_internal_unref(ev->base.call);
grpc_call_internal_unref(ev->base.call, 1);
}
gpr_free(ev);
}

@ -97,6 +97,10 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements);
void grpc_cq_end_ioreq(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_op_error error);
void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
/* disable polling for some tests */

@ -87,10 +87,10 @@ char *grpc_event_string(grpc_event *ev) {
gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
}
break;
case GRPC_INVOKE_ACCEPTED:
gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: "));
case GRPC_IOREQ:
gpr_strvec_add(&buf, gpr_strdup("IOREQ: "));
addhdr(&buf, ev);
adderr(&buf, ev->data.invoke_accepted);
adderr(&buf, ev->data.ioreq);
break;
case GRPC_WRITE_ACCEPTED:
gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));

@ -50,26 +50,16 @@ typedef struct {
grpc_mdelem *message;
} channel_data;
static void do_nothing(void *data, grpc_op_error error) {}
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
channel_data *channeld = elem->channel_data;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_SEND_START: {
grpc_call_op set_status_op;
grpc_mdelem_ref(channeld->message);
memset(&set_status_op, 0, sizeof(grpc_call_op));
set_status_op.dir = GRPC_CALL_UP;
set_status_op.type = GRPC_RECV_METADATA;
set_status_op.done_cb = do_nothing;
set_status_op.data.metadata = channeld->message;
grpc_call_recv_metadata(elem, &set_status_op);
grpc_call_recv_finish(elem, 1);
case GRPC_SEND_START:
grpc_call_recv_metadata(elem, grpc_mdelem_ref(channeld->message));
grpc_call_stream_closed(elem);
break;
}
case GRPC_SEND_METADATA:
grpc_mdelem_unref(op->data.metadata);
break;

@ -44,6 +44,7 @@
#include "src/core/surface/call.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/completion_queue.h"
#include "src/core/transport/metadata.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
@ -63,11 +64,24 @@ typedef struct channel_data channel_data;
struct channel_data {
grpc_server *server;
grpc_channel *channel;
grpc_mdstr *path_key;
grpc_mdstr *authority_key;
/* linked list of all channels on a server */
channel_data *next;
channel_data *prev;
};
typedef void (*new_call_cb)(grpc_server *server, grpc_completion_queue *cq,
grpc_metadata_array *initial_metadata,
call_data *calld, void *user_data);
typedef struct {
void *user_data;
grpc_completion_queue *cq;
grpc_metadata_array *initial_metadata;
new_call_cb cb;
} requested_call;
struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
@ -76,9 +90,9 @@ struct grpc_server {
gpr_mu mu;
void **tags;
size_t ntags;
size_t tag_cap;
requested_call *requested_calls;
size_t requested_call_count;
size_t requested_call_capacity;
gpr_uint8 shutdown;
gpr_uint8 have_shutdown_tag;
@ -107,11 +121,17 @@ typedef enum {
ZOMBIED
} call_state;
typedef struct legacy_data { grpc_metadata_array *initial_metadata; } legacy_data;
struct call_data {
grpc_call *call;
call_state state;
gpr_timespec deadline;
grpc_mdstr *path;
grpc_mdstr *host;
legacy_data *legacy;
gpr_uint8 included[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@ -179,7 +199,7 @@ static void server_unref(grpc_server *server) {
grpc_channel_args_destroy(server->channel_args);
gpr_mu_destroy(&server->mu);
gpr_free(server->channel_filters);
gpr_free(server->tags);
gpr_free(server->requested_calls);
gpr_free(server);
}
}
@ -210,62 +230,37 @@ static void destroy_channel(channel_data *chand) {
grpc_iomgr_add_callback(finish_destroy_channel, chand);
}
static void queue_new_rpc(grpc_server *server, call_data *calld, void *tag) {
grpc_call *call = calld->call;
grpc_metadata_buffer *mdbuf = grpc_call_get_metadata_buffer(call);
size_t count = grpc_metadata_buffer_count(mdbuf);
grpc_metadata *elements = grpc_metadata_buffer_extract_elements(mdbuf);
const char *host = NULL;
const char *method = NULL;
size_t i;
for (i = 0; i < count; i++) {
if (0 == strcmp(elements[i].key, ":authority")) {
host = elements[i].value;
} else if (0 == strcmp(elements[i].key, ":path")) {
method = elements[i].value;
}
}
grpc_call_internal_ref(call);
grpc_cq_end_new_rpc(server->cq, tag, call,
grpc_metadata_buffer_cleanup_elements, elements, method,
host, calld->deadline, count, elements);
}
static void start_new_rpc(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_server *server = chand->server;
gpr_mu_lock(&server->mu);
if (server->ntags) {
if (server->requested_call_count > 0) {
requested_call rc = server->requested_calls[--server->requested_call_count];
calld->state = ACTIVATED;
queue_new_rpc(server, calld, server->tags[--server->ntags]);
gpr_mu_unlock(&server->mu);
rc.cb(server, rc.cq, rc.initial_metadata, calld, rc.user_data);
} else {
calld->state = PENDING;
call_list_join(server, calld, PENDING_START);
gpr_mu_unlock(&server->mu);
}
gpr_mu_unlock(&server->mu);
}
static void kill_zombie(void *elem, int success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
static void finish_rpc(grpc_call_element *elem, int is_full_close) {
static void stream_closed(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->server->mu);
switch (calld->state) {
case ACTIVATED:
grpc_call_recv_finish(elem, is_full_close);
grpc_call_stream_closed(elem);
break;
case PENDING:
if (!is_full_close) {
grpc_call_recv_finish(elem, is_full_close);
break;
}
call_list_remove(chand->server, calld, PENDING_START);
/* fallthrough intended */
case NOT_STARTED:
@ -278,25 +273,57 @@ static void finish_rpc(grpc_call_element *elem, int is_full_close) {
gpr_mu_unlock(&chand->server->mu);
}
static void read_closed(grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->server->mu);
switch (calld->state) {
case ACTIVATED:
case PENDING:
grpc_call_read_closed(elem);
break;
case NOT_STARTED:
calld->state = ZOMBIED;
grpc_iomgr_add_callback(kill_zombie, elem);
break;
case ZOMBIED:
break;
}
gpr_mu_unlock(&chand->server->mu);
}
static void call_op(grpc_call_element *elem, grpc_call_element *from_elem,
grpc_call_op *op) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_mdelem *md;
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
switch (op->type) {
case GRPC_RECV_METADATA:
grpc_call_recv_metadata(elem, op);
md = op->data.metadata;
if (md->key == chand->path_key) {
calld->path = grpc_mdstr_ref(md->value);
grpc_mdelem_unref(md);
} else if (md->key == chand->authority_key) {
calld->host = grpc_mdstr_ref(md->value);
grpc_mdelem_unref(md);
} else {
grpc_call_recv_metadata(elem, md);
}
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
start_new_rpc(elem);
grpc_call_initial_metadata_complete(elem);
break;
case GRPC_RECV_MESSAGE:
grpc_call_recv_message(elem, op->data.message, op->done_cb,
op->user_data);
grpc_call_recv_message(elem, op->data.message);
op->done_cb(op->user_data, GRPC_OP_OK);
break;
case GRPC_RECV_HALF_CLOSE:
finish_rpc(elem, 0);
read_closed(elem);
break;
case GRPC_RECV_FINISH:
finish_rpc(elem, 1);
stream_closed(elem);
break;
case GRPC_RECV_DEADLINE:
grpc_call_set_deadline(elem, op->data.deadline);
@ -371,6 +398,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
int i;
gpr_mu_lock(&chand->server->mu);
@ -383,6 +411,19 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
gpr_mu_unlock(&chand->server->mu);
if (calld->host) {
grpc_mdstr_unref(calld->host);
}
if (calld->path) {
grpc_mdstr_unref(calld->path);
}
if (calld->legacy) {
gpr_free(calld->legacy->initial_metadata->metadata);
gpr_free(calld->legacy->initial_metadata);
gpr_free(calld->legacy);
}
server_unref(chand->server);
}
@ -395,6 +436,8 @@ static void init_channel_elem(grpc_channel_element *elem,
GPR_ASSERT(!is_last);
chand->server = NULL;
chand->channel = NULL;
chand->path_key = grpc_mdstr_from_string(metadata_context, ":path");
chand->authority_key = grpc_mdstr_from_string(metadata_context, ":authority");
chand->next = chand->prev = chand;
}
@ -406,6 +449,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
gpr_mu_unlock(&chand->server->mu);
grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key);
server_unref(chand->server);
}
}
@ -413,17 +458,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
static const grpc_channel_filter server_surface_filter = {
call_op, channel_op, sizeof(call_data),
init_call_elem, destroy_call_elem, sizeof(channel_data),
init_channel_elem, destroy_channel_elem, "server", };
static void early_terminate_requested_calls(grpc_completion_queue *cq,
void **tags, size_t ntags) {
size_t i;
for (i = 0; i < ntags; i++) {
grpc_cq_end_new_rpc(cq, tags[i], NULL, do_nothing, NULL, NULL, NULL,
gpr_inf_past, 0, NULL);
}
}
init_channel_elem, destroy_channel_elem, "server",
};
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_channel_filter **filters,
@ -517,8 +553,8 @@ grpc_transport_setup_result grpc_server_setup_transport(
void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
void *shutdown_tag) {
listener *l;
void **tags;
size_t ntags;
requested_call *requested_calls;
size_t requested_call_count;
channel_data **channels;
channel_data *c;
size_t nchannels;
@ -547,10 +583,10 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
i++;
}
tags = server->tags;
ntags = server->ntags;
server->tags = NULL;
server->ntags = 0;
requested_calls = server->requested_calls;
requested_call_count = server->requested_call_count;
server->requested_calls = NULL;
server->requested_call_count = 0;
server->shutdown = 1;
server->have_shutdown_tag = have_shutdown_tag;
@ -579,8 +615,12 @@ void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_free(channels);
/* terminate all the requested calls */
early_terminate_requested_calls(server->cq, tags, ntags);
gpr_free(tags);
for (i = 0; i < requested_call_count; i++) {
requested_calls[i].cb(server, requested_calls[i].cq,
requested_calls[i].initial_metadata, NULL,
requested_calls[i].user_data);
}
gpr_free(requested_calls);
/* Shutdown listeners */
for (l = server->listeners; l; l = l->next) {
@ -625,36 +665,105 @@ void grpc_server_add_listener(grpc_server *server, void *arg,
server->listeners = l;
}
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
static grpc_call_error queue_call_request(grpc_server *server,
grpc_completion_queue *cq,
grpc_metadata_array *initial_metadata,
new_call_cb cb, void *user_data) {
call_data *calld;
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
requested_call *rc;
gpr_mu_lock(&server->mu);
if (server->shutdown) {
gpr_mu_unlock(&server->mu);
early_terminate_requested_calls(server->cq, &tag_new, 1);
cb(server, cq, initial_metadata, NULL, user_data);
return GRPC_CALL_OK;
}
calld = call_list_remove_head(server, PENDING_START);
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
queue_new_rpc(server, calld, tag_new);
gpr_mu_unlock(&server->mu);
cb(server, cq, initial_metadata, calld, user_data);
return GRPC_CALL_OK;
} else {
if (server->tag_cap == server->ntags) {
server->tag_cap = GPR_MAX(3 * server->tag_cap / 2, server->tag_cap + 1);
server->tags =
gpr_realloc(server->tags, sizeof(void *) * server->tag_cap);
if (server->requested_call_count == server->requested_call_capacity) {
server->requested_call_capacity =
GPR_MAX(server->requested_call_capacity + 8,
server->requested_call_capacity * 2);
server->requested_calls =
gpr_realloc(server->requested_calls,
sizeof(requested_call) * server->requested_call_capacity);
}
server->tags[server->ntags++] = tag_new;
rc = &server->requested_calls[server->requested_call_count++];
rc->cb = cb;
rc->cq = cq;
rc->user_data = user_data;
rc->initial_metadata = initial_metadata;
gpr_mu_unlock(&server->mu);
return GRPC_CALL_OK;
}
gpr_mu_unlock(&server->mu);
}
static void begin_request(grpc_server *server, grpc_completion_queue *cq,
grpc_metadata_array *initial_metadata,
call_data *call_data, void *tag) {
abort();
}
return GRPC_CALL_OK;
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call_details *details,
grpc_metadata_array *initial_metadata, grpc_completion_queue *cq,
void *tag) {
grpc_cq_begin_op(cq, NULL, GRPC_IOREQ);
return queue_call_request(server, cq, initial_metadata, begin_request, tag);
}
static void publish_legacy_request(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
grpc_mdstr_as_c_string(calld->path),
grpc_mdstr_as_c_string(calld->host), calld->deadline,
calld->legacy->initial_metadata->count,
calld->legacy->initial_metadata->metadata);
} else {
abort();
}
}
static void begin_legacy_request(grpc_server *server, grpc_completion_queue *cq,
grpc_metadata_array *initial_metadata,
call_data *calld, void *tag) {
grpc_ioreq req;
if (!calld) {
gpr_free(initial_metadata);
grpc_cq_end_new_rpc(cq, tag, NULL, do_nothing, NULL, NULL, NULL,
gpr_inf_past, 0, NULL);
return;
}
req.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
req.data.recv_metadata = initial_metadata;
calld->legacy = gpr_malloc(sizeof(legacy_data));
memset(calld->legacy, 0, sizeof(legacy_data));
calld->legacy->initial_metadata = initial_metadata;
grpc_call_internal_ref(calld->call);
grpc_call_start_ioreq_and_call_back(calld->call, &req, 1,
publish_legacy_request, tag);
}
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
grpc_metadata_array *client_metadata =
gpr_malloc(sizeof(grpc_metadata_array));
memset(client_metadata, 0, sizeof(*client_metadata));
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
return queue_call_request(server, server->cq, client_metadata,
begin_legacy_request, tag_new);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
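
A hedged usage sketch for the new server API declared above (note that begin_request is still a stub that aborts in this commit, so this path is not yet functional end to end). The wrapper name and argument plumbing are illustrative; the argument order follows the grpc_server_request_call declaration.

#include <grpc/grpc.h>
#include <grpc/support/log.h>

static void request_next_call(grpc_server *server, grpc_completion_queue *cq,
                              grpc_call_details *details,
                              grpc_metadata_array *initial_metadata,
                              void *tag) {
  /* queues a request; completion is later reported on cq as a GRPC_IOREQ
     event carrying tag (or terminated early if the server shuts down) */
  GPR_ASSERT(GRPC_CALL_OK ==
             grpc_server_request_call(server, details, initial_metadata, cq,
                                      tag));
}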

@ -432,7 +432,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem,
static void deadline_enc(grpc_chttp2_hpack_compressor *c, gpr_timespec deadline,
framer_state *st) {
char timeout_str[32];
char timeout_str[GRPC_CHTTP2_TIMEOUT_ENCODE_MIN_BUFSIZE];
grpc_chttp2_encode_timeout(gpr_time_sub(deadline, gpr_now()), timeout_str);
hpack_enc(c, grpc_mdelem_from_metadata_strings(
c->mdctx, grpc_mdstr_ref(c->timeout_key_str),

@ -957,7 +957,7 @@ static void send_batch(grpc_transport *gt, grpc_stream *gs, grpc_stream_op *ops,
stream_list_join(t, s, WRITABLE);
}
} else {
grpc_stream_ops_unref_owned_objects(ops, ops_count);
grpc_sopb_append(&t->nuke_later_sopb, ops, ops_count);
}
if (is_last && s->outgoing_sopb.nops == 0 && s->read_closed) {
stream_list_join(t, s, PENDING_CALLBACKS);

@ -102,6 +102,7 @@ Status Channel::StartBlockingRpc(const RpcMethod &method,
grpc_call *call = grpc_channel_create_call_old(
c_channel_, method.name(), target_.c_str(), context->RawDeadline());
context->set_call(call);
grpc_event *ev;
void *finished_tag = reinterpret_cast<char *>(call);
void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;

@ -206,11 +206,13 @@ OPENSSL_ALPN_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/ope
ZLIB_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/zlib.c -lz $(LDFLAGS)
PERFTOOLS_CHECK_CMD = $(CC) $(CFLAGS) $(CPPFLAGS) -o /dev/null test/build/perftools.c -lprofiler $(LDFLAGS)
ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
HAS_SYSTEM_PERFTOOLS = $(shell $(PERFTOOLS_CHECK_CMD) 2> /dev/null && echo true || echo false)
ifeq ($(HAS_SYSTEM_PERFTOOLS),true)
DEFINES += GRPC_HAVE_PERFTOOLS
LIBS += profiler
endif
endif
ifndef REQUIRE_CUSTOM_LIBRARIES_$(CONFIG)
HAS_SYSTEM_OPENSSL_ALPN = $(shell $(OPENSSL_ALPN_CHECK_CMD) 2> /dev/null && echo true || echo false)

@ -70,6 +70,7 @@ typedef struct expectation {
union {
grpc_op_error finish_accepted;
grpc_op_error write_accepted;
grpc_op_error ioreq;
struct {
const char *method;
const char *host;
@ -180,9 +181,6 @@ static void verify_matches(expectation *e, grpc_event *ev) {
case GRPC_WRITE_ACCEPTED:
GPR_ASSERT(e->data.write_accepted == ev->data.write_accepted);
break;
case GRPC_INVOKE_ACCEPTED:
abort();
break;
case GRPC_SERVER_RPC_NEW:
GPR_ASSERT(string_equivalent(e->data.server_rpc_new.method,
ev->data.server_rpc_new.method));
@ -222,6 +220,9 @@ static void verify_matches(expectation *e, grpc_event *ev) {
GPR_ASSERT(ev->data.read == NULL);
}
break;
case GRPC_IOREQ:
GPR_ASSERT(e->data.ioreq == ev->data.ioreq);
break;
case GRPC_SERVER_SHUTDOWN:
break;
case GRPC_COMPLETION_DO_NOT_USE:
@ -242,7 +243,9 @@ static void metadata_expectation(gpr_strvec *buf, metadata *md) {
gpr_asprintf(&tmp, "%c%s:%s", i ? ',' : '{', md->keys[i], md->values[i]);
gpr_strvec_add(buf, tmp);
}
gpr_strvec_add(buf, gpr_strdup("}"));
if (md->count) {
gpr_strvec_add(buf, gpr_strdup("}"));
}
}
}
@ -261,8 +264,9 @@ static void expectation_to_strvec(gpr_strvec *buf, expectation *e) {
e->data.write_accepted);
gpr_strvec_add(buf, tmp);
break;
case GRPC_INVOKE_ACCEPTED:
gpr_strvec_add(buf, gpr_strdup("GRPC_INVOKE_ACCEPTED"));
case GRPC_IOREQ:
gpr_asprintf(&tmp, "GRPC_IOREQ result=%d", e->data.ioreq);
gpr_strvec_add(buf, tmp);
break;
case GRPC_SERVER_RPC_NEW:
timeout = gpr_time_sub(e->data.server_rpc_new.deadline, gpr_now());

@ -142,7 +142,6 @@ void test_connect(const char *server_host, const char *client_host, int port,
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);

@ -135,7 +135,6 @@ static void test_body(grpc_end2end_test_fixture f) {
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);
grpc_call_destroy(c);

@ -138,7 +138,6 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);
@ -207,7 +206,7 @@ static void test_max_concurrent_streams(grpc_end2end_test_config config) {
/* The /alpha or /beta calls started above could be invoked (but NOT both);
* check this here */
/* We'll get tag 303 or 403, we want 300, 400 */
live_call = ((int)(gpr_intptr)ev->tag) - 3;
live_call = ((int)(gpr_intptr) ev->tag) - 3;
grpc_event_finish(ev);
cq_expect_server_rpc_new(v_server, &s1, tag(100),

@ -139,7 +139,6 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);
@ -180,16 +179,14 @@ static void simple_request_body2(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write_status_old(
s, GRPC_STATUS_UNIMPLEMENTED, "xyz", tag(5)));
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_verify(v_server);
cq_expect_client_metadata_read(v_client, tag(2), NULL);
cq_verify(v_client);
cq_expect_finished_with_status(v_client, tag(3), GRPC_STATUS_UNIMPLEMENTED,
"xyz", NULL);
cq_verify(v_client);
cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK);
cq_expect_finished(v_server, tag(102), NULL);
cq_verify(v_server);

@ -105,32 +105,6 @@ static void test_cq_end_read(void) {
shutdown_and_destroy(cc);
}
static void test_cq_end_invoke_accepted(void) {
grpc_event *ev;
grpc_completion_queue *cc;
int on_finish_called = 0;
void *tag = create_test_tag();
LOG_TEST();
cc = grpc_completion_queue_create();
grpc_cq_begin_op(cc, NULL, GRPC_INVOKE_ACCEPTED);
grpc_cq_end_invoke_accepted(cc, tag, NULL, increment_int_on_finish,
&on_finish_called, GRPC_OP_OK);
ev = grpc_completion_queue_next(cc, gpr_inf_past);
GPR_ASSERT(ev != NULL);
GPR_ASSERT(ev->type == GRPC_INVOKE_ACCEPTED);
GPR_ASSERT(ev->tag == tag);
GPR_ASSERT(ev->data.invoke_accepted == GRPC_OP_OK);
GPR_ASSERT(on_finish_called == 0);
grpc_event_finish(ev);
GPR_ASSERT(on_finish_called == 1);
shutdown_and_destroy(cc);
}
static void test_cq_end_write_accepted(void) {
grpc_event *ev;
grpc_completion_queue *cc;
@ -421,7 +395,6 @@ int main(int argc, char **argv) {
test_no_op();
test_wait_empty();
test_cq_end_read();
test_cq_end_invoke_accepted();
test_cq_end_write_accepted();
test_cq_end_finish_accepted();
test_cq_end_client_metadata_read();

@ -15,5 +15,8 @@ RUN cd /var/local/git/grpc && \
git pull --recurse-submodules && \
git submodule update --init --recursive
# Build the C core
RUN make static_c shared_c -j12 -C /var/local/git/grpc
# Define the default command.
CMD ["bash"]

@ -84,5 +84,8 @@ RUN wget https://phar.phpunit.de/phpunit.phar \
&& chmod +x phpunit.phar \
&& mv phpunit.phar /usr/local/bin/phpunit
# Build the C core
RUN make static_c shared_c -j12 -C /var/local/git/grpc
# Define the default command.
CMD ["bash"]

@ -53,3 +53,6 @@ RUN cd /var/local/git/grpc/third_party/protobuf && \
./autogen.sh && \
./configure --prefix=/usr && \
make -j12 && make check && make install && make clean
# Build the C core
RUN make static_c shared_c -j12 -C /var/local/git/grpc

@ -146,6 +146,7 @@
<ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
<ClInclude Include="..\..\src\core\statistics\hash_table.h" />
<ClInclude Include="..\..\src\core\statistics\window_stats.h" />
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />
<ClInclude Include="..\..\src\core\surface\channel.h" />
<ClInclude Include="..\..\src\core\surface\client.h" />
@ -312,6 +313,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\call.c">

@ -202,6 +202,9 @@
<ClCompile Include="..\..\src\core\surface\byte_buffer.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
<Filter>src\core\surface</Filter>
</ClCompile>
@ -521,6 +524,9 @@
<ClInclude Include="..\..\src\core\statistics\window_stats.h">
<Filter>src\core\statistics</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h">
<Filter>src\core\surface</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\surface\call.h">
<Filter>src\core\surface</Filter>
</ClInclude>

@ -146,6 +146,7 @@
<ClInclude Include="..\..\src\core\statistics\census_tracing.h" />
<ClInclude Include="..\..\src\core\statistics\hash_table.h" />
<ClInclude Include="..\..\src\core\statistics\window_stats.h" />
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h" />
<ClInclude Include="..\..\src\core\surface\call.h" />
<ClInclude Include="..\..\src\core\surface\channel.h" />
<ClInclude Include="..\..\src\core\surface\client.h" />
@ -312,6 +313,8 @@
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
</ClCompile>
<ClCompile Include="..\..\src\core\surface\call.c">

@ -163,6 +163,9 @@
<ClCompile Include="..\..\src\core\surface\byte_buffer.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_queue.c">
<Filter>src\core\surface</Filter>
</ClCompile>
<ClCompile Include="..\..\src\core\surface\byte_buffer_reader.c">
<Filter>src\core\surface</Filter>
</ClCompile>
@ -446,6 +449,9 @@
<ClInclude Include="..\..\src\core\statistics\window_stats.h">
<Filter>src\core\statistics</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\surface\byte_buffer_queue.h">
<Filter>src\core\surface</Filter>
</ClInclude>
<ClInclude Include="..\..\src\core\surface\call.h">
<Filter>src\core\surface</Filter>
</ClInclude>
