From 787a92491ff38654ec9ad3e8439096a607cedd3e Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Tue, 10 Feb 2015 16:39:40 -0800 Subject: [PATCH 01/12] Updated ping_pong_request to new API --- test/core/fling/client.c | 41 +++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/test/core/fling/client.c b/test/core/fling/client.c index a91dfba9b0e..096dc097724 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -49,22 +49,21 @@ static grpc_byte_buffer *the_buffer; static grpc_channel *channel; static grpc_completion_queue *cq; static grpc_call *call; +static grpc_op ops[6]; +static grpc_metadata_array initial_metadata_recv; +static grpc_metadata_array trailing_metadata_recv; +static grpc_byte_buffer *response_payload_recv = NULL; +static grpc_call_details call_details; +static grpc_status_code status; +static char *details = NULL; +static size_t details_capacity = 0; static void init_ping_pong_request(void) {} static void step_ping_pong_request(void) { - call = grpc_channel_create_call_old(channel, "/Reflector/reflectUnary", + call = grpc_channel_create_call(channel, cq, "/Reflector/reflectUnary", "localhost", gpr_inf_future); - GPR_ASSERT(grpc_call_invoke_old(call, cq, (void *)1, (void *)1, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - GPR_ASSERT(grpc_call_start_write_old(call, the_buffer, (void *)1, - GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK); - grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); - GPR_ASSERT(grpc_call_start_read_old(call, (void *)1) == GRPC_CALL_OK); - GPR_ASSERT(grpc_call_writes_done_old(call, (void *)1) == GRPC_CALL_OK); - grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); - grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); - grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, 6, (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); grpc_call_destroy(call); call = NULL; @@ -148,6 +147,26 @@ int main(int argc, char **argv) { cq = grpc_completion_queue_create(); the_buffer = grpc_byte_buffer_create(&slice, payload_size); histogram = gpr_histogram_create(0.01, 60e9); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_call_details_init(&call_details); + + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[0].data.send_initial_metadata.count = 0; + ops[1].op = GRPC_OP_SEND_MESSAGE; + ops[1].data.send_message = the_buffer; + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &initial_metadata_recv; + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message = &response_payload_recv; + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + ops[5].data.recv_status_on_client.status = &status; + ops[5].data.recv_status_on_client.status_details = &details; + ops[5].data.recv_status_on_client.status_details_capacity = &details_capacity; + sc.init(); for (i = 0; i < 1000; i++) { From 3af24651d8ee2a176324871518dbd1438164ed1c Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Wed, 11 Feb 2015 10:46:20 -0800 Subject: [PATCH 02/12] Updated rest of client to new API --- test/core/fling/client.c | 64 +++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 096dc097724..2cd08f81976 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -50,6 +50,8 @@ static grpc_channel *channel; static grpc_completion_queue *cq; static grpc_call *call; static grpc_op ops[6]; +static grpc_op stream_init_op; +static grpc_op stream_step_ops[2]; static grpc_metadata_array initial_metadata_recv; static grpc_metadata_array trailing_metadata_recv; static grpc_byte_buffer *response_payload_recv = NULL; @@ -58,7 +60,26 @@ static grpc_status_code status; static char *details = NULL; static size_t details_capacity = 0; -static void init_ping_pong_request(void) {} +static void init_ping_pong_request(void) { + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_call_details_init(&call_details); + + ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + ops[0].data.send_initial_metadata.count = 0; + ops[1].op = GRPC_OP_SEND_MESSAGE; + ops[1].data.send_message = the_buffer; + ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[3].data.recv_initial_metadata = &initial_metadata_recv; + ops[4].op = GRPC_OP_RECV_MESSAGE; + ops[4].data.recv_message = &response_payload_recv; + ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[5].data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + ops[5].data.recv_status_on_client.status = &status; + ops[5].data.recv_status_on_client.status_details = &details; + ops[5].data.recv_status_on_client.status_details_capacity = &details_capacity; +} static void step_ping_pong_request(void) { call = grpc_channel_create_call(channel, cq, "/Reflector/reflectUnary", @@ -70,18 +91,25 @@ static void step_ping_pong_request(void) { } static void init_ping_pong_stream(void) { - call = grpc_channel_create_call_old(channel, "/Reflector/reflectStream", - "localhost", gpr_inf_future); - GPR_ASSERT(grpc_call_invoke_old(call, cq, (void *)1, (void *)1, 0) == - GRPC_CALL_OK); + call = grpc_channel_create_call(channel, cq, "/Reflector/reflectStream", + "localhost", gpr_inf_future); + stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA; + stream_init_op.data.send_initial_metadata.count = 0; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &stream_init_op, 1, + (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); + + grpc_metadata_array_init(&initial_metadata_recv); + + stream_step_ops[0].op = GRPC_OP_SEND_MESSAGE; + stream_step_ops[0].data.send_message = the_buffer; + stream_step_ops[1].op = GRPC_OP_RECV_MESSAGE; + stream_step_ops[1].data.recv_message = &response_payload_recv; } static void step_ping_pong_stream(void) { - GPR_ASSERT(grpc_call_start_write_old(call, the_buffer, (void *)1, 0) == - GRPC_CALL_OK); - GPR_ASSERT(grpc_call_start_read_old(call, (void *)1) == GRPC_CALL_OK); - grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, stream_step_ops, 2, + (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); } @@ -148,24 +176,6 @@ int main(int argc, char **argv) { the_buffer = grpc_byte_buffer_create(&slice, payload_size); histogram = gpr_histogram_create(0.01, 60e9); - grpc_metadata_array_init(&initial_metadata_recv); - grpc_metadata_array_init(&trailing_metadata_recv); - grpc_call_details_init(&call_details); - - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[0].data.send_initial_metadata.count = 0; - ops[1].op = GRPC_OP_SEND_MESSAGE; - ops[1].data.send_message = the_buffer; - ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &initial_metadata_recv; - ops[4].op = GRPC_OP_RECV_MESSAGE; - ops[4].data.recv_message = &response_payload_recv; - ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[5].data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; - ops[5].data.recv_status_on_client.status = &status; - ops[5].data.recv_status_on_client.status_details = &details; - ops[5].data.recv_status_on_client.status_details_capacity = &details_capacity; sc.init(); From 4151cac01355669a52f60ffd65ce875531b74bf6 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 13 Feb 2015 09:29:55 -0800 Subject: [PATCH 03/12] Initial draft of server using new async API --- test/core/fling/server.c | 105 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/test/core/fling/server.c b/test/core/fling/server.c index ba5e96ddc04..fae4cc89871 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -52,17 +52,77 @@ static grpc_completion_queue *cq; static grpc_server *server; +static grpc_call *call; +static grpc_call_details call_details; +static grpc_metadata_array request_metadata_recv; +static grpc_metadata_array initial_metadata_send; +static grpc_byte_buffer *payload_buffer = NULL; + + +static grpc_op read_op; +static grpc_op metadata_send_op; +static grpc_op write_op; +static grpc_op status_op[2]; +static int was_cancelled = 2; + static int got_sigint = 0; +static void *tag(gpr_intptr t) { return (void *)t; } + typedef struct { gpr_refcount pending_ops; gpr_uint32 flags; } call_state; static void request_call(void) { + /* call_state *s = gpr_malloc(sizeof(call_state)); gpr_ref_init(&s->pending_ops, 2); - grpc_server_request_call_old(server, s); + */ + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, + cq, tag(101)); +} + +static void send_initial_metadata(void) { + grpc_metadata_array_init(&initial_metadata_send); + metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; + metadata_send_op.data.send_initial_metadata.count = 0; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &metadata_send_op, 1, + tag(3))); +} + +static void start_read_op(void) { + /* Starting read at server */ + read_op.op = GRPC_OP_RECV_MESSAGE; + read_op.data.recv_message = &payload_buffer; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, + tag(1))); +} + +static void start_write_op(void) { + /* Starting write at server */ + write_op.op = GRPC_OP_SEND_MESSAGE; + if (payload_buffer == NULL) { + gpr_log(GPR_INFO, "NULL payload buffer !!!"); + } + write_op.data.send_message = payload_buffer; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &write_op, 1, + tag(2))); +} + + +static void start_send_status(void) { + status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK; + status_op[0].data.send_status_from_server.trailing_metadata_count = 0; + status_op[0].data.send_status_from_server.status_details = ""; + status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + status_op[1].data.recv_close_on_server.cancelled = &was_cancelled; + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, status_op, 2, + tag(4))); } static void sigint_handler(int x) { got_sigint = 1; } @@ -133,6 +193,49 @@ int main(int argc, char **argv) { if (!ev) continue; s = ev->tag; switch (ev->type) { + case GRPC_OP_COMPLETE: + switch ((gpr_intptr) s) { + case 101: + if(call != NULL) { + if (0 == strcmp(call_details.method, + "/Reflector/reflectStream")) { + /* Received streaming call. Send metadata here. */ + } else { + /* Received unary call. Can do all ops in one batch. */ + } + start_read_op(); + send_initial_metadata(); + } + else { + GPR_ASSERT(shutdown_started); + } + /* request_call(); + */ + break; + case 1: + if (payload_buffer != NULL) { + /* Received payload from client. */ + start_write_op(); + } + else { + /* Received end of stream from client. */ + start_send_status(); + } + break; + case 2: + /* Write completed at server */ + start_read_op(); + break; + case 3: + /* Metadata send completed at server */ + break; + case 4: + /* Send status and close completed at server */ + grpc_call_destroy(call); + request_call(); + break; + } + break; case GRPC_SERVER_RPC_NEW: if (ev->call != NULL) { /* initial ops are already started in request_call */ From 445612ec1829e469ccaf2fbb509b916c62da1bfd Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 13 Feb 2015 10:06:00 -0800 Subject: [PATCH 04/12] Btach handling of unary --- test/core/fling/server.c | 86 ++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/test/core/fling/server.c b/test/core/fling/server.c index fae4cc89871..2ac5500ee5c 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -57,6 +57,8 @@ static grpc_call_details call_details; static grpc_metadata_array request_metadata_recv; static grpc_metadata_array initial_metadata_send; static grpc_byte_buffer *payload_buffer = NULL; +/* Used to drain the terminal read in unary calls. */ +static grpc_byte_buffer *terminal_buffer = NULL; static grpc_op read_op; @@ -64,7 +66,7 @@ static grpc_op metadata_send_op; static grpc_op write_op; static grpc_op status_op[2]; static int was_cancelled = 2; - +static grpc_op unary_ops[6]; static int got_sigint = 0; static void *tag(gpr_intptr t) { return (void *)t; } @@ -75,16 +77,34 @@ typedef struct { } call_state; static void request_call(void) { - /* - call_state *s = gpr_malloc(sizeof(call_state)); - gpr_ref_init(&s->pending_ops, 2); - */ grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, cq, tag(101)); } +static void handle_unary_method(void) { + grpc_metadata_array_init(&initial_metadata_send); + unary_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; + unary_ops[0].data.send_initial_metadata.count = 0; + unary_ops[1].op = GRPC_OP_RECV_MESSAGE; + unary_ops[1].data.recv_message = &terminal_buffer; + unary_ops[2].op = GRPC_OP_SEND_MESSAGE; + if (payload_buffer == NULL) { + gpr_log(GPR_INFO, "NULL payload buffer !!!"); + } + unary_ops[2].data.send_message = payload_buffer; + unary_ops[3].op = GRPC_OP_SEND_STATUS_FROM_SERVER; + unary_ops[3].data.send_status_from_server.status = GRPC_STATUS_OK; + unary_ops[3].data.send_status_from_server.trailing_metadata_count = 0; + unary_ops[3].data.send_status_from_server.status_details = ""; + unary_ops[4].op = GRPC_OP_RECV_CLOSE_ON_SERVER; + unary_ops[4].data.recv_close_on_server.cancelled = &was_cancelled; + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, unary_ops, 5, + tag(6))); +} + static void send_initial_metadata(void) { grpc_metadata_array_init(&initial_metadata_send); metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; @@ -93,12 +113,12 @@ static void send_initial_metadata(void) { tag(3))); } -static void start_read_op(void) { +static void start_read_op(int t) { /* Starting read at server */ read_op.op = GRPC_OP_RECV_MESSAGE; read_op.data.recv_message = &payload_buffer; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, - tag(1))); + tag(t))); } static void start_write_op(void) { @@ -200,11 +220,12 @@ int main(int argc, char **argv) { if (0 == strcmp(call_details.method, "/Reflector/reflectStream")) { /* Received streaming call. Send metadata here. */ + start_read_op(1); + send_initial_metadata(); } else { /* Received unary call. Can do all ops in one batch. */ + start_read_op(5); } - start_read_op(); - send_initial_metadata(); } else { GPR_ASSERT(shutdown_started); @@ -224,7 +245,7 @@ int main(int argc, char **argv) { break; case 2: /* Write completed at server */ - start_read_op(); + start_read_op(1); break; case 3: /* Metadata send completed at server */ @@ -234,45 +255,26 @@ int main(int argc, char **argv) { grpc_call_destroy(call); request_call(); break; + case 5: + /* Finished payload read for unary. Start all reamaining + * unary ops in a batch. + */ + handle_unary_method(); + break; + case 6: + /* Finished unary call. */ + grpc_call_destroy(call); + request_call(); + break; } break; case GRPC_SERVER_RPC_NEW: - if (ev->call != NULL) { - /* initial ops are already started in request_call */ - if (0 == strcmp(ev->data.server_rpc_new.method, - "/Reflector/reflectStream")) { - s->flags = 0; - } else { - s->flags = GRPC_WRITE_BUFFER_HINT; - } - grpc_call_server_accept_old(ev->call, cq, s); - grpc_call_server_end_initial_metadata_old(ev->call, s->flags); - GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK); - request_call(); - } else { - GPR_ASSERT(shutdown_started); - gpr_free(s); - } - break; case GRPC_WRITE_ACCEPTED: - GPR_ASSERT(ev->data.write_accepted == GRPC_OP_OK); - GPR_ASSERT(grpc_call_start_read_old(ev->call, s) == GRPC_CALL_OK); - break; case GRPC_READ: - if (ev->data.read) { - GPR_ASSERT(grpc_call_start_write_old(ev->call, ev->data.read, s, - s->flags) == GRPC_CALL_OK); - } else { - GPR_ASSERT(grpc_call_start_write_status_old(ev->call, GRPC_STATUS_OK, - NULL, s) == GRPC_CALL_OK); - } - break; case GRPC_FINISH_ACCEPTED: case GRPC_FINISHED: - if (gpr_unref(&s->pending_ops)) { - grpc_call_destroy(ev->call); - gpr_free(s); - } + gpr_log(GPR_ERROR, "Unexpected event type."); + GPR_ASSERT(0); break; case GRPC_QUEUE_SHUTDOWN: GPR_ASSERT(shutdown_started); From e3dd33ff8c55c67ac858f8a6dff107c5596f9891 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 13 Feb 2015 10:07:42 -0800 Subject: [PATCH 05/12] clang formatting --- test/core/fling/client.c | 16 ++--- test/core/fling/server.c | 128 ++++++++++++++++++--------------------- 2 files changed, 68 insertions(+), 76 deletions(-) diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 2cd08f81976..76a0b2eed9f 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -83,7 +83,7 @@ static void init_ping_pong_request(void) { static void step_ping_pong_request(void) { call = grpc_channel_create_call(channel, cq, "/Reflector/reflectUnary", - "localhost", gpr_inf_future); + "localhost", gpr_inf_future); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, 6, (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); grpc_call_destroy(call); @@ -92,11 +92,11 @@ static void step_ping_pong_request(void) { static void init_ping_pong_stream(void) { call = grpc_channel_create_call(channel, cq, "/Reflector/reflectStream", - "localhost", gpr_inf_future); + "localhost", gpr_inf_future); stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA; stream_init_op.data.send_initial_metadata.count = 0; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &stream_init_op, 1, - (void *)1)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, &stream_init_op, 1, (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); grpc_metadata_array_init(&initial_metadata_recv); @@ -108,8 +108,8 @@ static void init_ping_pong_stream(void) { } static void step_ping_pong_stream(void) { - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, stream_step_ops, 2, - (void *)1)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, stream_step_ops, 2, (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); } @@ -126,7 +126,8 @@ typedef struct { static const scenario scenarios[] = { {"ping-pong-request", init_ping_pong_request, step_ping_pong_request}, - {"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream}, }; + {"ping-pong-stream", init_ping_pong_stream, step_ping_pong_stream}, +}; int main(int argc, char **argv) { gpr_slice slice = gpr_slice_from_copied_string("x"); @@ -176,7 +177,6 @@ int main(int argc, char **argv) { the_buffer = grpc_byte_buffer_create(&slice, payload_size); histogram = gpr_histogram_create(0.01, 60e9); - sc.init(); for (i = 0; i < 1000; i++) { diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 2ac5500ee5c..b1f3ca25888 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -60,7 +60,6 @@ static grpc_byte_buffer *payload_buffer = NULL; /* Used to drain the terminal read in unary calls. */ static grpc_byte_buffer *terminal_buffer = NULL; - static grpc_op read_op; static grpc_op metadata_send_op; static grpc_op write_op; @@ -80,7 +79,7 @@ static void request_call(void) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, - cq, tag(101)); + cq, tag(101)); } static void handle_unary_method(void) { @@ -101,24 +100,22 @@ static void handle_unary_method(void) { unary_ops[4].op = GRPC_OP_RECV_CLOSE_ON_SERVER; unary_ops[4].data.recv_close_on_server.cancelled = &was_cancelled; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, unary_ops, 5, - tag(6))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, unary_ops, 5, tag(6))); } static void send_initial_metadata(void) { grpc_metadata_array_init(&initial_metadata_send); metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; metadata_send_op.data.send_initial_metadata.count = 0; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &metadata_send_op, 1, - tag(3))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, &metadata_send_op, 1, tag(3))); } static void start_read_op(int t) { /* Starting read at server */ read_op.op = GRPC_OP_RECV_MESSAGE; read_op.data.recv_message = &payload_buffer; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, - tag(t))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &read_op, 1, tag(t))); } static void start_write_op(void) { @@ -128,11 +125,9 @@ static void start_write_op(void) { gpr_log(GPR_INFO, "NULL payload buffer !!!"); } write_op.data.send_message = payload_buffer; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &write_op, 1, - tag(2))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &write_op, 1, tag(2))); } - static void start_send_status(void) { status_op[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; status_op[0].data.send_status_from_server.status = GRPC_STATUS_OK; @@ -141,8 +136,7 @@ static void start_send_status(void) { status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; status_op[1].data.recv_close_on_server.cancelled = &was_cancelled; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, status_op, 2, - tag(4))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, status_op, 2, tag(4))); } static void sigint_handler(int x) { got_sigint = 1; } @@ -214,66 +208,64 @@ int main(int argc, char **argv) { s = ev->tag; switch (ev->type) { case GRPC_OP_COMPLETE: - switch ((gpr_intptr) s) { - case 101: - if(call != NULL) { - if (0 == strcmp(call_details.method, - "/Reflector/reflectStream")) { - /* Received streaming call. Send metadata here. */ - start_read_op(1); - send_initial_metadata(); - } else { - /* Received unary call. Can do all ops in one batch. */ - start_read_op(5); - } - } - else { - GPR_ASSERT(shutdown_started); - } - /* request_call(); - */ - break; - case 1: - if (payload_buffer != NULL) { - /* Received payload from client. */ - start_write_op(); - } - else { - /* Received end of stream from client. */ - start_send_status(); - } - break; - case 2: - /* Write completed at server */ - start_read_op(1); - break; - case 3: - /* Metadata send completed at server */ - break; - case 4: - /* Send status and close completed at server */ - grpc_call_destroy(call); - request_call(); - break; - case 5: - /* Finished payload read for unary. Start all reamaining - * unary ops in a batch. - */ - handle_unary_method(); - break; - case 6: - /* Finished unary call. */ - grpc_call_destroy(call); - request_call(); - break; - } - break; + switch ((gpr_intptr)s) { + case 101: + if (call != NULL) { + if (0 == + strcmp(call_details.method, "/Reflector/reflectStream")) { + /* Received streaming call. Send metadata here. */ + start_read_op(1); + send_initial_metadata(); + } else { + /* Received unary call. Can do all ops in one batch. */ + start_read_op(5); + } + } else { + GPR_ASSERT(shutdown_started); + } + /* request_call(); + */ + break; + case 1: + if (payload_buffer != NULL) { + /* Received payload from client. */ + start_write_op(); + } else { + /* Received end of stream from client. */ + start_send_status(); + } + break; + case 2: + /* Write completed at server */ + start_read_op(1); + break; + case 3: + /* Metadata send completed at server */ + break; + case 4: + /* Send status and close completed at server */ + grpc_call_destroy(call); + request_call(); + break; + case 5: + /* Finished payload read for unary. Start all reamaining + * unary ops in a batch. + */ + handle_unary_method(); + break; + case 6: + /* Finished unary call. */ + grpc_call_destroy(call); + request_call(); + break; + } + break; case GRPC_SERVER_RPC_NEW: case GRPC_WRITE_ACCEPTED: case GRPC_READ: case GRPC_FINISH_ACCEPTED: case GRPC_FINISHED: - gpr_log(GPR_ERROR, "Unexpected event type."); + gpr_log(GPR_ERROR, "Unexpected event type."); GPR_ASSERT(0); break; case GRPC_QUEUE_SHUTDOWN: From c2340873123f8dbd51dc512cb4abb308e82e2c73 Mon Sep 17 00:00:00 2001 From: vjpai Date: Fri, 13 Feb 2015 11:12:36 -0800 Subject: [PATCH 06/12] Fix header file for Mac --- src/core/support/time_posix.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 7f0f028183e..3f2a81ec058 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -70,7 +70,9 @@ gpr_timespec gpr_now(void) { } #else /* For some reason Apple's OSes haven't implemented clock_gettime. */ -/* TODO(klempner): Add special handling for Apple. */ + +#include + gpr_timespec gpr_now(void) { gpr_timespec now; struct timeval now_tv; From a2d9ed0d9132ddda51b297ef79ab09e5808a1676 Mon Sep 17 00:00:00 2001 From: Abhishek Kumar Date: Fri, 13 Feb 2015 11:56:43 -0800 Subject: [PATCH 07/12] Addressed review comments --- test/core/fling/client.c | 40 +++++++++++-------- test/core/fling/server.c | 83 ++++++++++++++++++++++++++-------------- 2 files changed, 80 insertions(+), 43 deletions(-) diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 76a0b2eed9f..62626996314 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -59,32 +59,42 @@ static grpc_call_details call_details; static grpc_status_code status; static char *details = NULL; static size_t details_capacity = 0; +static grpc_op *op; static void init_ping_pong_request(void) { grpc_metadata_array_init(&initial_metadata_recv); grpc_metadata_array_init(&trailing_metadata_recv); grpc_call_details_init(&call_details); - ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; - ops[0].data.send_initial_metadata.count = 0; - ops[1].op = GRPC_OP_SEND_MESSAGE; - ops[1].data.send_message = the_buffer; - ops[2].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &initial_metadata_recv; - ops[4].op = GRPC_OP_RECV_MESSAGE; - ops[4].data.recv_message = &response_payload_recv; - ops[5].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[5].data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; - ops[5].data.recv_status_on_client.status = &status; - ops[5].data.recv_status_on_client.status_details = &details; - ops[5].data.recv_status_on_client.status_details_capacity = &details_capacity; + op = ops; + + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = the_buffer; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &response_payload_recv; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op++; } static void step_ping_pong_request(void) { call = grpc_channel_create_call(channel, cq, "/Reflector/reflectUnary", "localhost", gpr_inf_future); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, ops, 6, (void *)1)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, ops, op - ops, (void *)1)); grpc_event_finish(grpc_completion_queue_next(cq, gpr_inf_future)); grpc_call_destroy(call); call = NULL; diff --git a/test/core/fling/server.c b/test/core/fling/server.c index b1f3ca25888..bc52059b38a 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -70,6 +70,16 @@ static int got_sigint = 0; static void *tag(gpr_intptr t) { return (void *)t; } +typedef enum { + FLING_SERVER_NEW_REQUEST = 1, + FLING_SERVER_READ_FOR_UNARY, + FLING_SERVER_BATCH_OPS_FOR_UNARY, + FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING, + FLING_SERVER_READ_FOR_STREAMING, + FLING_SERVER_WRITE_FOR_STREAMING, + FLING_SERVER_SEND_STATUS_FOR_STREAMING +} fling_server_tags; + typedef struct { gpr_refcount pending_ops; gpr_uint32 flags; @@ -79,28 +89,39 @@ static void request_call(void) { grpc_metadata_array_init(&request_metadata_recv); grpc_call_details_init(&call_details); grpc_server_request_call(server, &call, &call_details, &request_metadata_recv, - cq, tag(101)); + cq, tag(FLING_SERVER_NEW_REQUEST)); } static void handle_unary_method(void) { + grpc_op *op; + grpc_metadata_array_init(&initial_metadata_send); - unary_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; - unary_ops[0].data.send_initial_metadata.count = 0; - unary_ops[1].op = GRPC_OP_RECV_MESSAGE; - unary_ops[1].data.recv_message = &terminal_buffer; - unary_ops[2].op = GRPC_OP_SEND_MESSAGE; + + op = unary_ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &terminal_buffer; + op++; + op->op = GRPC_OP_SEND_MESSAGE; if (payload_buffer == NULL) { gpr_log(GPR_INFO, "NULL payload buffer !!!"); } - unary_ops[2].data.send_message = payload_buffer; - unary_ops[3].op = GRPC_OP_SEND_STATUS_FROM_SERVER; - unary_ops[3].data.send_status_from_server.status = GRPC_STATUS_OK; - unary_ops[3].data.send_status_from_server.trailing_metadata_count = 0; - unary_ops[3].data.send_status_from_server.status_details = ""; - unary_ops[4].op = GRPC_OP_RECV_CLOSE_ON_SERVER; - unary_ops[4].data.recv_close_on_server.cancelled = &was_cancelled; + op->data.send_message = payload_buffer; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status_details = ""; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op++; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, unary_ops, 5, tag(6))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, unary_ops, op - unary_ops, + tag(FLING_SERVER_BATCH_OPS_FOR_UNARY))); } static void send_initial_metadata(void) { @@ -108,7 +129,9 @@ static void send_initial_metadata(void) { metadata_send_op.op = GRPC_OP_SEND_INITIAL_METADATA; metadata_send_op.data.send_initial_metadata.count = 0; GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_batch(call, &metadata_send_op, 1, tag(3))); + grpc_call_start_batch( + call, &metadata_send_op, 1, + tag(FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING))); } static void start_read_op(int t) { @@ -125,7 +148,9 @@ static void start_write_op(void) { gpr_log(GPR_INFO, "NULL payload buffer !!!"); } write_op.data.send_message = payload_buffer; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, &write_op, 1, tag(2))); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_batch(call, &write_op, 1, + tag(FLING_SERVER_WRITE_FOR_STREAMING))); } static void start_send_status(void) { @@ -136,7 +161,9 @@ static void start_send_status(void) { status_op[1].op = GRPC_OP_RECV_CLOSE_ON_SERVER; status_op[1].data.recv_close_on_server.cancelled = &was_cancelled; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call, status_op, 2, tag(4))); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch( + call, status_op, 2, + tag(FLING_SERVER_SEND_STATUS_FOR_STREAMING))); } static void sigint_handler(int x) { got_sigint = 1; } @@ -209,16 +236,16 @@ int main(int argc, char **argv) { switch (ev->type) { case GRPC_OP_COMPLETE: switch ((gpr_intptr)s) { - case 101: + case FLING_SERVER_NEW_REQUEST: if (call != NULL) { if (0 == strcmp(call_details.method, "/Reflector/reflectStream")) { /* Received streaming call. Send metadata here. */ - start_read_op(1); + start_read_op(FLING_SERVER_READ_FOR_STREAMING); send_initial_metadata(); } else { /* Received unary call. Can do all ops in one batch. */ - start_read_op(5); + start_read_op(FLING_SERVER_READ_FOR_UNARY); } } else { GPR_ASSERT(shutdown_started); @@ -226,7 +253,7 @@ int main(int argc, char **argv) { /* request_call(); */ break; - case 1: + case FLING_SERVER_READ_FOR_STREAMING: if (payload_buffer != NULL) { /* Received payload from client. */ start_write_op(); @@ -235,25 +262,25 @@ int main(int argc, char **argv) { start_send_status(); } break; - case 2: + case FLING_SERVER_WRITE_FOR_STREAMING: /* Write completed at server */ - start_read_op(1); + start_read_op(FLING_SERVER_READ_FOR_STREAMING); break; - case 3: + case FLING_SERVER_SEND_INIT_METADATA_FOR_STREAMING: /* Metadata send completed at server */ break; - case 4: + case FLING_SERVER_SEND_STATUS_FOR_STREAMING: /* Send status and close completed at server */ grpc_call_destroy(call); request_call(); break; - case 5: + case FLING_SERVER_READ_FOR_UNARY: /* Finished payload read for unary. Start all reamaining * unary ops in a batch. */ handle_unary_method(); break; - case 6: + case FLING_SERVER_BATCH_OPS_FOR_UNARY: /* Finished unary call. */ grpc_call_destroy(call); request_call(); @@ -266,7 +293,7 @@ int main(int argc, char **argv) { case GRPC_FINISH_ACCEPTED: case GRPC_FINISHED: gpr_log(GPR_ERROR, "Unexpected event type."); - GPR_ASSERT(0); + abort(); break; case GRPC_QUEUE_SHUTDOWN: GPR_ASSERT(shutdown_started); From faa5f51b4bb46ed652e1ad31119df3f9086f7453 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Fri, 13 Feb 2015 21:38:11 +0100 Subject: [PATCH 08/12] Renaming the docker image to grpc_clang, and optimizing the build a bit. --- tools/dockerfile/{msan_cxx => grpc_clang}/Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) rename tools/dockerfile/{msan_cxx => grpc_clang}/Dockerfile (87%) diff --git a/tools/dockerfile/msan_cxx/Dockerfile b/tools/dockerfile/grpc_clang/Dockerfile similarity index 87% rename from tools/dockerfile/msan_cxx/Dockerfile rename to tools/dockerfile/grpc_clang/Dockerfile index d12cee6b5d4..0928121755f 100644 --- a/tools/dockerfile/msan_cxx/Dockerfile +++ b/tools/dockerfile/grpc_clang/Dockerfile @@ -23,7 +23,10 @@ RUN mv libcxx llvm/projects RUN mv libcxxabi llvm/projects RUN mkdir llvm-build -RUN cd llvm-build && cmake ../llvm +RUN cd llvm-build && cmake \ + -DCMAKE_BUILD_TYPE:STRING=Release \ + -DLLVM_TARGETS_TO_BUILD:STRING=X86 \ + ../llvm RUN make -C llvm-build && make -C llvm-build install && rm -rf llvm-build CMD ["bash"] From 83edf3e8d369d1848bda8c8ae7136c1c15c5969f Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Fri, 13 Feb 2015 22:16:35 +0000 Subject: [PATCH 09/12] Change the interface of RPC Framework's Future interface. Now our Future interface is as-API-compatible-as-possible with Python's own concurrent.futures.Future. --- src/python/src/_framework/face/_calls.py | 195 ++++++++++++--- ...on_asynchronous_event_service_test_case.py | 84 ++++--- .../src/_framework/foundation/_later_test.py | 62 ++--- .../_framework/foundation/_timer_future.py | 116 +++++++-- .../_framework/foundation/callable_util.py | 39 ++- .../src/_framework/foundation/future.py | 232 +++++++++++------- 6 files changed, 509 insertions(+), 219 deletions(-) diff --git a/src/python/src/_framework/face/_calls.py b/src/python/src/_framework/face/_calls.py index 9128aef7c45..a7d8be5e432 100644 --- a/src/python/src/_framework/face/_calls.py +++ b/src/python/src/_framework/face/_calls.py @@ -29,6 +29,7 @@ """Utility functions for invoking RPCs.""" +import sys import threading from _framework.base import interfaces as base_interfaces @@ -79,20 +80,46 @@ def _stream_event_subscription(result_consumer, abortion_callback): _EventServicedIngestor(result_consumer, abortion_callback)) +# NOTE(nathaniel): This class has some extremely special semantics around +# cancellation that allow it to be used by both "blocking" APIs and "futures" +# APIs. +# +# Since futures.Future defines its own exception for cancellation, we want these +# objects, when returned by methods of a returning-Futures-from-other-methods +# object, to raise the same exception for cancellation. But that's weird in a +# blocking API - why should this object, also returned by methods of blocking +# APIs, raise exceptions from the "future" module? Should we do something like +# have this class be parameterized by the type of exception that it raises in +# cancellation circumstances? +# +# We don't have to take such a dramatic step: since blocking APIs define no +# cancellation semantics whatsoever, there is no supported way for +# blocking-API-users of these objects to cancel RPCs, and thus no supported way +# for them to see an exception the type of which would be weird to them. +# +# Bonus: in both blocking and futures APIs, this object still properly raises +# exceptions.CancellationError for any *server-side cancellation* of an RPC. class _OperationCancellableIterator(interfaces.CancellableIterator): """An interfaces.CancellableIterator for response-streaming operations.""" def __init__(self, rendezvous, operation): + self._lock = threading.Lock() self._rendezvous = rendezvous self._operation = operation + self._cancelled = False def __iter__(self): return self def next(self): + with self._lock: + if self._cancelled: + raise future.CancelledError() return next(self._rendezvous) def cancel(self): + with self._lock: + self._cancelled = True self._operation.cancel() self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) @@ -105,46 +132,126 @@ class _OperationFuture(future.Future): self._rendezvous = rendezvous self._operation = operation - self._outcome = None + self._cancelled = False + self._computed = False + self._payload = None + self._exception = None + self._traceback = None self._callbacks = [] def cancel(self): """See future.Future.cancel for specification.""" with self._condition: - if self._outcome is None: + if not self._cancelled and not self._computed: self._operation.cancel() - self._outcome = future.aborted() + self._cancelled = True self._condition.notify_all() return False def cancelled(self): """See future.Future.cancelled for specification.""" - return False + with self._condition: + return self._cancelled + + def running(self): + """See future.Future.running for specification.""" + with self._condition: + return not self._cancelled and not self._computed def done(self): """See future.Future.done for specification.""" with self._condition: - return (self._outcome is not None and - self._outcome.category is not future.ABORTED) + return self._cancelled or self._computed + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) - def outcome(self): - """See future.Future.outcome for specification.""" with self._condition: - while self._outcome is None: - self._condition.wait() - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._traceback + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() + + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._condition: if self._callbacks is not None: - self._callbacks.add(callback) + self._callbacks.add(fn) return - outcome = self._outcome - - callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) def on_operation_termination(self, operation_outcome): """Indicates to this object that the operation has terminated. @@ -154,34 +261,42 @@ class _OperationFuture(future.Future): outcome of the operation. """ with self._condition: - if (self._outcome is None and - operation_outcome is not base_interfaces.Outcome.COMPLETED): - self._outcome = future.raised( - _control.abortion_outcome_to_exception(operation_outcome)) - self._condition.notify_all() - - outcome = self._outcome - rendezvous = self._rendezvous - callbacks = list(self._callbacks) - self._callbacks = None - - if outcome is None: - try: - return_value = next(rendezvous) - except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + cancelled = self._cancelled + if cancelled: + callbacks = list(self._callbacks) + self._callbacks = None else: - outcome = future.returned(return_value) + rendezvous = self._rendezvous + + if not cancelled: + payload = None + exception = None + traceback = None + if operation_outcome == base_interfaces.Outcome.COMPLETED: + try: + payload = next(rendezvous) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] + else: + try: + # We raise and then immediately catch in order to create a traceback. + raise _control.abortion_outcome_to_exception(operation_outcome) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] with self._condition: - if self._outcome is None: - self._outcome = outcome - self._condition.notify_all() - else: - outcome = self._outcome + if not self._cancelled: + self._computed = True + self._payload = payload + self._exception = exception + self._traceback = traceback + callbacks = list(self._callbacks) + self._callbacks = None for callback in callbacks: callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callback, _DONE_CALLBACK_LOG_MESSAGE, self) class _Call(interfaces.Call): diff --git a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index cf8b2eeb959..939b238b66e 100644 --- a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -116,7 +116,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(request, response, self) @@ -144,7 +144,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with request_iterator.pause(): response_future = self.stub.future_stream_in_value_out( name, request_iterator, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(requests, response, self) @@ -173,13 +173,13 @@ class FutureInvocationAsynchronousEventServiceTestCase( first_response_future = self.stub.future_value_in_value_out( name, first_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value + first_response = first_response_future.result() test_messages.verify(first_request, first_response, self) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - second_response = second_response_future.outcome().return_value + second_response = second_response_future.result() test_messages.verify(second_request, second_response, self) @@ -192,10 +192,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -203,11 +203,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: request = test_messages.request() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( name, request, _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -218,10 +218,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -229,11 +229,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: requests = test_messages.requests() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( name, iter(requests), _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -244,13 +244,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -276,13 +278,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -310,8 +314,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, first_request, _TIMEOUT) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value - second_response = second_response_future.outcome().return_value + first_response = first_response_future.result() + second_response = second_response_future.result() test_messages.verify(first_request, first_response, self) test_messages.verify(second_request, second_response, self) @@ -329,10 +333,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -345,7 +349,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, request, _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) def testCancelledStreamRequestUnaryResponse(self): @@ -357,10 +361,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -373,5 +377,5 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, iter(requests), _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) diff --git a/src/python/src/_framework/foundation/_later_test.py b/src/python/src/_framework/foundation/_later_test.py index fbd17a4ad9e..50b67907db4 100644 --- a/src/python/src/_framework/foundation/_later_test.py +++ b/src/python/src/_framework/foundation/_later_test.py @@ -33,7 +33,6 @@ import threading import time import unittest -from _framework.foundation import future from _framework.foundation import later TICK = 0.1 @@ -44,10 +43,14 @@ class LaterTest(unittest.TestCase): def test_simple_delay(self): lock = threading.Lock() cell = [0] - def increment_cell(): + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + return return_value + computation_future = later.later(TICK * 2, computation) + self.assertFalse(computation_future.done()) self.assertFalse(computation_future.cancelled()) time.sleep(TICK) @@ -60,22 +63,21 @@ class LaterTest(unittest.TestCase): self.assertFalse(computation_future.cancelled()) with lock: self.assertEqual(1, cell[0]) - outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, outcome.category) + self.assertEqual(return_value, computation_future.result()) def test_callback(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: @@ -83,63 +85,67 @@ class LaterTest(unittest.TestCase): time.sleep(TICK * 2) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) callback_called[0] = False - outcome_passed_to_callback[0] = None + future_passed_to_callback[0] = None computation_future.add_done_callback(callback) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) def test_cancel(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: self.assertFalse(callback_called[0]) computation_future.cancel() self.assertTrue(computation_future.cancelled()) - self.assertFalse(computation_future.done()) - self.assertEqual(future.ABORTED, computation_future.outcome().category) + self.assertFalse(computation_future.running()) + self.assertTrue(computation_future.done()) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].cancelled()) - def test_outcome(self): + def test_result(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback_cell = [None] + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) - def callback(outcome): + return return_value + computation_future = later.later(TICK * 2, computation) + + def callback(future_passed_to_callback): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback_cell[0] = future_passed_to_callback computation_future.add_done_callback(callback) - returned_outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, returned_outcome.category) + returned_value = computation_future.result() + self.assertEqual(return_value, returned_value) # The callback may not yet have been called! Sleep a tick. time.sleep(TICK) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) if __name__ == '__main__': unittest.main() diff --git a/src/python/src/_framework/foundation/_timer_future.py b/src/python/src/_framework/foundation/_timer_future.py index 86bc073d562..4aa66991c5e 100644 --- a/src/python/src/_framework/foundation/_timer_future.py +++ b/src/python/src/_framework/foundation/_timer_future.py @@ -29,6 +29,7 @@ """Affords a Future implementation based on Python's threading.Timer.""" +import sys import threading import time @@ -52,7 +53,9 @@ class TimerFuture(future.Future): self._computing = False self._computed = False self._cancelled = False - self._outcome = None + self._result = None + self._exception = None + self._traceback = None self._waiting = [] def _compute(self): @@ -70,19 +73,24 @@ class TimerFuture(future.Future): self._computing = True try: - returned_value = self._computation() - outcome = future.returned(returned_value) + return_value = self._computation() + exception = None + traceback = None except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + return_value = None + exception = e + traceback = sys.exc_info()[2] with self._lock: self._computing = False self._computed = True - self._outcome = outcome + self._return_value = return_value + self._exception = exception + self._traceback = traceback waiting = self._waiting for callback in waiting: - callback(outcome) + callback(self) def start(self): """Starts this Future. @@ -104,13 +112,11 @@ class TimerFuture(future.Future): else: self._timer.cancel() self._cancelled = True - self._outcome = future.aborted() - outcome = self._outcome waiting = self._waiting for callback in waiting: try: - callback(outcome) + callback(self) except Exception: # pylint: disable=broad-except pass @@ -121,36 +127,102 @@ class TimerFuture(future.Future): with self._lock: return self._cancelled + def running(self): + """See future.Future.running for specification.""" + with self._lock: + return not self._computed and not self._cancelled + def done(self): """See future.Future.done for specification.""" with self._lock: - return self._computed + return self._computed or self._cancelled + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def outcome(self): - """See future.Future.outcome for specification.""" + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" with self._lock: - if self._computed or self._cancelled: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback condition = threading.Condition() - def notify_condition(unused_outcome): + def notify_condition(unused_future): with condition: condition.notify() self._waiting.append(notify_condition) with condition: - condition.wait() + condition.wait(timeout=timeout) with self._lock: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._lock: if not self._computed and not self._cancelled: - self._waiting.append(callback) + self._waiting.append(fn) return - else: - outcome = self._outcome - callback(outcome) + fn(self) diff --git a/src/python/src/_framework/foundation/callable_util.py b/src/python/src/_framework/foundation/callable_util.py index 1f7546cb76e..32b0751a01c 100644 --- a/src/python/src/_framework/foundation/callable_util.py +++ b/src/python/src/_framework/foundation/callable_util.py @@ -29,18 +29,47 @@ """Utilities for working with callables.""" +import abc +import collections +import enum import functools import logging -from _framework.foundation import future + +class Outcome(object): + """A sum type describing the outcome of some call. + + Attributes: + kind: One of Kind.RETURNED or Kind.RAISED respectively indicating that the + call returned a value or raised an exception. + return_value: The value returned by the call. Must be present if kind is + Kind.RETURNED. + exception: The exception raised by the call. Must be present if kind is + Kind.RAISED. + """ + __metaclass__ = abc.ABCMeta + + @enum.unique + class Kind(enum.Enum): + """Identifies the general kind of the outcome of some call.""" + + RETURNED = object() + RAISED = object() + + +class _EasyOutcome( + collections.namedtuple( + '_EasyOutcome', ['kind', 'return_value', 'exception']), + Outcome): + """A trivial implementation of Outcome.""" def _call_logging_exceptions(behavior, message, *args, **kwargs): try: - return future.returned(behavior(*args, **kwargs)) + return _EasyOutcome(Outcome.Kind.RETURNED, behavior(*args, **kwargs), None) except Exception as e: # pylint: disable=broad-except logging.exception(message) - return future.raised(e) + return _EasyOutcome(Outcome.Kind.RAISED, None, e) def with_exceptions_logged(behavior, message): @@ -72,7 +101,7 @@ def call_logging_exceptions(behavior, message, *args, **kwargs): **kwargs: Keyword arguments to pass to the given behavior. Returns: - A future.Outcome describing whether the given behavior returned a value or - raised an exception. + An Outcome describing whether the given behavior returned a value or raised + an exception. """ return _call_logging_exceptions(behavior, message, *args, **kwargs) diff --git a/src/python/src/_framework/foundation/future.py b/src/python/src/_framework/foundation/future.py index f00c503257a..bfc16fc1eaa 100644 --- a/src/python/src/_framework/foundation/future.py +++ b/src/python/src/_framework/foundation/future.py @@ -27,146 +27,210 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""The Future interface missing from Python's standard library. +"""A Future interface. -Python's concurrent.futures library defines a Future class very much like the -Future defined here, but since that class is concrete and without construction -semantics it is only available within the concurrent.futures library itself. -The Future class defined here is an entirely abstract interface that anyone may +Python doesn't have a Future interface in its standard library. In the absence +of such a standard, three separate, incompatible implementations +(concurrent.futures.Future, ndb.Future, and asyncio.Future) have appeared. This +interface attempts to be as compatible as possible with +concurrent.futures.Future. From ndb.Future it adopts a traceback-object accessor +method. + +Unlike the concrete and implemented Future classes listed above, the Future +class defined in this module is an entirely abstract interface that anyone may implement and use. + +The one known incompatibility between this interface and the interface of +concurrent.futures.Future is that this interface defines its own CancelledError +and TimeoutError exceptions rather than raising the implementation-private +concurrent.futures._base.CancelledError and the +built-in-but-only-in-3.3-and-later TimeoutError. """ import abc -import collections - -RETURNED = object() -RAISED = object() -ABORTED = object() - -class Outcome(object): - """A sum type describing the outcome of some computation. - - Attributes: - category: One of RETURNED, RAISED, or ABORTED, respectively indicating - that the computation returned a value, raised an exception, or was - aborted. - return_value: The value returned by the computation. Must be present if - category is RETURNED. - exception: The exception raised by the computation. Must be present if - category is RAISED. - """ - __metaclass__ = abc.ABCMeta +class TimeoutError(Exception): + """Indicates that a particular call timed out.""" -class _EasyOutcome( - collections.namedtuple('_EasyOutcome', - ['category', 'return_value', 'exception']), - Outcome): - """A trivial implementation of Outcome.""" -# All Outcomes describing abortion are indistinguishable so there might as well -# be only one. -_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None) +class CancelledError(Exception): + """Indicates that the computation underlying a Future was cancelled.""" -def aborted(): - """Returns an Outcome indicating that a computation was aborted. +class Future(object): + """A representation of a computation in another control flow. - Returns: - An Outcome indicating that a computation was aborted. + Computations represented by a Future may be yet to be begun, may be ongoing, + or may have already completed. """ - return _ABORTED_OUTCOME - - -def raised(exception): - """Returns an Outcome indicating that a computation raised an exception. - - Args: - exception: The exception raised by the computation. + __metaclass__ = abc.ABCMeta - Returns: - An Outcome indicating that a computation raised the given exception. - """ - return _EasyOutcome(RAISED, None, exception) + # NOTE(nathaniel): This isn't the return type that I would want to have if it + # were up to me. Were this interface being written from scratch, the return + # type of this method would probably be a sum type like: + # + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT + # COMPLETED + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + @abc.abstractmethod + def cancel(self): + """Attempts to cancel the computation. + This method does not block. -def returned(value): - """Returns an Outcome indicating that a computation returned a value. + Returns: + True if the computation has not yet begun, will not be allowed to take + place, and determination of both was possible without blocking. False + under all other circumstances including but not limited to the + computation's already having begun, the computation's already having + finished, and the computation's having been scheduled for execution on a + remote system for which a determination of whether or not it commenced + before being cancelled cannot be made without blocking. + """ + raise NotImplementedError() - Args: - value: The value returned by the computation. + # NOTE(nathaniel): Here too this isn't the return type that I'd want this + # method to have if it were up to me. I think I'd go with another sum type + # like: + # + # NOT_CANCELLED (this object's cancel method hasn't been called) + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT + # COMPLETED + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + # + # Notice how giving the cancel method the right semantics obviates most + # reasons for this method to exist. + @abc.abstractmethod + def cancelled(self): + """Describes whether the computation was cancelled. - Returns: - An Outcome indicating that a computation returned the given value. - """ - return _EasyOutcome(RETURNED, value, None) + This method does not block. + Returns: + True if the computation was cancelled any time before its result became + immediately available. False under all other circumstances including but + not limited to this object's cancel method not having been called and + the computation's result having become immediately available. + """ + raise NotImplementedError() -class Future(object): - """A representation of a computation happening in another control flow. + @abc.abstractmethod + def running(self): + """Describes whether the computation is taking place. - Computations represented by a Future may have already completed, may be - ongoing, or may be yet to be begun. + This method does not block. - Computations represented by a Future are considered uninterruptable; once - started they will be allowed to terminate either by returning or raising - an exception. - """ - __metaclass__ = abc.ABCMeta + Returns: + True if the computation is scheduled to take place in the future or is + taking place now, or False if the computation took place in the past or + was cancelled. + """ + raise NotImplementedError() + # NOTE(nathaniel): These aren't quite the semantics I'd like here either. I + # would rather this only returned True in cases in which the underlying + # computation completed successfully. A computation's having been cancelled + # conflicts with considering that computation "done". @abc.abstractmethod - def cancel(self): - """Attempts to cancel the computation. + def done(self): + """Describes whether the computation has taken place. + + This method does not block. Returns: - True if the computation will not be allowed to take place or False if - the computation has already taken place or is currently taking place. + True if the computation is known to have either completed or have been + unscheduled or interrupted. False if the computation may possibly be + executing or scheduled to execute later. """ raise NotImplementedError() @abc.abstractmethod - def cancelled(self): - """Describes whether the computation was cancelled. + def result(self, timeout=None): + """Accesses the outcome of the computation or raises its exception. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled, or None if this method should block until the + computation has finished or is cancelled no matter how long that takes. Returns: - True if the computation was cancelled and did not take place or False - if the computation took place, is taking place, or is scheduled to - take place in the future. + The return value of the computation. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will raise + the same exception. """ raise NotImplementedError() @abc.abstractmethod - def done(self): - """Describes whether the computation has taken place. + def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - True if the computation took place; False otherwise. + The exception raised by the computation, or None if the computation did + not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def outcome(self): - """Accesses the outcome of the computation. + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. - If the computation has not yet completed, this method blocks until it has. + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - An Outcome describing the outcome of the computation. + The traceback of the exception raised by the computation, or None if the + computation did not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def add_done_callback(self, callback): + def add_done_callback(self, fn): """Adds a function to be called at completion of the computation. - The callback will be passed an Outcome object describing the outcome of + The callback will be passed this Future object describing the outcome of the computation. If the computation has already completed, the callback will be called immediately. Args: - callback: A callable taking an Outcome as its single parameter. + fn: A callable taking a this Future object as its single parameter. """ raise NotImplementedError() From fbdd7abdca3a22b70576ab359f42710765735188 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sat, 14 Feb 2015 00:23:07 +0100 Subject: [PATCH 10/12] Cleaning up our posix definition / usage. -) Let's not use _POSIX_SOURCE. It usually implies too much C99. _BSD_SOURCE would be the right thing to do here. -) _BSD_SOURCE is getting deprecated by glibc, so we also have to define _DEFAULT_SOURCE under Linux. -) accept4 and eventfd arn't as old as we may think; let's detect for it. -) stdint.h interferes with all these definitions if included too early; let's move it down. --- include/grpc/support/port_platform.h | 33 +++++++++++++++++++++++----- src/core/iomgr/socket_utils_linux.c | 5 +---- src/core/support/file_posix.c | 13 ----------- src/core/support/log_posix.c | 10 --------- src/core/support/string_posix.c | 7 ------ src/core/support/sync_posix.c | 7 ------ src/core/support/time_posix.c | 8 ------- 7 files changed, 29 insertions(+), 54 deletions(-) diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index fbaefe6d099..5b9b3c47a6b 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -37,10 +37,6 @@ /* Override this file with one for your platform if you need to redefine things. */ -/* For a common case, assume that the platform has a C99-like stdint.h */ - -#include - #if !defined(GPR_NO_AUTODETECT_PLATFORM) #if defined(_WIN64) || defined(WIN64) #define GPR_WIN32 1 @@ -70,20 +66,40 @@ #define GPR_POSIX_TIME 1 #define GPR_GETPID_IN_UNISTD_H 1 #elif defined(__linux__) +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif +#ifndef _DEFAULT_SOURCE +#define _DEFAULT_SOURCE +#endif +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif #include #define GPR_CPU_LINUX 1 #define GPR_GCC_ATOMIC 1 #define GPR_LINUX 1 #define GPR_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GPR_POSIX_WAKEUP_FD 1 -#define GPR_LINUX_EVENTFD 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #ifdef __GLIBC_PREREQ +#if __GLIBC_PREREQ(2, 9) +#define GPR_LINUX_EVENTFD 1 +#endif +#if __GLIBC_PREREQ(2, 10) +#define GPR_LINUX_SOCKETUTILS 1 +#endif #if __GLIBC_PREREQ(2, 17) #define GPR_LINUX_ENV 1 #endif #endif +#ifndef GPR_LINUX_EVENTFD +#define GPR_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#endif +#ifndef GPR_LINUX_SOCKETUTILS +#define GPR_POSIX_SOCKETUTILS +#endif #ifndef GPR_LINUX_ENV #define GPR_POSIX_ENV 1 #endif @@ -98,6 +114,9 @@ #define GPR_ARCH_32 1 #endif /* _LP64 */ #elif defined(__APPLE__) +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif #define GPR_CPU_POSIX 1 #define GPR_GCC_ATOMIC 1 #define GPR_POSIX_LOG 1 @@ -123,6 +142,10 @@ #endif #endif /* GPR_NO_AUTODETECT_PLATFORM */ +/* For a common case, assume that the platform has a C99-like stdint.h */ + +#include + /* Cache line alignment */ #ifndef GPR_CACHELINE_SIZE #if defined(__i386__) || defined(__x86_64__) diff --git a/src/core/iomgr/socket_utils_linux.c b/src/core/iomgr/socket_utils_linux.c index 7ef58940c24..f3c22187d73 100644 --- a/src/core/iomgr/socket_utils_linux.c +++ b/src/core/iomgr/socket_utils_linux.c @@ -31,12 +31,9 @@ * */ -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif #include -#ifdef GPR_LINUX +#ifdef GPR_LINUX_SOCKETUTILS #include "src/core/iomgr/socket_utils_posix.h" diff --git a/src/core/support/file_posix.c b/src/core/support/file_posix.c index cb48b3d52f4..e1765666dbe 100644 --- a/src/core/support/file_posix.c +++ b/src/core/support/file_posix.c @@ -31,19 +31,6 @@ * */ -/* Posix code for gpr fdopen and mkstemp support. */ - -#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 200112L -#undef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 200112L -#endif - -/* Don't know why I have to do this for mkstemp, looks like _POSIX_C_SOURCE - should be enough... */ -#ifndef _BSD_SOURCE -#define _BSD_SOURCE -#endif - #include #ifdef GPR_POSIX_FILE diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c index 05f45de1308..36479baeed2 100644 --- a/src/core/support/log_posix.c +++ b/src/core/support/log_posix.c @@ -31,16 +31,6 @@ * */ -#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 200112L -#undef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 200112L -#endif - -/* FIXME: "posix" files probably shouldn't depend on _GNU_SOURCE */ -#ifndef _GNU_SOURCE -#define _GNU_SOURCE -#endif - #include #if defined(GPR_POSIX_LOG) diff --git a/src/core/support/string_posix.c b/src/core/support/string_posix.c index a6bb8058e6c..b6f0cd4af0c 100644 --- a/src/core/support/string_posix.c +++ b/src/core/support/string_posix.c @@ -31,13 +31,6 @@ * */ -/* Posix code for gpr snprintf support. */ - -#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 200112L -#undef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 200112L -#endif - #include #ifdef GPR_POSIX_STRING diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index a28a4c6bf46..94fc1b0bec3 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -31,13 +31,6 @@ * */ -/* Posix gpr synchroization support code. */ - -#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 199309L -#undef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 199309L -#endif - #include #ifdef GPR_POSIX_SYNC diff --git a/src/core/support/time_posix.c b/src/core/support/time_posix.c index 7f0f028183e..d206174edf3 100644 --- a/src/core/support/time_posix.c +++ b/src/core/support/time_posix.c @@ -31,14 +31,6 @@ * */ -/* Posix code for gpr time support. */ - -/* So we get nanosleep and clock_* */ -#if !defined _POSIX_C_SOURCE || _POSIX_C_SOURCE < 199309L -#undef _POSIX_C_SOURCE -#define _POSIX_C_SOURCE 199309L -#endif - #include #ifdef GPR_POSIX_TIME From d66cba2b93f9f91b87d380b23b8f571d10ab2bb6 Mon Sep 17 00:00:00 2001 From: "Nicolas \"Pixel\" Noble" Date: Sat, 14 Feb 2015 02:59:12 +0100 Subject: [PATCH 11/12] Properly msan-instrumenting protobuf and our C++ tests - but gflags isn't instrumented yet. --- INSTALL | 5 +++++ Makefile | 8 ++++---- templates/Makefile.template | 8 ++++---- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/INSTALL b/INSTALL index e3c05707db0..d2f08ec677e 100644 --- a/INSTALL +++ b/INSTALL @@ -90,6 +90,11 @@ these dependencies this way: # apt-get install autoconf libtool +If you want to run the tests using one of the sanitized configurations, you +will need clang and its instrumented libc++: + + # apt-get install clang libc++-dev + A word on OpenSSL ----------------- diff --git a/Makefile b/Makefile index fe0f7035e67..c7f7e9302dc 100644 --- a/Makefile +++ b/Makefile @@ -66,13 +66,13 @@ DEFINES_asan = NDEBUG VALID_CONFIG_msan = 1 REQUIRE_CUSTOM_LIBRARIES_msan = 1 CC_msan = clang -CXX_msan = clang++ +CXX_msan = clang++-libc++ LD_msan = clang -LDXX_msan = clang++ -CPPFLAGS_msan = -O1 -fsanitize=memory -fno-omit-frame-pointer +LDXX_msan = clang++-libc++ +CPPFLAGS_msan = -O1 -fsanitize=memory -fno-omit-frame-pointer -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 OPENSSL_CFLAGS_msan = -DPURIFY OPENSSL_CONFIG_msan = no-asm -LDFLAGS_msan = -fsanitize=memory +LDFLAGS_msan = -fsanitize=memory -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 DEFINES_msan = NDEBUG VALID_CONFIG_ubsan = 1 diff --git a/templates/Makefile.template b/templates/Makefile.template index b9ae217054e..992b9069328 100644 --- a/templates/Makefile.template +++ b/templates/Makefile.template @@ -83,13 +83,13 @@ DEFINES_asan = NDEBUG VALID_CONFIG_msan = 1 REQUIRE_CUSTOM_LIBRARIES_msan = 1 CC_msan = clang -CXX_msan = clang++ +CXX_msan = clang++-libc++ LD_msan = clang -LDXX_msan = clang++ -CPPFLAGS_msan = -O1 -fsanitize=memory -fno-omit-frame-pointer +LDXX_msan = clang++-libc++ +CPPFLAGS_msan = -O1 -fsanitize=memory -fno-omit-frame-pointer -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 OPENSSL_CFLAGS_msan = -DPURIFY OPENSSL_CONFIG_msan = no-asm -LDFLAGS_msan = -fsanitize=memory +LDFLAGS_msan = -fsanitize=memory -DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=1 DEFINES_msan = NDEBUG VALID_CONFIG_ubsan = 1 From 6d41a054f94badeba3f3d2c49daf61d437b2ac04 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Mon, 16 Feb 2015 02:12:48 +0000 Subject: [PATCH 12/12] Documentation tweaks in grpc.h. --- include/grpc/grpc.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 7b33a4d8619..9807de9f4bc 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -44,7 +44,7 @@ extern "C" { #endif -/* Completion Channels enable notification of the completion of asynchronous +/* Completion Queues enable notification of the completion of asynchronous actions. */ typedef struct grpc_completion_queue grpc_completion_queue; @@ -156,7 +156,8 @@ typedef enum grpc_op_error { struct grpc_byte_buffer; typedef struct grpc_byte_buffer grpc_byte_buffer; -/* Sample helpers to obtain byte buffers (these will certainly move place */ +/* Sample helpers to obtain byte buffers (these will certainly move + someplace else) */ grpc_byte_buffer *grpc_byte_buffer_create(gpr_slice *slices, size_t nslices); grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb); size_t grpc_byte_buffer_length(grpc_byte_buffer *bb); @@ -340,12 +341,12 @@ typedef struct grpc_op { /* Initialize the grpc library */ void grpc_init(void); -/* Shutdown the grpc library */ +/* Shut down the grpc library */ void grpc_shutdown(void); grpc_completion_queue *grpc_completion_queue_create(void); -/* Blocks until an event is available, the completion queue is being shutdown, +/* Blocks until an event is available, the completion queue is being shut down, or deadline is reached. Returns NULL on timeout, otherwise the event that occurred. Callers should call grpc_event_finish once they have processed the event. @@ -365,7 +366,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cq, grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, gpr_timespec deadline); -/* Cleanup any data owned by the event */ +/* Clean up any data owned by the event */ void grpc_event_finish(grpc_event *event); /* Begin destruction of a completion queue. Once all possible events are