diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 00ec9c7c9a7..e82d538d294 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -170,9 +170,6 @@ struct grpc_call { gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - /* have we received initial metadata */ - bool has_initial_md_been_received; - batch_control *active_batches[MAX_CONCURRENT_BATCHES]; grpc_transport_stream_op_batch_payload stream_op_payload; @@ -226,7 +223,10 @@ struct grpc_call { } server; } final_op; - void *saved_receiving_stream_ready_bctlp; + // Either 0 (no initial metadata and messages received), + // 1 (recieved initial metadata first) + // or a batch_control* (received messages first the lowest bit is 0) + gpr_atm saved_receiving_stream_ready_bctlp; }; grpc_tracer_flag grpc_call_error_trace = @@ -1290,11 +1290,10 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } - if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || - call->receiving_stream == NULL) { + if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL || + !gpr_atm_rel_cas(&call->saved_receiving_stream_ready_bctlp, 0, + (gpr_atm)bctlp)) { process_data_after_md(exec_ctx, bctlp); - } else { - call->saved_receiving_stream_ready_bctlp = bctlp; } } @@ -1384,12 +1383,28 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } - call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, - grpc_schedule_on_exec_ctx); - call->saved_receiving_stream_ready_bctlp = NULL; + grpc_closure *saved_rsr_closure = NULL; + while (true) { + gpr_atm rsr_bctlp = + gpr_atm_acq_load(&call->saved_receiving_stream_ready_bctlp); + /* Should only receive initial metadata once */ + GPR_ASSERT(rsr_bctlp != 1); + if (rsr_bctlp == 0) { + /* Not received initial metadata and messages */ + if (gpr_atm_no_barrier_cas(&call->saved_receiving_stream_ready_bctlp, 0, + 1)) { + break; + } + } else { + /* Already received messages */ + saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready, + (batch_control *)rsr_bctlp, + grpc_schedule_on_exec_ctx); + /* No need to modify saved_receiving_stream_ready_bctlp */ + break; + } + } + if (saved_rsr_closure != NULL) { GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); }