pull/12856/head
Craig Tiller 7 years ago
parent 527253f27d
commit a69912cb2c
  1. 35
      src/core/ext/transport/chttp2/transport/writing.cc
  2. 12
      tools/run_tests/python_utils/jobset.py

@ -418,27 +418,27 @@ class StreamWriteContext {
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
if (!t_->is_client && s_->fetching_send_message == nullptr &&
s_->flow_controlled_buffer.length == 0 &&
s_->send_trailing_metadata == nullptr &&
s_->compressed_data_buffer.length == 0 &&
s_->send_trailing_metadata != nullptr &&
is_default_initial_metadata(s_->send_initial_metadata)) {
ConvertInitialMetadataToTrailingMetadata();
return; // early out
} else {
grpc_encode_header_options hopt = {
s_->id, // stream_id
false, // is_eof
t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
0, // use_true_binary_metadata
t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
&s_->stats.outgoing // stats
};
grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, NULL, 0,
s_->send_initial_metadata, &hopt, &t_->outbuf);
write_context_->ResetPingRecvClock();
write_context_->IncInitialMetadataWrites();
}
grpc_encode_header_options hopt = {
s_->id, // stream_id
false, // is_eof
t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] !=
0, // use_true_binary_metadata
t_->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE], // max_frame_size
&s_->stats.outgoing // stats
};
grpc_chttp2_encode_header(exec_ctx, &t_->hpack_compressor, NULL, 0,
s_->send_initial_metadata, &hopt, &t_->outbuf);
stream_became_writable_ = true;
write_context_->ResetPingRecvClock();
write_context_->IncInitialMetadataWrites();
s_->send_initial_metadata = NULL;
s_->sent_initial_metadata = true;
sent_initial_metadata_ = true;
@ -532,6 +532,7 @@ class StreamWriteContext {
s_->send_trailing_metadata, &hopt, &t_->outbuf);
}
write_context_->IncTrailingMetadataWrites();
write_context_->ResetPingRecvClock();
SentLastFrame(exec_ctx);
write_context_->NoteScheduledResults();

@ -412,7 +412,7 @@ class Jobset(object):
if current_cpu_cost + spec.cpu_cost <= self._maxjobs:
if len(self._running) < self._maxjobs_cpu_agnostic:
break
self.reap()
self.reap(spec.shortname, spec.cpu_cost)
if self.cancelled(): return False
job = Job(spec,
self._newline_on_success,
@ -424,7 +424,7 @@ class Jobset(object):
self.resultset[job.GetSpec().shortname] = []
return True
def reap(self):
def reap(self, waiting_for=None, waiting_for_cost=None):
"""Collect the dead jobs."""
while self._running:
dead = set()
@ -452,8 +452,12 @@ class Jobset(object):
sofar = now - self._start_time
remaining = sofar / self._completed * (self._remaining + len(self._running))
rstr = 'ETA %.1f sec; %s' % (remaining, rstr)
message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
rstr, len(self._running), self._completed, self._failures))
if waiting_for is not None:
wstr = ' next: %s @ %.2f cpu' % (waiting_for, waiting_for_cost)
else:
wstr = ''
message('WAITING', '%s%d jobs running, %d complete, %d failed (load %.2f)%s' % (
rstr, len(self._running), self._completed, self._failures, self.cpu_cost(), wstr))
if platform_string() == 'windows':
time.sleep(0.1)
else:

Loading…
Cancel
Save