Add support for Trailers-Only responses.

- When receiving a Trailers-Only response, return the metadata as
  trailing metadata instead of initial metadata.
- Send Trailers-Only response when we have no non-default initial metadata,
  no message to send, and trailing metadata to send.
pull/11499/head
Mark D. Roth 8 years ago
parent 4d5f30d9db
commit bd3b93b4b5
  1. 2
      src/core/ext/transport/chttp2/transport/chttp2_transport.c
  2. 2
      src/core/ext/transport/chttp2/transport/frame_rst_stream.c
  3. 14
      src/core/ext/transport/chttp2/transport/hpack_encoder.c
  4. 2
      src/core/ext/transport/chttp2/transport/hpack_encoder.h
  5. 1
      src/core/ext/transport/chttp2/transport/internal.h
  6. 12
      src/core/ext/transport/chttp2/transport/parsing.c
  7. 87
      src/core/ext/transport/chttp2/transport/writing.c
  8. 53
      src/core/lib/surface/call.c
  9. 4
      src/core/lib/transport/transport.h
  10. 6
      test/core/transport/chttp2/hpack_encoder_test.c
  11. 2
      test/cpp/microbenchmarks/bm_chttp2_hpack.cc

@ -1403,6 +1403,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op,
op_payload->recv_initial_metadata.recv_initial_metadata_ready;
s->recv_initial_metadata =
op_payload->recv_initial_metadata.recv_initial_metadata;
s->trailing_metadata_available =
op_payload->recv_initial_metadata.trailing_metadata_available;
grpc_chttp2_maybe_complete_recv_initial_metadata(exec_ctx, t, s);
}

@ -93,7 +93,7 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(grpc_exec_ctx *exec_ctx,
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]));
grpc_error *error = GRPC_ERROR_NONE;
if (reason != GRPC_HTTP2_NO_ERROR || s->header_frames_received < 2) {
if (reason != GRPC_HTTP2_NO_ERROR || s->metadata_buffer[1].size == 0) {
char *message;
gpr_asprintf(&message, "Received RST_STREAM with error code %d", reason);
error = grpc_error_set_int(

@ -608,15 +608,14 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
grpc_mdelem **extra_headers,
size_t extra_headers_size,
grpc_metadata_batch *metadata,
const grpc_encode_header_options *options,
grpc_slice_buffer *outbuf) {
framer_state st;
grpc_linked_mdelem *l;
gpr_timespec deadline;
GPR_ASSERT(options->stream_id != 0);
framer_state st;
st.seen_regular_header = 0;
st.stream_id = options->stream_id;
st.output = outbuf;
@ -633,11 +632,14 @@ void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
if (c->advertise_table_size_change != 0) {
emit_advertise_table_size_change(c, &st);
}
for (size_t i = 0; i < extra_headers_size; ++i) {
hpack_enc(exec_ctx, c, *extra_headers[i], &st);
}
grpc_metadata_batch_assert_ok(metadata);
for (l = metadata->list.head; l; l = l->next) {
for (grpc_linked_mdelem *l = metadata->list.head; l; l = l->next) {
hpack_enc(exec_ctx, c, l->md, &st);
}
deadline = metadata->deadline;
gpr_timespec deadline = metadata->deadline;
if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) != 0) {
deadline_enc(exec_ctx, c, deadline, &st);
}

@ -85,6 +85,8 @@ typedef struct {
void grpc_chttp2_encode_header(grpc_exec_ctx *exec_ctx,
grpc_chttp2_hpack_compressor *c,
grpc_mdelem **extra_headers,
size_t extra_headers_size,
grpc_metadata_batch *metadata,
const grpc_encode_header_options *options,
grpc_slice_buffer *outbuf);

@ -447,6 +447,7 @@ struct grpc_chttp2_stream {
grpc_metadata_batch *recv_initial_metadata;
grpc_closure *recv_initial_metadata_ready;
bool *trailing_metadata_available;
grpc_byte_stream **recv_message;
grpc_closure *recv_message_ready;
grpc_metadata_batch *recv_trailing_metadata;

@ -681,9 +681,19 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
t->parser_data = &t->hpack_parser;
switch (s->header_frames_received) {
case 0:
t->hpack_parser.on_header = on_initial_header;
if (t->is_client && t->header_eof) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing Trailers-Only"));
if (s->trailing_metadata_available != NULL) {
*s->trailing_metadata_available = true;
}
t->hpack_parser.on_header = on_trailing_header;
} else {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing initial_metadata"));
t->hpack_parser.on_header = on_initial_header;
}
break;
case 1:
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "parsing trailing_metadata"));
t->hpack_parser.on_header = on_trailing_header;
break;
case 2:

@ -162,6 +162,20 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) {
return 1024 * 1024;
}
// Returns true if initial_metadata contains only default headers.
//
// TODO(roth): The fact that we hard-code these particular headers here
// is fairly ugly. Need some better way to know which headers are
// default, maybe via a bit in the static metadata table?
static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) {
int num_default_fields =
(initial_metadata->idx.named.status != NULL) +
(initial_metadata->idx.named.content_type != NULL) +
(initial_metadata->idx.named.grpc_encoding != NULL) +
(initial_metadata->idx.named.grpc_accept_encoding != NULL);
return (size_t)num_default_fields == initial_metadata->list.count;
}
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
grpc_chttp2_stream *s;
@ -218,31 +232,59 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
t->is_client ? "CLIENT" : "SERVER", s->id, sent_initial_metadata,
s->send_initial_metadata != NULL, s->announce_window));
grpc_mdelem *extra_headers_for_trailing_metadata[2];
size_t num_extra_headers_for_trailing_metadata = 0;
/* send initial metadata if it's available */
if (!sent_initial_metadata && s->send_initial_metadata) {
grpc_encode_header_options hopt = {
.stream_id = s->id,
.is_eof = false,
.use_true_binary_metadata =
t->settings
[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0,
.max_frame_size = t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor,
s->send_initial_metadata, &hopt, &t->outbuf);
if (!sent_initial_metadata && s->send_initial_metadata != NULL) {
// We skip this on the server side if there is no custom initial
// metadata, there are no messages to send, and we are also sending
// trailing metadata. This results in a Trailers-Only response,
// which is required for retries, as per:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#when-retries-are-valid
if (t->is_client || s->fetching_send_message != NULL ||
s->flow_controlled_buffer.length != 0 ||
s->send_trailing_metadata == NULL ||
!is_default_initial_metadata(s->send_initial_metadata)) {
grpc_encode_header_options hopt = {
.stream_id = s->id,
.is_eof = false,
.use_true_binary_metadata =
t->settings
[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA] != 0,
.max_frame_size = t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor, NULL, 0,
s->send_initial_metadata, &hopt, &t->outbuf);
now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
} else {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
// When sending Trailers-Only, we need to move the :status and
// content-type headers to the trailers.
if (s->send_initial_metadata->idx.named.status != NULL) {
extra_headers_for_trailing_metadata
[num_extra_headers_for_trailing_metadata++] =
&s->send_initial_metadata->idx.named.status->md;
}
if (s->send_initial_metadata->idx.named.content_type != NULL) {
extra_headers_for_trailing_metadata
[num_extra_headers_for_trailing_metadata++] =
&s->send_initial_metadata->idx.named.content_type->md;
}
}
s->send_initial_metadata = NULL;
s->sent_initial_metadata = true;
sent_initial_metadata = true;
now_writing = true;
t->ping_state.pings_before_data_required =
t->ping_policy.max_pings_without_data;
if (!t->is_client) {
t->ping_recv_state.last_ping_recv_time =
gpr_inf_past(GPR_CLOCK_MONOTONIC);
t->ping_recv_state.ping_strikes = 0;
}
}
/* send any window updates */
if (s->announce_window > 0) {
@ -320,6 +362,7 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
if (s->send_trailing_metadata != NULL &&
s->fetching_send_message == NULL &&
s->flow_controlled_buffer.length == 0) {
GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "sending trailing_metadata"));
if (grpc_metadata_batch_is_empty(s->send_trailing_metadata)) {
grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, 0, true,
&s->stats.outgoing, &t->outbuf);
@ -337,6 +380,8 @@ grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
.stats = &s->stats.outgoing};
grpc_chttp2_encode_header(exec_ctx, &t->hpack_compressor,
extra_headers_for_trailing_metadata,
num_extra_headers_for_trailing_metadata,
s->send_trailing_metadata, &hopt,
&t->outbuf);
}

@ -929,33 +929,6 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
return algorithm;
}
static void recv_common_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
if (b->idx.named.grpc_status != NULL) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
grpc_error *error =
status_code == GRPC_STATUS_OK
? GRPC_ERROR_NONE
: grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error received from peer"),
GRPC_ERROR_INT_GRPC_STATUS,
(intptr_t)status_code);
if (b->idx.named.grpc_message != NULL) {
error = grpc_error_set_str(
error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
} else if (error != GRPC_ERROR_NONE) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
}
}
static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
int is_trailing) {
if (b->list.count == 0) return;
@ -980,8 +953,6 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
recv_common_filter(exec_ctx, call, b);
if (b->idx.named.grpc_encoding != NULL) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
@ -989,7 +960,6 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
GPR_TIMER_END("incoming_compression_algorithm", 0);
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
}
if (b->idx.named.grpc_accept_encoding != NULL) {
GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
set_encodings_accepted_by_peer(exec_ctx, call,
@ -997,14 +967,33 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
publish_app_metadata(call, b, false);
}
static void recv_trailing_filter(grpc_exec_ctx *exec_ctx, void *args,
grpc_metadata_batch *b) {
grpc_call *call = args;
recv_common_filter(exec_ctx, call, b);
if (b->idx.named.grpc_status != NULL) {
uint32_t status_code = decode_status(b->idx.named.grpc_status->md);
grpc_error *error =
status_code == GRPC_STATUS_OK
? GRPC_ERROR_NONE
: grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Error received from peer"),
GRPC_ERROR_INT_GRPC_STATUS,
(intptr_t)status_code);
if (b->idx.named.grpc_message != NULL) {
error = grpc_error_set_str(
error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_message);
} else if (error != GRPC_ERROR_NONE) {
error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
grpc_empty_slice());
}
set_status_from_error(exec_ctx, call, STATUS_FROM_WIRE, error);
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_status);
}
publish_app_metadata(call, b, true);
}

@ -165,6 +165,10 @@ struct grpc_transport_stream_op_batch_payload {
uint32_t *recv_flags;
/** Should be enqueued when initial metadata is ready to be processed. */
grpc_closure *recv_initial_metadata_ready;
// If not NULL, will be set to true if trailing metadata is
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool *trailing_metadata_available;
} recv_initial_metadata;
struct {

@ -95,7 +95,8 @@ static void verify(grpc_exec_ctx *exec_ctx, size_t window_available, bool eof,
.max_frame_size = 16384,
.stats = &stats,
};
grpc_chttp2_encode_header(exec_ctx, &g_compressor, &b, &hopt, &output);
grpc_chttp2_encode_header(exec_ctx, &g_compressor, NULL, 0, &b, &hopt,
&output);
merged = grpc_slice_merge(output.slices, output.count);
grpc_slice_buffer_destroy_internal(exec_ctx, &output);
grpc_metadata_batch_destroy(exec_ctx, &b);
@ -213,7 +214,8 @@ static void verify_table_size_change_match_elem_size(grpc_exec_ctx *exec_ctx,
.use_true_binary_metadata = false,
.max_frame_size = 16384,
.stats = &stats};
grpc_chttp2_encode_header(exec_ctx, &g_compressor, &b, &hopt, &output);
grpc_chttp2_encode_header(exec_ctx, &g_compressor, NULL, 0, &b, &hopt,
&output);
grpc_slice_buffer_destroy_internal(exec_ctx, &output);
grpc_metadata_batch_destroy(exec_ctx, &b);

@ -82,7 +82,7 @@ static void BM_HpackEncoderEncodeHeader(benchmark::State &state) {
(size_t)state.range(1),
&stats,
};
grpc_chttp2_encode_header(&exec_ctx, &c, &b, &hopt, &outbuf);
grpc_chttp2_encode_header(&exec_ctx, &c, NULL, 0, &b, &hopt, &outbuf);
if (!logged_representative_output && state.iterations() > 3) {
logged_representative_output = true;
for (size_t i = 0; i < outbuf.count; i++) {

Loading…
Cancel
Save