Merge branch 'c++api' of github.com:ctiller/grpc into c++api

pull/501/head
Yang Gao 10 years ago
commit 02a6e3a76a
  1. 12
      include/grpc/grpc.h
  2. 5
      src/core/iomgr/tcp_server.h
  3. 11
      src/core/iomgr/tcp_server_posix.c
  4. 5
      src/core/security/server_secure_chttp2.c
  5. 79
      src/core/surface/server.c
  6. 2
      src/core/surface/server.h
  7. 4
      src/core/surface/server_chttp2.c
  8. 2
      src/cpp/server/server.cc
  9. 2
      test/core/end2end/tests/cancel_after_accept.c
  10. 2
      test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c
  11. 2
      test/core/end2end/tests/request_response_with_metadata_and_payload.c
  12. 2
      test/core/end2end/tests/request_response_with_payload.c
  13. 2
      test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c
  14. 2
      test/core/end2end/tests/request_with_large_metadata.c
  15. 2
      test/core/end2end/tests/request_with_payload.c
  16. 2
      test/core/end2end/tests/simple_delayed_request.c
  17. 2
      test/core/end2end/tests/simple_request.c
  18. 6
      test/core/iomgr/tcp_server_posix_test.c

@ -550,15 +550,25 @@ void grpc_call_destroy(grpc_call *call);
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new);
/* Request notification of a new call */
grpc_call_error grpc_server_request_call(
grpc_server *server, grpc_call **call, grpc_call_details *details,
grpc_metadata_array *request_metadata,
grpc_completion_queue *cq_bound_to_call,
void *tag_new);
/* Registers a method in the server.
Methods to this (host, method) pair will not be reported by
grpc_server_request_call, but instead be reported by
grpc_server_request_registered_call when passed the appropriate
registered_method (as returned by this function).
Must be called before grpc_server_start.
Returns NULL on failure. */
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host);
const char *host,
grpc_completion_queue *new_call_cq);
/* Request notification of a new pre-registered call */
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *request_metadata,

@ -46,8 +46,9 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
grpc_tcp_server *grpc_tcp_server_create(void);
/* Start listening to bound ports */
void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset,
grpc_tcp_server_cb cb, void *cb_arg);
void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets,
size_t pollset_count, grpc_tcp_server_cb cb,
void *cb_arg);
/* Add a port to the server, returning port number on success, or negative
on failure.

@ -353,9 +353,10 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) {
return (index < s->nports) ? s->ports[index].fd : -1;
}
void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
grpc_tcp_server_cb cb, void *cb_arg) {
size_t i;
void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
size_t pollset_count, grpc_tcp_server_cb cb,
void *cb_arg) {
size_t i, j;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->cb);
@ -363,8 +364,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
if (pollset) {
grpc_pollset_add_fd(pollset, s->ports[i].emfd);
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
}
grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
s->active_ports++;

@ -76,9 +76,10 @@ static void on_accept(void *server, grpc_endpoint *tcp) {
/* Note: the following code is the same with server_chttp2.c */
/* Server callback: start listening on our ports */
static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets,
size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_start(tcp, pollset, on_accept, server);
grpc_tcp_server_start(tcp, pollsets, pollset_count, on_accept, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further

@ -53,7 +53,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
void (*start)(grpc_server *server, void *arg, grpc_pollset **pollsets, size_t pollset_count);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@ -101,6 +101,7 @@ struct registered_method {
char *host;
call_data *pending;
requested_call_array requested;
grpc_completion_queue *cq;
registered_method *next;
};
@ -127,7 +128,11 @@ struct grpc_server {
size_t channel_filter_count;
const grpc_channel_filter **channel_filters;
grpc_channel_args *channel_args;
grpc_completion_queue *cq;
grpc_completion_queue *unregistered_cq;
grpc_completion_queue **cqs;
grpc_pollset **pollsets;
size_t cq_count;
gpr_mu mu;
@ -169,6 +174,7 @@ struct call_data {
grpc_mdstr *host;
legacy_data *legacy;
grpc_completion_queue *cq_new;
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
@ -496,7 +502,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
int i;
size_t i;
gpr_mu_lock(&chand->server->mu);
for (i = 0; i < CALL_LIST_COUNT; i++) {
@ -504,7 +510,9 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
if (chand->server->shutdown && chand->server->have_shutdown_tag &&
chand->server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(chand->server->cq, chand->server->shutdown_tag);
for (i = 0; i < chand->server->cq_count; i++) {
grpc_cq_end_server_shutdown(chand->server->cqs[i], chand->server->shutdown_tag);
}
}
gpr_mu_unlock(&chand->server->mu);
@ -557,6 +565,16 @@ static const grpc_channel_filter server_surface_filter = {
sizeof(channel_data), init_channel_elem, destroy_channel_elem, "server",
};
static void addcq(grpc_server *server, grpc_completion_queue *cq) {
size_t i, n;
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
n = server->cq_count++;
server->cqs = gpr_realloc(server->cqs, server->cq_count * sizeof(grpc_completion_queue*));
server->cqs[n] = cq;
}
grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_channel_filter **filters,
size_t filter_count,
@ -566,10 +584,11 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
grpc_server *server = gpr_malloc(sizeof(grpc_server));
memset(server, 0, sizeof(grpc_server));
if (cq) addcq(server, cq);
gpr_mu_init(&server->mu);
server->cq = cq;
server->unregistered_cq = cq;
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
server->root_channel_data.next = server->root_channel_data.prev =
@ -605,7 +624,7 @@ static int streq(const char *a, const char *b) {
}
void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
const char *host, grpc_completion_queue *cq_new_rpc) {
registered_method *m;
if (!method) {
gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
@ -618,20 +637,28 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
return NULL;
}
}
addcq(server, cq_new_rpc);
m = gpr_malloc(sizeof(registered_method));
memset(m, 0, sizeof(*m));
m->method = gpr_strdup(method);
m->host = gpr_strdup(host);
m->next = server->registered_methods;
m->cq = cq_new_rpc;
server->registered_methods = m;
return m;
}
void grpc_server_start(grpc_server *server) {
listener *l;
size_t i;
server->pollsets = gpr_malloc(sizeof(grpc_pollset*) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
}
for (l = server->listeners; l; l = l->next) {
l->start(server, l->arg, grpc_cq_pollset(server->cq));
l->start(server, l->arg, server->pollsets, server->cq_count);
}
}
@ -664,7 +691,9 @@ grpc_transport_setup_result grpc_server_setup_transport(
}
filters[i] = &grpc_connected_channel_filter;
grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cq));
for (i = 0; i < s->cq_count; i++) {
grpc_transport_add_to_pollset(transport, grpc_cq_pollset(s->cqs[i]));
}
channel = grpc_channel_create_from_filters(filters, num_filters,
s->channel_args, mdctx, 0);
@ -765,9 +794,11 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
server->have_shutdown_tag = have_shutdown_tag;
server->shutdown_tag = shutdown_tag;
if (have_shutdown_tag) {
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_SHUTDOWN);
if (server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(server->cq, shutdown_tag);
for (i = 0; i < server->cq_count; i++) {
grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
if (server->lists[ALL_CALLS] == NULL) {
grpc_cq_end_server_shutdown(server->cqs[i], shutdown_tag);
}
}
}
gpr_mu_unlock(&server->mu);
@ -826,7 +857,7 @@ void grpc_server_destroy(grpc_server *server) {
void grpc_server_add_listener(grpc_server *server, void *arg,
void (*start)(grpc_server *server, void *arg,
grpc_pollset *pollset),
grpc_pollset **pollsets, size_t pollset_count),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
@ -878,7 +909,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.data.batch.cq_bind = cq_bind;
@ -889,12 +920,13 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
}
grpc_call_error grpc_server_request_registered_call(
grpc_server *server, void *registered_method, grpc_call **call,
grpc_server *server, void *rm, grpc_call **call,
gpr_timespec *deadline, grpc_metadata_array *initial_metadata,
grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
grpc_cq_begin_op(server->cq, NULL, GRPC_OP_COMPLETE);
registered_method *registered_method = rm;
grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.data.registered.cq_bind = cq_bind;
@ -909,7 +941,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_call_error grpc_server_request_call_old(grpc_server *server,
void *tag_new) {
requested_call rc;
grpc_cq_begin_op(server->cq, NULL, GRPC_SERVER_RPC_NEW);
grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_SERVER_RPC_NEW);
rc.type = LEGACY_CALL;
rc.tag = tag_new;
return queue_call_request(server, &rc);
@ -965,6 +997,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
r->data.recv_metadata = rc->data.batch.initial_metadata;
r++;
calld->cq_new = server->unregistered_cq;
publish = publish_registered_or_batch;
break;
case REGISTERED_CALL:
@ -979,6 +1012,7 @@ static void begin_call(grpc_server *server, call_data *calld,
r->data.recv_message = rc->data.registered.optional_payload;
r++;
}
calld->cq_new = rc->data.registered.registered_method->cq;
publish = publish_registered_or_batch;
break;
}
@ -991,19 +1025,19 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc) {
switch (rc->type) {
case LEGACY_CALL:
grpc_cq_end_new_rpc(server->cq, rc->tag, NULL, do_nothing, NULL, NULL,
grpc_cq_end_new_rpc(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, NULL,
NULL, gpr_inf_past, 0, NULL);
break;
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
grpc_cq_end_op_complete(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
GRPC_OP_ERROR);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op_complete(server->cq, rc->tag, NULL, do_nothing, NULL,
grpc_cq_end_op_complete(rc->data.registered.registered_method->cq, rc->tag, NULL, do_nothing, NULL,
GRPC_OP_ERROR);
break;
}
@ -1017,7 +1051,7 @@ static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) {
grpc_server *server = chand->server;
if (status == GRPC_OP_OK) {
grpc_cq_end_new_rpc(server->cq, tag, call, do_nothing, NULL,
grpc_cq_end_new_rpc(server->unregistered_cq, tag, call, do_nothing, NULL,
grpc_mdstr_as_c_string(calld->path),
grpc_mdstr_as_c_string(calld->host), calld->deadline,
calld->legacy->initial_metadata.count,
@ -1032,9 +1066,8 @@ static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
channel_data *chand = elem->channel_data;
grpc_server *server = chand->server;
grpc_cq_end_op_complete(server->cq, tag, call, do_nothing, NULL, status);
call_data *calld = elem->call_data;
grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {

@ -48,7 +48,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
void (*start)(grpc_server *server, void *arg,
grpc_pollset *pollset),
grpc_pollset **pollsets, size_t npollsets),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the

@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
static void start(grpc_server *server, void *tcpp, grpc_pollset **pollsets, size_t pollset_count) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_start(tcp, pollset, new_transport, server);
grpc_tcp_server_start(tcp, pollsets, pollset_count, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further

@ -84,7 +84,7 @@ Server::~Server() {
bool Server::RegisterService(RpcService *service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
void *tag = grpc_server_register_method(server_, method->name(), nullptr);
void *tag = grpc_server_register_method(server_, method->name(), nullptr, cq_.cq());
if (!tag) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());

@ -166,7 +166,7 @@ static void test_cancel_after_accept(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(2)));
cq_expect_completion(v_server, tag(2), GRPC_OP_OK);
cq_verify(v_server);

@ -175,7 +175,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -168,7 +168,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -162,7 +162,7 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -169,7 +169,7 @@ static void test_request_response_with_metadata_and_payload(
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -166,7 +166,7 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -157,7 +157,7 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -144,7 +144,7 @@ static void simple_delayed_request_body(grpc_end2end_test_config config,
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s,
&call_details,
&request_metadata_recv,
f->server_cq, f->server_cq,
f->server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -150,7 +150,7 @@ static void simple_request_body(grpc_end2end_test_fixture f) {
GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s,
&call_details,
&request_metadata_recv,
f.server_cq, f.server_cq,
f.server_cq,
tag(101)));
cq_expect_completion(v_server, tag(101), GRPC_OP_OK);
cq_verify(v_server);

@ -66,7 +66,7 @@ static void test_no_op(void) {
static void test_no_op_with_start(void) {
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
grpc_tcp_server_start(s, NULL, on_connect, NULL);
grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@ -93,7 +93,7 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
grpc_tcp_server_start(s, NULL, on_connect, NULL);
grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
grpc_tcp_server_destroy(s);
}
@ -120,7 +120,7 @@ static void test_connect(int n) {
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));
grpc_tcp_server_start(s, NULL, on_connect, NULL);
grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
for (i = 0; i < n; i++) {
deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(10000000));

Loading…
Cancel
Save