diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3352e427cd2..c7631c7d328 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -142,8 +142,6 @@ struct grpc_call { bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; - /** bitmask of live batches */ - uint8_t used_batches; /** which ops are in-flight */ bool sent_initial_metadata; bool sending_message; @@ -991,25 +989,23 @@ static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) { return !(flags & invalid_positions); } -static batch_control *allocate_batch_control(grpc_call *call) { - size_t i; - for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) { - if ((call->used_batches & (1 << i)) == 0) { - call->used_batches = (uint8_t)(call->used_batches | (uint8_t)(1 << i)); - return &call->active_batches[i]; - } +static int batch_slot_for_op(grpc_op_type type) { return (int)type; } + +static batch_control *allocate_batch_control(grpc_call *call, + const grpc_op *ops, + size_t num_ops) { + int slot = batch_slot_for_op(ops[0].op); + for (size_t i = 1; i < num_ops; i++) { + int op_slot = batch_slot_for_op(ops[i].op); + slot = GPR_MIN(slot, op_slot); } - return NULL; + return &call->active_batches[slot]; } static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, grpc_cq_completion *storage) { batch_control *bctl = user_data; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - call->used_batches = (uint8_t)( - call->used_batches & ~(uint8_t)(1 << (bctl - call->active_batches))); - gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } @@ -1093,11 +1089,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ grpc_closure_run(exec_ctx, bctl->notify_tag, error); - gpr_mu_lock(&call->mu); - bctl->call->used_batches = - (uint8_t)(bctl->call->used_batches & - ~(uint8_t)(1 << (bctl - bctl->call->active_batches))); - gpr_mu_unlock(&call->mu); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1309,6 +1300,11 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, finish_batch_step(exec_ctx, bctl); } +static void free_no_op_completion(grpc_exec_ctx *exec_ctx, void *p, + grpc_cq_completion *completion) { + gpr_free(completion); +} + static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_op *ops, size_t nops, void *notify_tag, @@ -1323,32 +1319,31 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_metadata compression_md; GPR_TIMER_BEGIN("grpc_call_start_batch", 0); - GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); + if (nops == 0) { + if (!is_notify_tag_closure) { + grpc_cq_begin_op(call->cq, notify_tag); + grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE, + free_no_op_completion, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); + } + error = GRPC_CALL_OK; + goto done; + } + /* TODO(ctiller): this feels like it could be made lock-free */ - gpr_mu_lock(&call->mu); - bctl = allocate_batch_control(call); + bctl = allocate_batch_control(call, ops, nops); memset(bctl, 0, sizeof(*bctl)); bctl->call = call; bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); + gpr_mu_lock(&call->mu); grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); stream_op->covered_by_poller = true; - if (nops == 0) { - GRPC_CALL_INTERNAL_REF(call, "completion"); - if (!is_notify_tag_closure) { - grpc_cq_begin_op(call->cq, notify_tag); - } - gpr_mu_unlock(&call->mu); - post_batch_completion(exec_ctx, bctl); - error = GRPC_CALL_OK; - goto done; - } - /* rewrite batch ops into a transport op */ for (i = 0; i < nops; i++) { op = &ops[i];