|
|
|
@ -25,6 +25,8 @@ |
|
|
|
|
#include "absl/log/log.h" |
|
|
|
|
#include "absl/types/variant.h" |
|
|
|
|
|
|
|
|
|
#include <grpc/support/log.h> |
|
|
|
|
|
|
|
|
|
#include "src/core/lib/event_engine/default_event_engine.h" |
|
|
|
|
#include "src/core/lib/gprpp/crash.h" |
|
|
|
|
|
|
|
|
@ -112,13 +114,15 @@ absl::Status WireWriterImpl::MakeBinderTransaction( |
|
|
|
|
if (static_cast<int32_t>(tx_code) >= kFirstCallId) { |
|
|
|
|
int64_t parcel_size = parcel->GetDataSize(); |
|
|
|
|
if (parcel_size > 2 * kBlockSize) { |
|
|
|
|
LOG(ERROR) << "Unexpected large transaction (possibly caused by a very " |
|
|
|
|
"large metadata). This might overflow the binder " |
|
|
|
|
"transaction buffer. Size: " |
|
|
|
|
<< parcel_size << " bytes"; |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"Unexpected large transaction (possibly caused by a very large " |
|
|
|
|
"metadata). This might overflow the binder " |
|
|
|
|
"transaction buffer. Size: %" PRId64 " bytes", |
|
|
|
|
parcel_size); |
|
|
|
|
} |
|
|
|
|
num_outgoing_bytes_ += parcel_size; |
|
|
|
|
LOG(INFO) << "Total outgoing bytes: " << num_outgoing_bytes_.load(); |
|
|
|
|
gpr_log(GPR_INFO, "Total outgoing bytes: %" PRId64, |
|
|
|
|
num_outgoing_bytes_.load()); |
|
|
|
|
} |
|
|
|
|
CHECK(!is_transacting_); |
|
|
|
|
is_transacting_ = true; |
|
|
|
@ -329,9 +333,10 @@ void WireWriterImpl::OnAckReceived(int64_t num_bytes) { |
|
|
|
|
num_acknowledged_bytes_ = std::max(num_acknowledged_bytes_, num_bytes); |
|
|
|
|
int64_t num_outgoing_bytes = num_outgoing_bytes_; |
|
|
|
|
if (num_acknowledged_bytes_ > num_outgoing_bytes) { |
|
|
|
|
LOG(ERROR) << "The other end of transport acked more bytes than we ever " |
|
|
|
|
"sent, " |
|
|
|
|
<< num_acknowledged_bytes_ << " > " << num_outgoing_bytes; |
|
|
|
|
gpr_log(GPR_ERROR, |
|
|
|
|
"The other end of transport acked more bytes than we ever sent, " |
|
|
|
|
"%" PRId64 " > %" PRId64, |
|
|
|
|
num_acknowledged_bytes_, num_outgoing_bytes); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
TryScheduleTransaction(); |
|
|
|
@ -361,9 +366,11 @@ void WireWriterImpl::TryScheduleTransaction() { |
|
|
|
|
int64_t num_non_acked_bytes_estimation = |
|
|
|
|
num_total_bytes_will_be_sent - num_acknowledged_bytes_; |
|
|
|
|
if (num_non_acked_bytes_estimation < 0) { |
|
|
|
|
LOG(ERROR) << "Something went wrong. `num_non_acked_bytes_estimation` " |
|
|
|
|
"should be non-negative but it is " |
|
|
|
|
<< num_non_acked_bytes_estimation; |
|
|
|
|
gpr_log( |
|
|
|
|
GPR_ERROR, |
|
|
|
|
"Something went wrong. `num_non_acked_bytes_estimation` should be " |
|
|
|
|
"non-negative but it is %" PRId64, |
|
|
|
|
num_non_acked_bytes_estimation); |
|
|
|
|
} |
|
|
|
|
// If we can schedule another transaction (which has size estimation of
|
|
|
|
|
// `kBlockSize`) without exceeding `kFlowControlWindowSize`, schedule it.
|
|
|
|
@ -378,12 +385,12 @@ void WireWriterImpl::TryScheduleTransaction() { |
|
|
|
|
// It is common to fill `kFlowControlWindowSize` completely because
|
|
|
|
|
// transactions are send at faster rate than the other end of transport
|
|
|
|
|
// can handle it, so here we use `GPR_DEBUG` log level.
|
|
|
|
|
VLOG(2) << "Some work cannot be scheduled yet due to slow ack from the " |
|
|
|
|
"other end of transport. This transport might be blocked if " |
|
|
|
|
"this number don't go down. pending_outgoing_tx_.size() = " |
|
|
|
|
<< pending_outgoing_tx_.size() |
|
|
|
|
<< " pending_outgoing_tx_.front() = " |
|
|
|
|
<< pending_outgoing_tx_.front(); |
|
|
|
|
gpr_log(GPR_DEBUG, |
|
|
|
|
"Some work cannot be scheduled yet due to slow ack from the " |
|
|
|
|
"other end of transport. This transport might be blocked if this " |
|
|
|
|
"number don't go down. pending_outgoing_tx_.size() = %zu " |
|
|
|
|
"pending_outgoing_tx_.front() = %p", |
|
|
|
|
pending_outgoing_tx_.size(), pending_outgoing_tx_.front()); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|