|
|
@ -930,7 +930,9 @@ typedef struct client_channel_call_data { |
|
|
|
// Note: We inline the cache for the first 3 send_message ops and use
|
|
|
|
// Note: We inline the cache for the first 3 send_message ops and use
|
|
|
|
// dynamic allocation after that. This number was essentially picked
|
|
|
|
// dynamic allocation after that. This number was essentially picked
|
|
|
|
// at random; it could be changed in the future to tune performance.
|
|
|
|
// at random; it could be changed in the future to tune performance.
|
|
|
|
grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; |
|
|
|
grpc_core::ManualConstructor< |
|
|
|
|
|
|
|
grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> |
|
|
|
|
|
|
|
send_messages; |
|
|
|
// send_trailing_metadata
|
|
|
|
// send_trailing_metadata
|
|
|
|
bool seen_send_trailing_metadata; |
|
|
|
bool seen_send_trailing_metadata; |
|
|
|
grpc_linked_mdelem* send_trailing_metadata_storage; |
|
|
|
grpc_linked_mdelem* send_trailing_metadata_storage; |
|
|
@ -980,7 +982,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, |
|
|
|
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); |
|
|
|
gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); |
|
|
|
new (cache) grpc_core::ByteStreamCache( |
|
|
|
new (cache) grpc_core::ByteStreamCache( |
|
|
|
std::move(batch->payload->send_message.send_message)); |
|
|
|
std::move(batch->payload->send_message.send_message)); |
|
|
|
calld->send_messages.push_back(cache); |
|
|
|
calld->send_messages->push_back(cache); |
|
|
|
} |
|
|
|
} |
|
|
|
// Save metadata batch for send_trailing_metadata ops.
|
|
|
|
// Save metadata batch for send_trailing_metadata ops.
|
|
|
|
if (batch->send_trailing_metadata) { |
|
|
|
if (batch->send_trailing_metadata) { |
|
|
@ -1014,7 +1016,7 @@ static void free_cached_send_op_data_after_commit( |
|
|
|
"]", |
|
|
|
"]", |
|
|
|
chand, calld, i); |
|
|
|
chand, calld, i); |
|
|
|
} |
|
|
|
} |
|
|
|
calld->send_messages[i]->Destroy(); |
|
|
|
(*calld->send_messages)[i]->Destroy(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (retry_state->completed_send_trailing_metadata) { |
|
|
|
if (retry_state->completed_send_trailing_metadata) { |
|
|
|
grpc_metadata_batch_destroy(&calld->send_trailing_metadata); |
|
|
|
grpc_metadata_batch_destroy(&calld->send_trailing_metadata); |
|
|
@ -1038,7 +1040,7 @@ static void free_cached_send_op_data_for_completed_batch( |
|
|
|
"]", |
|
|
|
"]", |
|
|
|
chand, calld, retry_state->completed_send_message_count - 1); |
|
|
|
chand, calld, retry_state->completed_send_message_count - 1); |
|
|
|
} |
|
|
|
} |
|
|
|
calld->send_messages[retry_state->completed_send_message_count - 1] |
|
|
|
(*calld->send_messages)[retry_state->completed_send_message_count - 1] |
|
|
|
->Destroy(); |
|
|
|
->Destroy(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (batch_data->batch.send_trailing_metadata) { |
|
|
|
if (batch_data->batch.send_trailing_metadata) { |
|
|
@ -1286,7 +1288,8 @@ static bool pending_batch_is_completed( |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
if (pending->batch->send_message && |
|
|
|
if (pending->batch->send_message && |
|
|
|
retry_state->completed_send_message_count < calld->send_messages.size()) { |
|
|
|
retry_state->completed_send_message_count < |
|
|
|
|
|
|
|
calld->send_messages->size()) { |
|
|
|
return false; |
|
|
|
return false; |
|
|
|
} |
|
|
|
} |
|
|
|
if (pending->batch->send_trailing_metadata && |
|
|
|
if (pending->batch->send_trailing_metadata && |
|
|
@ -1321,7 +1324,7 @@ static bool pending_batch_is_unstarted( |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
if (pending->batch->send_message && |
|
|
|
if (pending->batch->send_message && |
|
|
|
retry_state->started_send_message_count < calld->send_messages.size()) { |
|
|
|
retry_state->started_send_message_count < calld->send_messages->size()) { |
|
|
|
return true; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
if (pending->batch->send_trailing_metadata && |
|
|
|
if (pending->batch->send_trailing_metadata && |
|
|
@ -1821,7 +1824,7 @@ static void add_closures_for_replay_or_pending_send_ops( |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
channel_data* chand = static_cast<channel_data*>(elem->channel_data); |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
call_data* calld = static_cast<call_data*>(elem->call_data); |
|
|
|
bool have_pending_send_message_ops = |
|
|
|
bool have_pending_send_message_ops = |
|
|
|
retry_state->started_send_message_count < calld->send_messages.size(); |
|
|
|
retry_state->started_send_message_count < calld->send_messages->size(); |
|
|
|
bool have_pending_send_trailing_metadata_op = |
|
|
|
bool have_pending_send_trailing_metadata_op = |
|
|
|
calld->seen_send_trailing_metadata && |
|
|
|
calld->seen_send_trailing_metadata && |
|
|
|
!retry_state->started_send_trailing_metadata; |
|
|
|
!retry_state->started_send_trailing_metadata; |
|
|
@ -2137,7 +2140,7 @@ static void add_retriable_send_message_op( |
|
|
|
chand, calld, retry_state->started_send_message_count); |
|
|
|
chand, calld, retry_state->started_send_message_count); |
|
|
|
} |
|
|
|
} |
|
|
|
grpc_core::ByteStreamCache* cache = |
|
|
|
grpc_core::ByteStreamCache* cache = |
|
|
|
calld->send_messages[retry_state->started_send_message_count]; |
|
|
|
(*calld->send_messages)[retry_state->started_send_message_count]; |
|
|
|
++retry_state->started_send_message_count; |
|
|
|
++retry_state->started_send_message_count; |
|
|
|
batch_data->send_message.Init(cache); |
|
|
|
batch_data->send_message.Init(cache); |
|
|
|
batch_data->batch.send_message = true; |
|
|
|
batch_data->batch.send_message = true; |
|
|
@ -2258,7 +2261,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( |
|
|
|
} |
|
|
|
} |
|
|
|
// send_message.
|
|
|
|
// send_message.
|
|
|
|
// Note that we can only have one send_message op in flight at a time.
|
|
|
|
// Note that we can only have one send_message op in flight at a time.
|
|
|
|
if (retry_state->started_send_message_count < calld->send_messages.size() && |
|
|
|
if (retry_state->started_send_message_count < calld->send_messages->size() && |
|
|
|
retry_state->started_send_message_count == |
|
|
|
retry_state->started_send_message_count == |
|
|
|
retry_state->completed_send_message_count && |
|
|
|
retry_state->completed_send_message_count && |
|
|
|
!calld->pending_send_message) { |
|
|
|
!calld->pending_send_message) { |
|
|
@ -2278,7 +2281,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( |
|
|
|
// to start, since we can't send down any more send_message ops after
|
|
|
|
// to start, since we can't send down any more send_message ops after
|
|
|
|
// send_trailing_metadata.
|
|
|
|
// send_trailing_metadata.
|
|
|
|
if (calld->seen_send_trailing_metadata && |
|
|
|
if (calld->seen_send_trailing_metadata && |
|
|
|
retry_state->started_send_message_count == calld->send_messages.size() && |
|
|
|
retry_state->started_send_message_count == calld->send_messages->size() && |
|
|
|
!retry_state->started_send_trailing_metadata && |
|
|
|
!retry_state->started_send_trailing_metadata && |
|
|
|
!calld->pending_send_trailing_metadata) { |
|
|
|
!calld->pending_send_trailing_metadata) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
|
if (grpc_client_channel_trace.enabled()) { |
|
|
@ -2329,7 +2332,7 @@ static void add_subchannel_batches_for_pending_batches( |
|
|
|
// send_message ops after send_trailing_metadata.
|
|
|
|
// send_message ops after send_trailing_metadata.
|
|
|
|
if (batch->send_trailing_metadata && |
|
|
|
if (batch->send_trailing_metadata && |
|
|
|
(retry_state->started_send_message_count + batch->send_message < |
|
|
|
(retry_state->started_send_message_count + batch->send_message < |
|
|
|
calld->send_messages.size() || |
|
|
|
calld->send_messages->size() || |
|
|
|
retry_state->started_send_trailing_metadata)) { |
|
|
|
retry_state->started_send_trailing_metadata)) { |
|
|
|
continue; |
|
|
|
continue; |
|
|
|
} |
|
|
|
} |
|
|
@ -2981,6 +2984,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, |
|
|
|
calld->deadline); |
|
|
|
calld->deadline); |
|
|
|
} |
|
|
|
} |
|
|
|
calld->enable_retries = chand->enable_retries; |
|
|
|
calld->enable_retries = chand->enable_retries; |
|
|
|
|
|
|
|
calld->send_messages.Init(); |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
return GRPC_ERROR_NONE; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -3015,6 +3019,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, |
|
|
|
calld->pick.subchannel_call_context[i].value); |
|
|
|
calld->pick.subchannel_call_context[i].value); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
calld->send_messages.Destroy(); |
|
|
|
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); |
|
|
|
GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|