From 8e8fd89fafbab00bcb91c032692978320b8b1e6b Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 10 Feb 2015 17:02:08 -0800 Subject: [PATCH] Allow two completion queues on request call One for the new rpc notification, the other is bound to the new call. This will make async c++ soooo much easier. --- include/grpc/grpc | 1 + include/grpc/grpc.h | 7 +++- src/core/surface/server.c | 39 ++++++++++++------- test/core/end2end/tests/cancel_after_accept.c | 3 +- ...esponse_with_binary_metadata_and_payload.c | 3 +- ...quest_response_with_metadata_and_payload.c | 3 +- .../tests/request_response_with_payload.c | 3 +- ...ponse_with_trailing_metadata_and_payload.c | 3 +- .../tests/request_with_large_metadata.c | 3 +- .../core/end2end/tests/request_with_payload.c | 3 +- .../end2end/tests/simple_delayed_request.c | 3 +- test/core/end2end/tests/simple_request.c | 3 +- 12 files changed, 50 insertions(+), 24 deletions(-) create mode 120000 include/grpc/grpc diff --git a/include/grpc/grpc b/include/grpc/grpc new file mode 120000 index 00000000000..fc80ad1c867 --- /dev/null +++ b/include/grpc/grpc @@ -0,0 +1 @@ +/home/craig/grpc-ct/include/grpc \ No newline at end of file diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 4ccb5a4dd59..7733f8bb2ae 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -553,7 +553,9 @@ grpc_call_error grpc_server_request_call_old(grpc_server *server, grpc_call_error grpc_server_request_call( grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, - grpc_completion_queue *completion_queue, void *tag_new); + grpc_completion_queue *cq_when_rpc_available, + grpc_completion_queue *cq_bound_to_call, + void *tag_new); void *grpc_server_register_method(grpc_server *server, const char *method, const char *host); @@ -562,7 +564,8 @@ grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *request_metadata, grpc_byte_buffer **optional_payload, - grpc_completion_queue *completion_queue, void *tag_new); + grpc_completion_queue *cq_when_rpc_available, + grpc_completion_queue *cq_bound_to_call, void *tag_new); /* Create a server */ grpc_server *grpc_server_create(grpc_completion_queue *cq, diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 81eaf4fc940..b28a52bcbdd 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -74,13 +74,15 @@ typedef struct { void *tag; union { struct { - grpc_completion_queue *cq; + grpc_completion_queue *cq_new; + grpc_completion_queue *cq_bind; grpc_call **call; grpc_call_details *details; grpc_metadata_array *initial_metadata; } batch; struct { - grpc_completion_queue *cq; + grpc_completion_queue *cq_new; + grpc_completion_queue *cq_bind; grpc_call **call; registered_method *registered_method; gpr_timespec *deadline; @@ -172,6 +174,8 @@ struct call_data { call_data **root[CALL_LIST_COUNT]; call_link links[CALL_LIST_COUNT]; + + grpc_completion_queue *cq_new; }; #define SERVER_FROM_CALL_ELEM(elem) \ @@ -847,12 +851,14 @@ static grpc_call_error queue_call_request(grpc_server *server, grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *initial_metadata, - grpc_completion_queue *cq, void *tag) { + grpc_completion_queue *cq_new, + grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); rc.type = BATCH_CALL; rc.tag = tag; - rc.data.batch.cq = cq; + rc.data.batch.cq_new = cq_new; + rc.data.batch.cq_bind = cq_bind; rc.data.batch.call = call; rc.data.batch.details = details; rc.data.batch.initial_metadata = initial_metadata; @@ -862,12 +868,14 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_call_error grpc_server_request_registered_call( grpc_server *server, void *registered_method, grpc_call **call, gpr_timespec *deadline, grpc_metadata_array *initial_metadata, - grpc_byte_buffer **optional_payload, grpc_completion_queue *cq, void *tag) { + grpc_byte_buffer **optional_payload, grpc_completion_queue *cq_new, grpc_completion_queue *cq_bind, + void *tag) { requested_call rc; - grpc_cq_begin_op(cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(cq_new, NULL, GRPC_OP_COMPLETE); rc.type = REGISTERED_CALL; rc.tag = tag; - rc.data.registered.cq = cq; + rc.data.registered.cq_new = cq_new; + rc.data.registered.cq_bind = cq_bind; rc.data.registered.call = call; rc.data.registered.registered_method = registered_method; rc.data.registered.deadline = deadline; @@ -926,16 +934,17 @@ static void begin_call(grpc_server *server, call_data *calld, &rc->data.batch.details->host_capacity, calld->host); cpstr(&rc->data.batch.details->method, &rc->data.batch.details->method_capacity, calld->path); - grpc_call_set_completion_queue(calld->call, rc->data.batch.cq); + grpc_call_set_completion_queue(calld->call, rc->data.batch.cq_bind); *rc->data.batch.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.batch.initial_metadata; r++; + calld->cq_new = rc->data.batch.cq_new; publish = publish_registered_or_batch; break; case REGISTERED_CALL: *rc->data.registered.deadline = calld->deadline; - grpc_call_set_completion_queue(calld->call, rc->data.registered.cq); + grpc_call_set_completion_queue(calld->call, rc->data.registered.cq_bind); *rc->data.registered.call = calld->call; r->op = GRPC_IOREQ_RECV_INITIAL_METADATA; r->data.recv_metadata = rc->data.registered.initial_metadata; @@ -945,6 +954,7 @@ static void begin_call(grpc_server *server, call_data *calld, r->data.recv_message = rc->data.registered.optional_payload; r++; } + calld->cq_new = rc->data.registered.cq_new; publish = publish_registered_or_batch; break; } @@ -963,13 +973,13 @@ static void fail_call(grpc_server *server, requested_call *rc) { case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.batch.cq, rc->tag, NULL, do_nothing, + grpc_cq_end_op_complete(rc->data.batch.cq_new, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; - grpc_cq_end_op_complete(rc->data.registered.cq, rc->tag, NULL, do_nothing, + grpc_cq_end_op_complete(rc->data.registered.cq_new, rc->tag, NULL, do_nothing, NULL, GRPC_OP_ERROR); break; } @@ -996,7 +1006,10 @@ static void publish_legacy(grpc_call *call, grpc_op_error status, void *tag) { static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, void *tag) { - grpc_cq_end_op_complete(grpc_call_get_completion_queue(call), tag, call, + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + call_data *calld = elem->call_data; + grpc_cq_end_op_complete(calld->cq_new, tag, call, do_nothing, NULL, status); } diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index eb26ff14f00..ab7c683e452 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -166,7 +166,8 @@ static void test_cancel_after_accept(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(2))); + f.server_cq, f.server_cq, + tag(2))); cq_expect_completion(v_server, tag(2), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c index fa5df5f5260..cb477144d3f 100644 --- a/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_binary_metadata_and_payload.c @@ -175,7 +175,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_metadata_and_payload.c index ad01fe70813..0d4822ec91b 100644 --- a/test/core/end2end/tests/request_response_with_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_metadata_and_payload.c @@ -168,7 +168,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_payload.c b/test/core/end2end/tests/request_response_with_payload.c index 6b60c4da651..fe3f05fa954 100644 --- a/test/core/end2end/tests/request_response_with_payload.c +++ b/test/core/end2end/tests/request_response_with_payload.c @@ -162,7 +162,8 @@ static void request_response_with_payload(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c index 5878058c982..86ee405964b 100644 --- a/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c +++ b/test/core/end2end/tests/request_response_with_trailing_metadata_and_payload.c @@ -169,7 +169,8 @@ static void test_request_response_with_metadata_and_payload( GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_large_metadata.c b/test/core/end2end/tests/request_with_large_metadata.c index 7e7bec0160c..8e5b1014f54 100644 --- a/test/core/end2end/tests/request_with_large_metadata.c +++ b/test/core/end2end/tests/request_with_large_metadata.c @@ -166,7 +166,8 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index 2c23f37e0c3..67b15770142 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -157,7 +157,8 @@ static void test_invoke_request_with_payload(grpc_end2end_test_config config) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index 99d1a263864..5c9109f9629 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -144,7 +144,8 @@ static void simple_delayed_request_body(grpc_end2end_test_config config, GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, &s, &call_details, &request_metadata_recv, - f->server_cq, tag(101))); + f->server_cq, f->server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server); diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index 0f046ae2d23..280bf98c167 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -150,7 +150,8 @@ static void simple_request_body(grpc_end2end_test_fixture f) { GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f.server, &s, &call_details, &request_metadata_recv, - f.server_cq, tag(101))); + f.server_cq, f.server_cq, + tag(101))); cq_expect_completion(v_server, tag(101), GRPC_OP_OK); cq_verify(v_server);