Merge pull request #23806 from yashykt/donotcancel

Do not cancel RPC if send metadata size if larger than peer's limit
pull/23834/head
Yash Tibrewal 4 years ago committed by GitHub
commit e49a1da86f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 144
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  2. 76
      test/core/bad_client/tests/large_metadata.cc
  3. 132
      test/core/end2end/tests/large_metadata.cc

@ -1412,64 +1412,44 @@ static void perform_stream_op_locked(void* stream_op,
s->send_initial_metadata_finished = add_closure_barrier(on_complete);
s->send_initial_metadata =
op_payload->send_initial_metadata.send_initial_metadata;
const size_t metadata_size =
grpc_metadata_batch_size(s->send_initial_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (t->is_client) {
s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
}
if (metadata_size > metadata_peer_limit) {
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"to-be-sent initial metadata size "
"exceeds peer limit"),
GRPC_ERROR_INT_SIZE,
static_cast<intptr_t>(metadata_size)),
GRPC_ERROR_INT_LIMIT,
static_cast<intptr_t>(metadata_peer_limit)),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
if (t->is_client) {
if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(t);
} else {
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
if (contains_non_ok_status(s->send_initial_metadata)) {
s->seen_error = true;
}
if (!s->write_closed) {
if (t->is_client) {
if (t->closed_with_error == GRPC_ERROR_NONE) {
GPR_ASSERT(s->id == 0);
grpc_chttp2_list_add_waiting_for_concurrency(t, s);
maybe_start_some_streams(t);
} else {
GPR_ASSERT(s->id != 0);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
(op->payload->send_message.send_message->flags() &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Transport closed", &t->closed_with_error, 1),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
}
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
"send_initial_metadata_finished");
GPR_ASSERT(s->id != 0);
grpc_chttp2_mark_stream_writable(t, s);
if (!(op->send_message &&
(op->payload->send_message.send_message->flags() &
GRPC_WRITE_BUFFER_HINT))) {
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
}
}
} else {
s->send_initial_metadata = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_initial_metadata_finished,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Attempt to send initial metadata after stream was closed",
&s->write_closed_error, 1),
"send_initial_metadata_finished");
}
if (op_payload->send_initial_metadata.peer_string != nullptr) {
gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
@ -1531,47 +1511,27 @@ static void perform_stream_op_locked(void* stream_op,
op_payload->send_trailing_metadata.send_trailing_metadata;
s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
s->write_buffering = false;
const size_t metadata_size =
grpc_metadata_batch_size(s->send_trailing_metadata);
const size_t metadata_peer_limit =
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
if (metadata_size > metadata_peer_limit) {
grpc_chttp2_cancel_stream(
t, s,
grpc_error_set_int(
grpc_error_set_int(
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"to-be-sent trailing metadata size "
"exceeds peer limit"),
GRPC_ERROR_INT_SIZE,
static_cast<intptr_t>(metadata_size)),
GRPC_ERROR_INT_LIMIT,
static_cast<intptr_t>(metadata_peer_limit)),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
} else {
if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_trailing_metadata_finished,
grpc_metadata_batch_is_empty(
op->payload->send_trailing_metadata.send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to send trailing metadata after "
"stream was closed"),
"send_trailing_metadata_finished");
} else if (s->id != 0) {
// TODO(ctiller): check if there's flow control for any outstanding
// bytes before going writable
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
if (contains_non_ok_status(s->send_trailing_metadata)) {
s->seen_error = true;
}
if (s->write_closed) {
s->send_trailing_metadata = nullptr;
s->sent_trailing_metadata_op = nullptr;
grpc_chttp2_complete_closure_step(
t, s, &s->send_trailing_metadata_finished,
grpc_metadata_batch_is_empty(
op->payload->send_trailing_metadata.send_trailing_metadata)
? GRPC_ERROR_NONE
: GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to send trailing metadata after "
"stream was closed"),
"send_trailing_metadata_finished");
} else if (s->id != 0) {
// TODO(ctiller): check if there's flow control for any outstanding
// bytes before going writable
grpc_chttp2_mark_stream_writable(t, s);
grpc_chttp2_initiate_write(
t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
}
}

@ -69,78 +69,6 @@
((sizeof(PFX_TOO_MUCH_METADATA_FROM_CLIENT_REQUEST) - 1) + \
(NUM_HEADERS * PFX_TOO_MUCH_METADATA_FROM_CLIENT_HEADER_SIZE) + 1)
#define PFX_TOO_MUCH_METADATA_FROM_SERVER_STR \
"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" /* settings frame: sets \
MAX_HEADER_LIST_SIZE to 8K */ \
"\x00\x00\x06\x04\x00\x00\x00\x00\x00\x00\x06\x00\x00\x20\x00" /* headers: \
generated \
from \
simple_request.headers \
in this \
directory \
*/ \
"\x00\x00\x00\x04\x01\x00\x00\x00\x00" \
"\x00\x00\xc9\x01\x04\x00\x00\x00\x01" \
"\x10\x05:path\x08/foo/bar" \
"\x10\x07:scheme\x04http" \
"\x10\x07:method\x04POST" \
"\x10\x0a:authority\x09localhost" \
"\x10\x0c" \
"content-type\x10" \
"application/grpc" \
"\x10\x14grpc-accept-encoding\x15" \
"deflate,identity,gzip" \
"\x10\x02te\x08trailers" \
"\x10\x0auser-agent\"bad-client grpc-c/0.12.0.0 (linux)"
static void* tag(intptr_t t) { return (void*)t; }
static void server_verifier_sends_too_much_metadata(
grpc_server* server, grpc_completion_queue* cq,
void* /*registered_method*/) {
grpc_call_error error;
grpc_call* s;
grpc_call_details call_details;
cq_verifier* cqv = cq_verifier_create(cq);
grpc_metadata_array request_metadata_recv;
grpc_call_details_init(&call_details);
grpc_metadata_array_init(&request_metadata_recv);
error = grpc_server_request_call(server, &s, &call_details,
&request_metadata_recv, cq, cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.host, "localhost"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo/bar"));
const size_t metadata_value_size = 8 * 1024;
grpc_metadata meta;
meta.key = grpc_slice_from_static_string("key");
meta.value = grpc_slice_malloc(metadata_value_size);
memset(GRPC_SLICE_START_PTR(meta.value), 'a', metadata_value_size);
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.data.send_initial_metadata.count = 1;
op.data.send_initial_metadata.metadata = &meta;
op.flags = 0;
op.reserved = nullptr;
error = grpc_call_start_batch(s, &op, 1, tag(102), nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 0); // Operation fails.
cq_verify(cqv);
grpc_slice_unref(meta.value);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
}
int main(int argc, char** argv) {
int i;
grpc_init();
@ -166,10 +94,6 @@ int main(int argc, char** argv) {
grpc_run_bad_client_test(server_verifier_request_call, args, 2, 0);
// Test sending more metadata than the client will accept.
GRPC_RUN_BAD_CLIENT_TEST(server_verifier_sends_too_much_metadata,
rst_stream_client_validator,
PFX_TOO_MUCH_METADATA_FROM_SERVER_STR, 0);
grpc_shutdown();
return 0;
}

@ -244,8 +244,140 @@ static void test_request_with_large_metadata(grpc_end2end_test_config config) {
config.tear_down_data(&f);
}
// Server responds with metadata larger than what the client accepts.
static void test_request_with_bad_large_metadata_response(
grpc_end2end_test_config config) {
grpc_call* c;
grpc_call* s;
grpc_metadata meta;
const size_t large_size = 64 * 1024;
grpc_arg arg;
arg.type = GRPC_ARG_INTEGER;
arg.key = const_cast<char*>(GRPC_ARG_MAX_METADATA_SIZE);
arg.value.integer = 1024;
grpc_channel_args args = {1, &arg};
grpc_end2end_test_fixture f = begin_test(
config, "test_request_with_bad_large_metadata_response", &args, &args);
cq_verifier* cqv = cq_verifier_create(f.cq);
grpc_op ops[6];
grpc_op* op;
grpc_metadata_array initial_metadata_recv;
grpc_metadata_array trailing_metadata_recv;
grpc_metadata_array request_metadata_recv;
grpc_call_details call_details;
grpc_status_code status;
grpc_call_error error;
grpc_slice details;
int was_cancelled = 2;
gpr_timespec deadline = five_seconds_from_now();
c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
grpc_slice_from_static_string("/foo"), nullptr,
deadline, nullptr);
GPR_ASSERT(c);
meta.key = grpc_slice_from_static_string("key");
meta.value = grpc_slice_malloc(large_size);
memset(GRPC_SLICE_START_PTR(meta.value), 'a', large_size);
grpc_metadata_array_init(&initial_metadata_recv);
grpc_metadata_array_init(&trailing_metadata_recv);
grpc_metadata_array_init(&request_metadata_recv);
grpc_call_details_init(&call_details);
memset(ops, 0, sizeof(ops));
// Client: send request.
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
op->data.recv_status_on_client.status = &status;
op->data.recv_status_on_client.status_details = &details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
error =
grpc_server_request_call(f.server, &s, &call_details,
&request_metadata_recv, f.cq, f.cq, tag(101));
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
cq_verify(cqv);
memset(ops, 0, sizeof(ops));
// Server: send large initial metadata
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 1;
op->data.send_initial_metadata.metadata = &meta;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
op->data.recv_close_on_server.cancelled = &was_cancelled;
op->flags = 0;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
op->data.send_status_from_server.trailing_metadata_count = 0;
op->data.send_status_from_server.status = GRPC_STATUS_OK;
grpc_slice status_details = grpc_slice_from_static_string("xyz");
op->data.send_status_from_server.status_details = &status_details;
op->flags = 0;
op->reserved = nullptr;
op++;
error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
nullptr);
GPR_ASSERT(GRPC_CALL_OK == error);
CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
cq_verify(cqv);
GPR_ASSERT(status == GRPC_STATUS_RESOURCE_EXHAUSTED);
GPR_ASSERT(0 == grpc_slice_str_cmp(
details, "received initial metadata size exceeds limit"));
GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
grpc_slice_unref(details);
grpc_metadata_array_destroy(&initial_metadata_recv);
grpc_metadata_array_destroy(&trailing_metadata_recv);
grpc_metadata_array_destroy(&request_metadata_recv);
grpc_call_details_destroy(&call_details);
grpc_call_unref(c);
grpc_call_unref(s);
cq_verifier_destroy(cqv);
grpc_slice_unref(meta.value);
end_test(&f);
config.tear_down_data(&f);
}
void large_metadata(grpc_end2end_test_config config) {
test_request_with_large_metadata(config);
// TODO(yashykt): Maybe add checks for metadata size in inproc transport too.
if (strcmp(config.name, "inproc") != 0) {
test_request_with_bad_large_metadata_response(config);
}
}
void large_metadata_pre_init(void) {}

Loading…
Cancel
Save