diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 3362778224f..e0ce4b8cb74 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -50,7 +50,7 @@ typedef struct { size_t md_out_count; size_t md_out_capacity; - grpc_mdelem **md_out; + grpc_metadata *md_out; grpc_byte_buffer *msg_out; /* input buffers */ @@ -203,8 +203,8 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, ? NULL : TOMBSTONE_MASTER; } - master->on_complete(call, status, master->user_data); } + master->on_complete(call, status, master->user_data); } } @@ -237,22 +237,42 @@ static void finish_finish_step(void *pc, grpc_op_error error) { } } +static void finish_start_step(void *pc, grpc_op_error error) { + grpc_call *call = pc; + if (error == GRPC_OP_OK) { + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK); + start_next_step_and_unlock( + call, call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].master); + } else { + gpr_log(GPR_ERROR, "not implemented"); + abort(); + } +} + static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { reqinfo *requests = call->requests; grpc_byte_buffer *send_message = NULL; size_t i; - gpr_uint32 incomplete = master->need_mask & ~master->complete_mask; + grpc_call_op op; + gpr_uint32 incomplete; gpr_uint8 send_initial_metadata = 0; gpr_uint8 send_trailing_metadata = 0; gpr_uint8 send_blocked = 0; gpr_uint8 send_finished = 0; - gpr_uint8 completed; + + if (!IS_LIVE_MASTER(master)) { + gpr_mu_unlock(&call->mu); + return; + } + + incomplete = master->need_mask & ~master->complete_mask; if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_INITIAL_METADATA, incomplete)) { send_initial_metadata = 1; finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_OK); master->complete_mask |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; + send_blocked = 1; } if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_MESSAGES, incomplete)) { @@ -262,29 +282,15 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { call->write_index++; } - if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) { - send_finished = 1; - send_blocked = 1; - } - if (!send_blocked && OP_IN_MASK(GRPC_IOREQ_SEND_TRAILING_METADATA, incomplete)) { send_trailing_metadata = 1; finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK); } - completed = !send_blocked && master->complete_mask == master->need_mask; - - if (completed) { - master->on_complete(call, GRPC_OP_OK, master->user_data); - for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { - if (call->requests[i].master == master) { - call->requests[i].master = - (i == GRPC_IOREQ_SEND_MESSAGES || i == GRPC_IOREQ_RECV_MESSAGES) - ? NULL - : TOMBSTONE_MASTER; - } - } + if (!send_blocked && (OP_IN_MASK(GRPC_IOREQ_SEND_CLOSE, incomplete))) { + send_finished = 1; + send_blocked = 1; } gpr_mu_unlock(&call->mu); @@ -299,25 +305,20 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { (const gpr_uint8 *)md->value, md->value_length)); } - } - - if (send_message) { - grpc_call_op op; - op.type = GRPC_SEND_MESSAGE; + op.type = GRPC_SEND_START; op.dir = GRPC_CALL_DOWN; op.flags = 0; - op.data.message = send_message; - op.done_cb = finish_write_step; + op.done_cb = finish_start_step; op.user_data = call; grpc_call_execute_op(call, &op); } - if (send_finished) { - grpc_call_op op; - op.type = GRPC_SEND_FINISH; + if (send_message) { + op.type = GRPC_SEND_MESSAGE; op.dir = GRPC_CALL_DOWN; op.flags = 0; - op.done_cb = finish_finish_step; + op.data.message = send_message; + op.done_cb = finish_write_step; op.user_data = call; grpc_call_execute_op(call, &op); } @@ -333,6 +334,16 @@ static void start_next_step_and_unlock(grpc_call *call, reqinfo *master) { md->value_length)); } } + + if (send_finished) { + grpc_call_op op; + op.type = GRPC_SEND_FINISH; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.done_cb = finish_finish_step; + op.user_data = call; + grpc_call_execute_op(call, &op); + } } static grpc_call_error start_ioreq_error(grpc_call *call, @@ -489,9 +500,13 @@ void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) { elem->filter->call_op(elem, NULL, op); } -void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, - gpr_uint32 flags) { - legacy_state *ls = get_legacy_state(call); +grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, + gpr_uint32 flags) { + legacy_state *ls; + grpc_metadata *mdout; + + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); if (ls->md_out_count == ls->md_out_capacity) { ls->md_out_capacity = @@ -499,16 +514,14 @@ void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, ls->md_out = gpr_realloc(ls->md_out, sizeof(grpc_mdelem *) * ls->md_out_capacity); } - ls->md_out[ls->md_out_count++] = mdelem; -} + mdout = &ls->md_out[ls->md_out_count++]; + mdout->key = gpr_strdup(metadata->key); + mdout->value = gpr_malloc(metadata->value_length); + mdout->value_length = metadata->value_length; + memcpy(mdout->value, metadata->value, metadata->value_length); + + gpr_mu_unlock(&call->mu); -grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata, - gpr_uint32 flags) { - grpc_call_add_mdelem( - call, grpc_mdelem_from_string_and_buffer( - call->metadata_context, metadata->key, - (gpr_uint8 *)metadata->value, metadata->value_length), - flags); return GRPC_CALL_OK; } @@ -531,7 +544,6 @@ static void finish_status(grpc_call *call, grpc_op_error status, void *tag) { static void finish_recv_metadata(grpc_call *call, grpc_op_error status, void *tag) { - grpc_ioreq reqs[2]; legacy_state *ls; gpr_mu_lock(&call->mu); @@ -540,24 +552,50 @@ static void finish_recv_metadata(grpc_call *call, grpc_op_error status, grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, ls->md_in.count, ls->md_in.metadata); + } else { + grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, + NULL); + } + gpr_mu_unlock(&call->mu); +} + +static void finish_send_metadata(grpc_call *call, grpc_op_error status, + void *metadata_read_tag) { + grpc_ioreq reqs[2]; + legacy_state *ls; + + if (status == GRPC_OP_OK) { + /* Initially I thought about refactoring so that I could acquire this mutex + only + once, and then I remembered this API surface is deprecated and I moved + on. */ + + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; + reqs[0].data.recv_metadata = &ls->md_in; + GPR_ASSERT(GRPC_CALL_OK == start_ioreq_and_unlock(call, reqs, 1, + finish_recv_metadata, + metadata_read_tag)); + + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; reqs[0].data.recv_metadata = &ls->trail_md_in; reqs[1].op = GRPC_IOREQ_RECV_STATUS; reqs[1].data.recv_status = &ls->status_in; - if (GRPC_CALL_OK != start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs), - finish_status, - ls->finished_tag)) { - grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - GRPC_STATUS_UNKNOWN, - "Failed to start reading status", NULL, 0); - } + GPR_ASSERT(GRPC_CALL_OK != + start_ioreq_and_unlock(call, reqs, GPR_ARRAY_SIZE(reqs), + finish_status, ls->finished_tag)); } else { - gpr_mu_unlock(&call->mu); - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, - NULL); + gpr_mu_lock(&call->mu); + ls = get_legacy_state(call); + grpc_cq_end_client_metadata_read(call->cq, metadata_read_tag, call, + do_nothing, NULL, 0, NULL); grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, GRPC_STATUS_UNKNOWN, "Failed to read initial metadata", NULL, 0); + gpr_mu_unlock(&call->mu); } } @@ -575,9 +613,10 @@ grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq, err = bind_cq(call, cq); if (err != GRPC_CALL_OK) return err; - req.op = GRPC_IOREQ_RECV_INITIAL_METADATA; - req.data.recv_metadata = &ls->md_in; - return start_ioreq_and_unlock(call, &req, 1, finish_recv_metadata, + req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; + req.data.send_metadata.count = ls->md_out_count; + req.data.send_metadata.metadata = ls->md_out; + return start_ioreq_and_unlock(call, &req, 1, finish_send_metadata, metadata_read_tag); } diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 2da40607c5f..10f8dbe6c8f 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -38,8 +38,9 @@ #include "src/core/channel/metadata_buffer.h" #include -typedef void (*grpc_ioreq_completion_func)(grpc_call *call, grpc_op_error status, - void *user_data); +typedef void (*grpc_ioreq_completion_func)(grpc_call *call, + grpc_op_error status, + void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, const void *server_transport_data); @@ -51,14 +52,15 @@ void grpc_call_internal_unref(grpc_call *call); the completion queue/surface layer */ void grpc_call_recv_metadata(grpc_call_element *surface_element, grpc_mdelem *md); -void grpc_call_recv_message( - grpc_call_element *surface_element, grpc_byte_buffer *message); +void grpc_call_recv_message(grpc_call_element *surface_element, + grpc_byte_buffer *message); void grpc_call_read_closed(grpc_call_element *surface_element); void grpc_call_stream_closed(grpc_call_element *surface_element); void grpc_call_execute_op(grpc_call *call, grpc_call_op *op); -grpc_call_error grpc_call_start_ioreq_and_call_back(grpc_call *call, const grpc_ioreq *reqs, - size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data); +grpc_call_error grpc_call_start_ioreq_and_call_back( + grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, + grpc_ioreq_completion_func on_complete, void *user_data); /* Called when it's known that the initial batch of metadata is complete on the client side (must not be called on the server) */ @@ -73,7 +75,4 @@ grpc_call_stack *grpc_call_get_call_stack(grpc_call *call); /* Given the top call_element, get the call object. */ grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element); -void grpc_call_add_mdelem(grpc_call *call, grpc_mdelem *mdelem, - gpr_uint32 flags); - #endif /* __GRPC_INTERNAL_SURFACE_CALL_H__ */ diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index d63fb4e141c..e47fea431fc 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -51,7 +51,7 @@ struct grpc_channel { grpc_mdstr *authority_string; }; -#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1)) +#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c)+1)) grpc_channel *grpc_channel_create_from_filters( const grpc_channel_filter **filters, size_t num_filters, @@ -74,12 +74,13 @@ grpc_channel *grpc_channel_create_from_filters( static void do_nothing(void *ignored, grpc_op_error error) {} -grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *method, - const char *host, - gpr_timespec absolute_deadline) { +grpc_call *grpc_channel_create_call_old(grpc_channel *channel, + const char *method, const char *host, + gpr_timespec absolute_deadline) { grpc_call *call; grpc_mdelem *path_mdelem; grpc_mdelem *authority_mdelem; + grpc_call_op op; if (!channel->is_client) { gpr_log(GPR_ERROR, "Cannot create a call on the server."); @@ -95,16 +96,22 @@ grpc_call *grpc_channel_create_call_old(grpc_channel *channel, const char *metho path_mdelem = grpc_mdelem_from_metadata_strings( channel->metadata_context, channel->path_string, grpc_mdstr_from_string(channel->metadata_context, method)); - grpc_call_add_mdelem(call, path_mdelem, 0); + op.type = GRPC_SEND_METADATA; + op.dir = GRPC_CALL_DOWN; + op.flags = 0; + op.data.metadata = path_mdelem; + op.done_cb = do_nothing; + op.user_data = NULL; + grpc_call_execute_op(call, &op); grpc_mdstr_ref(channel->authority_string); authority_mdelem = grpc_mdelem_from_metadata_strings( channel->metadata_context, channel->authority_string, grpc_mdstr_from_string(channel->metadata_context, host)); - grpc_call_add_mdelem(call, authority_mdelem, 0); + op.data.metadata = authority_mdelem; + grpc_call_execute_op(call, &op); if (0 != gpr_time_cmp(absolute_deadline, gpr_inf_future)) { - grpc_call_op op; op.type = GRPC_SEND_DEADLINE; op.dir = GRPC_CALL_DOWN; op.flags = 0;