From 8eb9d471cd68c1a14a76d8b34cbe5859cb00c911 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 27 Jan 2015 17:00:03 -0800 Subject: [PATCH] Rework call into a chttp2 style transaction system Simplifies locking and cross request chatter significantly --- src/core/surface/call.c | 458 +++++++++++++++++++++++----------------- 1 file changed, 270 insertions(+), 188 deletions(-) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index e0ce4b8cb74..ff0adb8fd3c 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -45,7 +45,7 @@ #include #include -#define INVALID_TAG ((void *)0xdeadbeef) +#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0) typedef struct { size_t md_out_count; @@ -63,7 +63,24 @@ typedef struct { void *finished_tag; } legacy_state; +typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; + +typedef enum { + SEND_NOTHING, + SEND_INITIAL_METADATA, + SEND_MESSAGE, + SEND_TRAILING_METADATA, + SEND_FINISH +} send_action; + +typedef struct { + grpc_ioreq_completion_func on_complete; + void *user_data; + grpc_op_error status; +} completed_request; + typedef struct reqinfo { + req_state state; grpc_ioreq_data data; struct reqinfo *master; grpc_ioreq_completion_func on_complete; @@ -85,8 +102,11 @@ struct grpc_call { gpr_uint8 read_closed; gpr_uint8 stream_closed; gpr_uint8 got_status_code; + gpr_uint8 sending; + gpr_uint8 num_completed_requests; reqinfo requests[GRPC_IOREQ_OP_COUNT]; + completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; grpc_byte_buffer_array buffered_messages; grpc_metadata_array buffered_initial_metadata; grpc_metadata_array buffered_trailing_metadata; @@ -116,10 +136,9 @@ struct grpc_call { y = temp; \ } while (0) -#define TOMBSTONE_MASTER ((void *)1) -#define IS_LIVE_MASTER(x) ((x) != NULL && (x) != TOMBSTONE_MASTER) - static void do_nothing(void *ignored, grpc_op_error also_ignored) {} +static send_action choose_send_action(grpc_call *call); +static void enact_send_action(grpc_call *call, send_action sa); grpc_call *grpc_call_create(grpc_channel *channel, const void *server_transport_data) { @@ -183,166 +202,205 @@ static void request_more_data(grpc_call *call) { grpc_call_execute_op(call, &op); } -#define OP_IN_MASK(op, mask) (((1 << (op)) & (mask)) != 0) +static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } -static void start_next_step_and_unlock(grpc_call *call, reqinfo *master); +static void unlock(grpc_call *call) { + send_action sa; + completed_request completed_requests[GRPC_IOREQ_OP_COUNT]; + int num_completed_requests = call->num_completed_requests; + int i; + + if (num_completed_requests != 0) { + memcpy(completed_requests, call->completed_requests, + sizeof(completed_requests)); + call->num_completed_requests = 0; + } + + if (!call->sending) { + sa = choose_send_action(call); + if (sa != SEND_NOTHING) { + call->sending = 1; + } + } + + gpr_mu_unlock(&call->mu); + + if (sa != SEND_NOTHING) { + enact_send_action(call, sa); + } + + for (i = 0; i < num_completed_requests; i++) { + completed_requests[i].on_complete(call, completed_requests[i].status, + completed_requests[i].user_data); + } +} static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error status) { reqinfo *master = call->requests[op].master; + completed_request *cr; size_t i; - if (master == NULL || master == TOMBSTONE_MASTER) { - return; /* inactive */ - } - master->complete_mask |= 1 << op; - if (master->complete_mask == master->need_mask || status == GRPC_OP_ERROR) { - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->requests[i].master == master) { - call->requests[i].master = - (i == GRPC_IOREQ_SEND_MESSAGES || i == GRPC_IOREQ_RECV_MESSAGES) - ? NULL - : TOMBSTONE_MASTER; + switch (call->requests[op].state) { + case REQ_INITIAL: /* not started yet */ + return; + case REQ_DONE: /* already finished */ + abort(); + return; + case REQ_READY: + master->complete_mask |= 1 << op; + call->requests[op].state = + (op == GRPC_IOREQ_SEND_MESSAGES || op == GRPC_IOREQ_RECV_MESSAGES) + ? REQ_INITIAL + : REQ_DONE; + if (master->complete_mask == master->need_mask || + status == GRPC_OP_ERROR) { + for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { + if (call->requests[i].master == master) { + call->requests[i].master = NULL; + } + } + cr = &call->completed_requests[call->num_completed_requests++]; + cr->status = status; + cr->on_complete = master->on_complete; + cr->user_data = master->user_data; } - } - master->on_complete(call, status, master->user_data); } } static void finish_write_step(void *pc, grpc_op_error error) { grpc_call *call = pc; - gpr_mu_lock(&call->mu); + lock(call); if (error == GRPC_OP_OK) { if (call->write_index == call->requests[GRPC_IOREQ_SEND_MESSAGES].data.send_messages.count) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_OK); } - start_next_step_and_unlock(call, - call->requests[GRPC_IOREQ_SEND_MESSAGES].master); } else { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGES, GRPC_OP_ERROR); - gpr_mu_unlock(&call->mu); } + call->sending = 0; + unlock(call); } static void finish_finish_step(void *pc, grpc_op_error error) { grpc_call *call = pc; + lock(call); if (error == GRPC_OP_OK) { - gpr_mu_lock(&call->mu); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); - start_next_step_and_unlock(call, - call->requests[GRPC_IOREQ_SEND_CLOSE].master); } else { gpr_log(GPR_ERROR, "not implemented"); abort(); } + call->sending = 0; + unlock(call); } static void finish_start_step(void *pc, grpc_op_error error) { grpc_call *call = pc; + lock(call); if (error == GRPC_OP_OK) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK); - start_next_step_and_unlock( - call, call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].master); } else { gpr_log(GPR_ERROR, "not implemented"); abort(); } -} - -static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { - reqinfo *requests = call->requests; - grpc_byte_buffer *send_message = NULL; - size_t i; + call->sending = 0; + unlock(call); +} + +static send_action choose_send_action(grpc_call *call) { + switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].state) { + case REQ_INITIAL: + return SEND_NOTHING; + case REQ_READY: + return SEND_INITIAL_METADATA; + case REQ_DONE: + break; + } + switch (call->requests[GRPC_IOREQ_SEND_MESSAGES].state) { + case REQ_INITIAL: + return SEND_NOTHING; + case REQ_READY: + return SEND_MESSAGE; + case REQ_DONE: + break; + } + switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].state) { + case REQ_INITIAL: + return SEND_NOTHING; + case REQ_READY: + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); + return SEND_TRAILING_METADATA; + case REQ_DONE: + break; + } + switch (call->requests[GRPC_IOREQ_SEND_CLOSE].state) { + default: + return SEND_NOTHING; + case REQ_READY: + return SEND_FINISH; + } +} + +static void enact_send_action(grpc_call *call, send_action sa) { + grpc_ioreq_data data; grpc_call_op op; - gpr_uint32 incomplete; - gpr_uint8 send_initial_metadata = 0; - gpr_uint8 send_trailing_metadata = 0; - gpr_uint8 send_blocked = 0; - gpr_uint8 send_finished = 0; - - if (!IS_LIVE_MASTER(master)) { - gpr_mu_unlock(&call->mu); - return; - } - - incomplete = master->need_mask & ~master->complete_mask; - - if (!send_blocked && - OP_IN_MASK(GRPC_IOREQ_SEND_INITIAL_METADATA, incomplete)) { - send_initial_metadata = 1; - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK); - master->complete_mask |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - send_blocked = 1; - } - - if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_MESSAGES, incomplete)) { - grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_MESSAGES].data; - send_message = data.send_messages.messages[call->write_index]; - send_blocked = 1; - call->write_index++; - } - - if (!send_blocked && - OP_IN_MASK(GRPC_IOREQ_SEND_TRAILING_METADATA, incomplete)) { - send_trailing_metadata = 1; - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); - } - - if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) { - send_finished = 1; - send_blocked = 1; - } - - gpr_mu_unlock(&call->mu); - - if (send_initial_metadata) { - grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - grpc_call_element_send_metadata( - CALL_ELEM_FROM_CALL(call, 0), - grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length)); - } - op.type = GRPC_SEND_START; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = finish_start_step; - op.user_data = call; - grpc_call_execute_op(call, &op); - } - - if (send_message) { - op.type = GRPC_SEND_MESSAGE; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.data.message = send_message; - op.done_cb = finish_write_step; - op.user_data = call; - grpc_call_execute_op(call, &op); - } - - if (send_trailing_metadata) { - grpc_ioreq_data data = requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; - for (i = 0; i < data.send_metadata.count; i++) { - const grpc_metadata *md = &data.send_metadata.metadata[i]; - grpc_call_element_send_metadata( - CALL_ELEM_FROM_CALL(call, 0), - grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, - (const gpr_uint8 *)md->value, - md->value_length)); - } - } - - if (send_finished) { - grpc_call_op op; - op.type = GRPC_SEND_FINISH; - op.dir = GRPC_CALL_DOWN; - op.flags = 0; - op.done_cb = finish_finish_step; - op.user_data = call; - grpc_call_execute_op(call, &op); + int i; + + switch (sa) { + case SEND_NOTHING: + abort(); + break; + case SEND_INITIAL_METADATA: + data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + grpc_call_element_send_metadata( + CALL_ELEM_FROM_CALL(call, 0), + grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length)); + } + op.type = GRPC_SEND_START; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.start.pollset = grpc_cq_pollset(call->cq); + op.done_cb = finish_start_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + break; + case SEND_MESSAGE: + data = call->requests[GRPC_IOREQ_SEND_MESSAGES].data; + op.type = GRPC_SEND_MESSAGE; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.message = data.send_messages.messages[call->write_index]; + op.done_cb = finish_write_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + break; + case SEND_TRAILING_METADATA: + data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data; + for (i = 0; i < data.send_metadata.count; i++) { + const grpc_metadata *md = &data.send_metadata.metadata[i]; + grpc_call_element_send_metadata( + CALL_ELEM_FROM_CALL(call, 0), + grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key, + (const gpr_uint8 *)md->value, + md->value_length)); + } + lock(call); + call->sending = 0; + unlock(call); + break; + case SEND_FINISH: + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = finish_finish_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + break; } } @@ -355,13 +413,13 @@ static grpc_call_error start_ioreq_error(grpc_call *call, call->requests[i].master = NULL; } } - gpr_mu_unlock(&call->mu); return ret; } -static grpc_call_error start_ioreq_and_unlock( - grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, - grpc_ioreq_completion_func completion, void *user_data) { +static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, + size_t nreqs, + grpc_ioreq_completion_func completion, + void *user_data) { size_t i; gpr_uint32 have_ops = 0; gpr_uint32 precomplete = 0; @@ -376,6 +434,16 @@ static grpc_call_error start_ioreq_and_unlock( return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); } + switch (requests[op].state) { + case REQ_INITIAL: + break; + case REQ_READY: + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS); + case REQ_DONE: + return start_ioreq_error(call, have_ops, + GRPC_CALL_ERROR_ALREADY_INVOKED); + } if (master == NULL) { master = &requests[op]; } @@ -391,6 +459,7 @@ static grpc_call_error start_ioreq_and_unlock( SWAP(grpc_byte_buffer_array, *data.recv_messages, call->buffered_messages); precomplete |= 1 << op; + abort(); } break; case GRPC_IOREQ_SEND_MESSAGES: @@ -398,6 +467,7 @@ static grpc_call_error start_ioreq_and_unlock( break; } + requests[op].state = REQ_READY; requests[op].data = data; requests[op].master = master; } @@ -408,8 +478,6 @@ static grpc_call_error start_ioreq_and_unlock( master->on_complete = completion; master->user_data = user_data; - start_next_step_and_unlock(call, master); - if (OP_IN_MASK(GRPC_IOREQ_RECV_MESSAGES, have_ops & ~precomplete)) { request_more_data(call); } @@ -424,15 +492,21 @@ static void call_start_ioreq_done(grpc_call *call, grpc_op_error status, grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, void *tag) { - gpr_mu_lock(&call->mu); - return start_ioreq_and_unlock(call, reqs, nreqs, call_start_ioreq_done, tag); + grpc_call_error err; + lock(call); + err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag); + unlock(call); + return err; } grpc_call_error grpc_call_start_ioreq_and_call_back( grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) { - gpr_mu_lock(&call->mu); - return start_ioreq_and_unlock(call, reqs, nreqs, on_complete, user_data); + grpc_call_error err; + lock(call); + err = start_ioreq(call, reqs, nreqs, on_complete, user_data); + unlock(call); + return err; } void grpc_call_destroy(grpc_call *c) { @@ -505,7 +579,7 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, legacy_state *ls; grpc_metadata *mdout; - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); if (ls->md_out_count == ls->md_out_capacity) { @@ -520,7 +594,7 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, mdout->value_length = metadata->value_length; memcpy(mdout->value, metadata->value, metadata->value_length); - gpr_mu_unlock(&call->mu); + unlock(call); return GRPC_CALL_OK; } @@ -528,9 +602,9 @@ grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { legacy_state *ls; - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); - gpr_mu_unlock(&call->mu); + unlock(call); if (status == GRPC_OP_OK) { grpc_cq_end_finished(call->cq, tag, call, do_nothing, NULL, @@ -546,7 +620,7 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status, void *tag) { legacy_state *ls; - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); if (status == GRPC_OP_OK) { grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, @@ -556,7 +630,7 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status, grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, NULL); } - gpr_mu_unlock(&call->mu); + unlock(call); } static void finish_send_metadata(grpc_call *call, grpc_op_error status, @@ -564,39 +638,30 @@ static void finish_send_metadata(grpc_call *call, grpc_op_error status, grpc_ioreq reqs[2]; legacy_state *ls; + lock(call); if (status == GRPC_OP_OK) { - /* Initially I thought about refactoring so that I could acquire this mutex - only - once, and then I remembered this API surface is deprecated and I moved - on. */ - - gpr_mu_lock(&call->mu); ls = get_legacy_state(call); reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; reqs[0].data.recv_metadata = &ls->md_in; - GPR_ASSERT(GRPC_CALL_OK == start_ioreq_and_unlock(call, reqs, 1, - finish_recv_metadata, - metadata_read_tag)); + GPR_ASSERT(GRPC_CALL_OK == start_ioreq(call, reqs, 1, finish_recv_metadata, + metadata_read_tag)); - gpr_mu_lock(&call->mu); ls = get_legacy_state(call); reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; reqs[0].data.recv_metadata = &ls->trail_md_in; reqs[1].op = GRPC_IOREQ_RECV_STATUS; reqs[1].data.recv_status = &ls->status_in; - GPR_ASSERT(GRPC_CALL_OK != - start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs), - finish_status, ls->finished_tag)); + GPR_ASSERT(GRPC_CALL_OK == + start_ioreq(call, reqs, 2, finish_status, ls->finished_tag)); } else { - gpr_mu_lock(&call->mu); ls = get_legacy_state(call); grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, do_nothing, NULL, 0, NULL); grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, GRPC_STATUS_UNKNOWN, "Failed to read initial metadata", NULL, 0); - gpr_mu_unlock(&call->mu); } + unlock(call); } grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, @@ -609,31 +674,36 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); grpc_cq_begin_op(cq, call, GRPC_FINISHED); - gpr_mu_lock(&call->mu); + lock(call); err = bind_cq(call, cq); if (err != GRPC_CALL_OK) return err; req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; req.data.send_metadata.count = ls->md_out_count; req.data.send_metadata.metadata = ls->md_out; - return start_ioreq_and_unlock(call, &req, 1, finish_send_metadata, - metadata_read_tag); + err = start_ioreq(call, &req, 1, finish_send_metadata, metadata_read_tag); + unlock(call); + return err; } grpc_call_error grpc_call_server_accept(grpc_call *call, grpc_completion_queue *cq, void *finished_tag) { grpc_ioreq req; + grpc_call_error err; /* inform the completion queue of an incoming operation (corresponding to finished_tag) */ grpc_cq_begin_op(cq, call, GRPC_FINISHED); - bind_cq(call, cq); + err = bind_cq(call, cq); + if (err != GRPC_CALL_OK) return err; req.op = GRPC_IOREQ_RECV_STATUS; req.data.recv_status = &get_legacy_state(call)->status_in; - return start_ioreq_and_unlock(call, &req, 1, finish_status, finished_tag); + err = start_ioreq(call, &req, 1, finish_status, finished_tag); + unlock(call); + return err; } grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, @@ -644,15 +714,15 @@ grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call, void grpc_call_client_initial_metadata_complete( grpc_call_element *surface_element) { grpc_call *call = grpc_call_from_top_element(surface_element); - gpr_mu_lock(&call->mu); + lock(call); call->got_initial_metadata = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); - gpr_mu_unlock(&call->mu); + unlock(call); } static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { legacy_state *ls; - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); if (ls->msg_in.count == 0) { grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL); @@ -660,29 +730,31 @@ static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, ls->msg_in.buffers[ls->msg_in_read_idx++]); } - gpr_mu_unlock(&call->mu); + unlock(call); } grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) { legacy_state *ls; grpc_ioreq req; + grpc_call_error err; grpc_cq_begin_op(call->cq, call, GRPC_READ); - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); if (ls->msg_in_read_idx == ls->msg_in.count) { ls->msg_in_read_idx = 0; req.op = GRPC_IOREQ_RECV_MESSAGES; req.data.recv_messages = &ls->msg_in; - return start_ioreq_and_unlock(call, &req, 1, finish_read, tag); + err = start_ioreq(call, &req, 1, finish_read, tag); + } else { + err = GRPC_CALL_OK; + grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, + ls->msg_in.buffers[ls->msg_in_read_idx++]); } - - grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, - ls->msg_in.buffers[ls->msg_in_read_idx++]); - gpr_mu_unlock(&call->mu); - return GRPC_CALL_OK; + unlock(call); + return err; } static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { @@ -694,16 +766,20 @@ grpc_call_error grpc_call_start_write(grpc_call *call, gpr_uint32 flags) { grpc_ioreq req; legacy_state *ls; + grpc_call_error err; grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - gpr_mu_lock(&call->mu); + lock(call); ls = get_legacy_state(call); ls->msg_out = byte_buffer; req.op = GRPC_IOREQ_SEND_MESSAGES; req.data.send_messages.count = 1; req.data.send_messages.messages = &ls->msg_out; - return start_ioreq_and_unlock(call, &req, 1, finish_write, tag); + err = start_ioreq(call, &req, 1, finish_write, tag); + unlock(call); + + return err; } static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { @@ -712,24 +788,32 @@ static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) { grpc_ioreq req; + grpc_call_error err; grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - gpr_mu_lock(&call->mu); + lock(call); req.op = GRPC_IOREQ_SEND_CLOSE; - return start_ioreq_and_unlock(call, &req, 1, finish_finish, tag); + err = start_ioreq(call, &req, 1, finish_finish, tag); + unlock(call); + + return err; } grpc_call_error grpc_call_start_write_status(grpc_call *call, grpc_status_code status, const char *details, void *tag) { grpc_ioreq req; + grpc_call_error err; grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - gpr_mu_lock(&call->mu); + lock(call); req.op = GRPC_IOREQ_SEND_CLOSE; req.data.send_close.status = status; req.data.send_close.details = details; - return start_ioreq_and_unlock(call, &req, 1, finish_finish, tag); + err = start_ioreq(call, &req, 1, finish_finish, tag); + unlock(call); + + return err; } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -762,18 +846,18 @@ void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) { void grpc_call_read_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - gpr_mu_lock(&call->mu); + lock(call); GPR_ASSERT(!call->read_closed); call->read_closed = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); - gpr_mu_unlock(&call->mu); + unlock(call); } void grpc_call_stream_closed(grpc_call_element *elem) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); - gpr_mu_lock(&call->mu); + lock(call); if (!call->read_closed) { call->read_closed = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); @@ -782,7 +866,7 @@ void grpc_call_stream_closed(grpc_call_element *elem) { } call->stream_closed = 1; finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); - gpr_mu_unlock(&call->mu); + unlock(call); } /* we offset status by a small amount when storing it into transport metadata @@ -812,7 +896,7 @@ void grpc_call_recv_message(grpc_call_element *elem, grpc_byte_buffer *byte_buffer) { grpc_call *call = CALL_FROM_TOP_ELEM(elem); grpc_byte_buffer_array *dest; - gpr_mu_lock(&call->mu); + lock(call); if (call->requests[GRPC_IOREQ_RECV_MESSAGES].master != NULL) { dest = call->requests[GRPC_IOREQ_RECV_MESSAGES].data.recv_messages; } else { @@ -825,7 +909,7 @@ void grpc_call_recv_message(grpc_call_element *elem, } dest->buffers[dest->count++] = byte_buffer; finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK); - gpr_mu_unlock(&call->mu); + unlock(call); } void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { @@ -834,7 +918,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_metadata_array *dest; grpc_metadata *mdusr; - gpr_mu_lock(&call->mu); + lock(call); if (key == grpc_channel_get_status_string(call->channel)) { maybe_set_status_code(call, decode_status(md)); grpc_mdelem_unref(md); @@ -843,14 +927,12 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { grpc_mdelem_unref(md); } else { if (!call->got_initial_metadata) { - dest = IS_LIVE_MASTER( - call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].master) + dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] .data.recv_metadata : &call->buffered_initial_metadata; } else { - dest = IS_LIVE_MASTER( - call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].master) + dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state == REQ_READY ? call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA] .data.recv_metadata : &call->buffered_trailing_metadata; @@ -865,7 +947,7 @@ void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) { mdusr->value = (char *)grpc_mdstr_as_c_string(md->value); mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice); } - gpr_mu_unlock(&call->mu); + unlock(call); } grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {