[chttp2] Tarpit invalid requests (#34641)

If a request is invalid, take a random amount of time before sending the
RST_STREAM, so that MAX_CONCURRENT_STREAMS remaining becomes
unpredictable.
pull/34642/head
Craig Tiller 1 year ago committed by GitHub
parent 385583b28a
commit e527581a93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      bazel/experiments.bzl
  2. 38
      src/core/ext/filters/http/server/http_server_filter.cc
  3. 437
      src/core/ext/transport/chttp2/transport/chttp2_transport.cc
  4. 38
      src/core/ext/transport/chttp2/transport/internal.h
  5. 4
      src/core/ext/transport/chttp2/transport/parsing.cc
  6. 13
      src/core/lib/channel/promise_based_filter.cc
  7. 3
      src/core/lib/channel/promise_based_filter.h
  8. 12
      src/core/lib/experiments/experiments.cc
  9. 11
      src/core/lib/experiments/experiments.h
  10. 6
      src/core/lib/experiments/experiments.yaml
  11. 2
      src/core/lib/experiments/rollouts.yaml
  12. 12
      src/core/lib/transport/metadata_batch.h
  13. 6
      src/core/lib/transport/transport.h

@ -66,6 +66,7 @@ EXPERIMENTS = {
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
"tarpit",
],
"cpp_end2end_test": [
"chttp2_batch_requests",
@ -145,6 +146,7 @@ EXPERIMENTS = {
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
"tarpit",
],
"cpp_end2end_test": [
"chttp2_batch_requests",
@ -234,6 +236,7 @@ EXPERIMENTS = {
"on": {
"bad_client_test": [
"block_excessive_requests_before_settings_ack",
"tarpit",
],
"cpp_end2end_test": [
"chttp2_batch_requests",

@ -27,20 +27,23 @@
#include "absl/base/attributes.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include <grpc/impl/channel_arg_names.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/percent_encoding.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/call_trace.h"
@ -59,6 +62,15 @@ void FilterOutgoingMetadata(ServerMetadata* md) {
PercentEncodingType::Compatible);
}
}
ServerMetadataHandle MalformedRequest(absl::string_view explanation) {
auto* arena = GetContext<Arena>();
auto hdl = arena->MakePooled<ServerMetadata>(arena);
hdl->Set(GrpcStatusMetadata(), GRPC_STATUS_UNKNOWN);
hdl->Set(GrpcMessageMetadata(), Slice::FromStaticString(explanation));
hdl->Set(GrpcTarPit(), Empty());
return hdl;
}
} // namespace
ArenaPromise<ServerMetadataHandle> HttpServerFilter::MakeCallPromise(
@ -77,42 +89,35 @@ ArenaPromise<ServerMetadataHandle> HttpServerFilter::MakeCallPromise(
ABSL_FALLTHROUGH_INTENDED;
case HttpMethodMetadata::kInvalid:
case HttpMethodMetadata::kGet:
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Bad method header")));
return Immediate(MalformedRequest("Bad method header"));
}
} else {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Missing :method header")));
return Immediate(MalformedRequest("Missing :method header"));
}
auto te = md->Take(TeMetadata());
if (te == TeMetadata::kTrailers) {
// Do nothing, ok.
} else if (!te.has_value()) {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Missing :te header")));
return Immediate(MalformedRequest("Missing :te header"));
} else {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Bad :te header")));
return Immediate(MalformedRequest("Bad :te header"));
}
auto scheme = md->Take(HttpSchemeMetadata());
if (scheme.has_value()) {
if (*scheme == HttpSchemeMetadata::kInvalid) {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Bad :scheme header")));
return Immediate(MalformedRequest("Bad :scheme header"));
}
} else {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Missing :scheme header")));
return Immediate(MalformedRequest("Missing :scheme header"));
}
md->Remove(ContentTypeMetadata());
Slice* path_slice = md->get_pointer(HttpPathMetadata());
if (path_slice == nullptr) {
return Immediate(
ServerMetadataFromStatus(absl::UnknownError("Missing :path header")));
return Immediate(MalformedRequest("Missing :path header"));
}
if (md->get_pointer(HttpAuthorityMetadata()) == nullptr) {
@ -123,8 +128,7 @@ ArenaPromise<ServerMetadataHandle> HttpServerFilter::MakeCallPromise(
}
if (md->get_pointer(HttpAuthorityMetadata()) == nullptr) {
return Immediate(ServerMetadataFromStatus(
absl::UnknownError("Missing :authority header")));
return Immediate(MalformedRequest("Missing :authority header"));
}
if (!surface_user_agent_) {

@ -37,6 +37,7 @@
#include "absl/container/flat_hash_map.h"
#include "absl/hash/hash.h"
#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_cat.h"
@ -133,6 +134,11 @@ static grpc_core::Duration g_default_server_keepalive_timeout =
static bool g_default_client_keepalive_permit_without_calls = false;
static bool g_default_server_keepalive_permit_without_calls = false;
// EXPERIMENTAL: control tarpitting in chttp2
#define GRPC_ARG_HTTP_ALLOW_TARPIT "grpc.http.tarpit"
#define GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS "grpc.http.tarpit_min_duration_ms"
#define GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS "grpc.http.tarpit_max_duration_ms"
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
@ -159,7 +165,7 @@ static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value);
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error);
grpc_error_handle error, bool tarpit);
// Start new streams that have been created if we can
static void maybe_start_some_streams(grpc_chttp2_transport* t);
@ -453,6 +459,19 @@ static void read_channel_args(grpc_chttp2_transport* t,
t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
t->allow_tarpit = channel_args.GetBool(GRPC_ARG_HTTP_ALLOW_TARPIT)
.value_or(grpc_core::IsTarpitEnabled());
t->min_tarpit_duration_ms =
channel_args
.GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS)
.value_or(grpc_core::Duration::Milliseconds(100))
.millis();
t->max_tarpit_duration_ms =
channel_args
.GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS)
.value_or(grpc_core::Duration::Seconds(1))
.millis();
const int soft_limit =
channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE).value_or(-1);
if (soft_limit < 0) {
@ -1126,13 +1145,13 @@ static void queue_setting_update(grpc_chttp2_transport* t,
// Cancel out streams that haven't yet started if we have received a GOAWAY
static void cancel_unstarted_streams(grpc_chttp2_transport* t,
grpc_error_handle error) {
grpc_error_handle error, bool tarpit) {
grpc_chttp2_stream* s;
while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
s->trailing_metadata_buffer.Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
grpc_chttp2_cancel_stream(t, s, error);
grpc_chttp2_cancel_stream(t, s, error, tarpit);
}
}
@ -1165,7 +1184,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
grpc_core::StatusToString(t->goaway_error).c_str());
}
if (t->is_client) {
cancel_unstarted_streams(t, t->goaway_error);
cancel_unstarted_streams(t, t->goaway_error, false);
// Cancel all unseen streams
std::vector<grpc_chttp2_stream*> to_cancel;
for (auto id_stream : t->stream_map) {
@ -1177,7 +1196,7 @@ void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
s->trailing_metadata_buffer.Set(
grpc_core::GrpcStreamNetworkState(),
grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error);
grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error, false);
}
}
absl::Status status = grpc_error_to_absl_status(t->goaway_error);
@ -1216,7 +1235,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
// maybe cancel out streams that haven't yet started if we have received a
// GOAWAY
if (!t->goaway_error.ok()) {
cancel_unstarted_streams(t, t->goaway_error);
cancel_unstarted_streams(t, t->goaway_error, false);
return;
}
// start streams where we have free grpc_chttp2_stream ids and free
@ -1258,7 +1277,8 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
t, s,
grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
GRPC_STATUS_UNAVAILABLE),
false);
}
}
}
@ -1403,7 +1423,8 @@ static void perform_stream_op_locked(void* stream_op,
}
if (op->cancel_stream) {
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
op_payload->cancel_stream.tarpit);
}
if (op->send_initial_metadata) {
@ -1441,7 +1462,8 @@ static void perform_stream_op_locked(void* stream_op,
GRPC_ERROR_CREATE_REFERENCING("Transport closed",
&t->closed_with_error, 1),
grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE));
GRPC_STATUS_UNAVAILABLE),
false);
}
} else {
GPR_ASSERT(s->id != 0);
@ -2102,8 +2124,8 @@ void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
}
}
static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
grpc_error_handle error) {
static grpc_chttp2_transport::RemovedStreamHandle remove_stream(
grpc_chttp2_transport* t, uint32_t id, grpc_error_handle error) {
grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
GPR_DEBUG_ASSERT(s);
if (t->incoming_stream == s) {
@ -2126,29 +2148,74 @@ static void remove_stream(grpc_chttp2_transport* t, uint32_t id,
grpc_chttp2_list_remove_stalled_by_transport(t, s);
maybe_start_some_streams(t);
if (t->is_client) return grpc_chttp2_transport::RemovedStreamHandle();
return grpc_chttp2_transport::RemovedStreamHandle(t->Ref());
}
namespace grpc_core {
namespace {
Duration TarpitDuration(grpc_chttp2_transport* t) {
return Duration::Milliseconds(absl::LogUniform<int>(
absl::BitGen(), t->min_tarpit_duration_ms, t->max_tarpit_duration_ms));
}
template <typename F>
void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
if (!tarpit || !t->allow_tarpit || t->is_client) {
fn(t);
return;
}
const auto duration = TarpitDuration(t);
t->event_engine->RunAfter(
duration, [t = t->Ref(), fn = std::move(fn)]() mutable {
ApplicationCallbackExecCtx app_exec_ctx;
ExecCtx exec_ctx;
t->combiner->Run(
NewClosure([t, fn = std::move(fn)](grpc_error_handle) mutable {
// TODO(ctiller): this can result in not sending RST_STREAMS if a
// request gets tarpit behind a transport close.
if (!t->closed_with_error.ok()) return;
fn(t.get());
}),
absl::OkStatus());
});
}
} // namespace
} // namespace grpc_core
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error) {
grpc_error_handle due_to_error, bool tarpit) {
if (!t->is_client && !s->sent_trailing_metadata &&
grpc_error_has_clear_grpc_status(due_to_error)) {
close_from_api(t, s, due_to_error);
grpc_error_has_clear_grpc_status(due_to_error) &&
!(s->read_closed && s->write_closed)) {
close_from_api(t, s, due_to_error, tarpit);
return;
}
if (!due_to_error.ok() && !s->seen_error) {
s->seen_error = true;
}
if (!s->read_closed || !s->write_closed) {
if (s->id != 0) {
grpc_http2_error_code http_error;
grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
&http_error, nullptr);
grpc_chttp2_add_rst_stream_to_next_write(
t, s->id, static_cast<uint32_t>(http_error), &s->stats.outgoing);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
grpc_core::MaybeTarpit(
t, tarpit,
[id = s->id, http_error,
remove_stream_handle = grpc_chttp2_mark_stream_closed(
t, s, 1, 1, due_to_error)](grpc_chttp2_transport* t) {
grpc_chttp2_add_rst_stream_to_next_write(
t, id, static_cast<uint32_t>(http_error), nullptr);
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
});
return;
}
}
if (!due_to_error.ok() && !s->seen_error) {
s->seen_error = true;
}
grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
}
@ -2241,9 +2308,10 @@ void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
flush_write_list(t, s, &s->on_flow_controlled_cbs, error);
}
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error) {
grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error) {
grpc_chttp2_transport::RemovedStreamHandle rsh;
if (grpc_http_trace.enabled()) {
gpr_log(
GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id,
@ -2259,7 +2327,7 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_fake_status(t, s, overall_error);
}
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
return;
return rsh;
}
bool closed_read = false;
bool became_closed = false;
@ -2277,7 +2345,7 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
became_closed = true;
grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
if (s->id != 0) {
remove_stream(t, s->id, overall_error);
rsh = remove_stream(t, s->id, overall_error);
} else {
// Purge streams waiting on concurrency still waiting for id assignment
grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
@ -2301,17 +2369,11 @@ void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
}
return rsh;
}
static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error) {
grpc_slice hdr;
grpc_slice status_hdr;
grpc_slice http_status_hdr;
grpc_slice content_type_hdr;
grpc_slice message_pfx;
uint8_t* p;
uint32_t len = 0;
grpc_error_handle error, bool tarpit) {
grpc_status_code grpc_status;
std::string message;
grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
@ -2319,147 +2381,167 @@ static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
// Hand roll a header block.
// This is unnecessarily ugly - at some point we should find a more
// elegant solution.
// It's complicated by the fact that our send machinery would be dead by
// the time we got around to sending this, so instead we ignore HPACK
// compression and just write the uncompressed bytes onto the wire.
if (!s->sent_initial_metadata) {
http_status_hdr = GRPC_SLICE_MALLOC(13);
p = GRPC_SLICE_START_PTR(http_status_hdr);
*p++ = 0x00;
*p++ = 7;
*p++ = ':';
*p++ = 's';
*p++ = 't';
*p++ = 'a';
*p++ = 't';
*p++ = 'u';
*p++ = 's';
*p++ = 3;
*p++ = '2';
*p++ = '0';
*p++ = '0';
GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
content_type_hdr = GRPC_SLICE_MALLOC(31);
p = GRPC_SLICE_START_PTR(content_type_hdr);
*p++ = 0x00;
*p++ = 12;
*p++ = 'c';
*p++ = 'o';
*p++ = 'n';
*p++ = 't';
*p++ = 'e';
*p++ = 'n';
*p++ = 't';
*p++ = '-';
*p++ = 't';
*p++ = 'y';
*p++ = 'p';
*p++ = 'e';
*p++ = 16;
*p++ = 'a';
*p++ = 'p';
*p++ = 'p';
*p++ = 'l';
*p++ = 'i';
*p++ = 'c';
*p++ = 'a';
*p++ = 't';
*p++ = 'i';
*p++ = 'o';
*p++ = 'n';
*p++ = '/';
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
}
status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
p = GRPC_SLICE_START_PTR(status_hdr);
*p++ = 0x00; // literal header, not indexed
*p++ = 11; // len(grpc-status)
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
*p++ = 's';
*p++ = 't';
*p++ = 'a';
*p++ = 't';
*p++ = 'u';
*p++ = 's';
if (grpc_status < 10) {
*p++ = 1;
*p++ = static_cast<uint8_t>('0' + grpc_status);
} else {
*p++ = 2;
*p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
*p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
}
GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
size_t msg_len = message.length();
GPR_ASSERT(msg_len <= UINT32_MAX);
grpc_core::VarintWriter<1> msg_len_writer(static_cast<uint32_t>(msg_len));
message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x00; // literal header, not indexed
*p++ = 12; // len(grpc-message)
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
*p++ = 'm';
*p++ = 'e';
*p++ = 's';
*p++ = 's';
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
msg_len_writer.Write(0, p);
p += msg_len_writer.length();
GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
len += static_cast<uint32_t>(msg_len);
hdr = GRPC_SLICE_MALLOC(9);
p = GRPC_SLICE_START_PTR(hdr);
*p++ = static_cast<uint8_t>(len >> 16);
*p++ = static_cast<uint8_t>(len >> 8);
*p++ = static_cast<uint8_t>(len);
*p++ = GRPC_CHTTP2_FRAME_HEADER;
*p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
*p++ = static_cast<uint8_t>(s->id >> 24);
*p++ = static_cast<uint8_t>(s->id >> 16);
*p++ = static_cast<uint8_t>(s->id >> 8);
*p++ = static_cast<uint8_t>(s->id);
GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
grpc_slice_buffer_add(&t->qbuf, hdr);
if (!s->sent_initial_metadata) {
grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
}
grpc_slice_buffer_add(&t->qbuf, status_hdr);
grpc_slice_buffer_add(&t->qbuf, message_pfx);
grpc_slice_buffer_add(&t->qbuf,
grpc_slice_from_cpp_string(std::move(message)));
grpc_chttp2_reset_ping_clock(t);
grpc_chttp2_add_rst_stream_to_next_write(t, s->id, GRPC_HTTP2_NO_ERROR,
&s->stats.outgoing);
grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
auto remove_stream_handle = grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
grpc_core::MaybeTarpit(
t, tarpit,
[error = std::move(error),
sent_initial_metadata = s->sent_initial_metadata, id = s->id,
grpc_status, message = std::move(message),
remove_stream_handle =
std::move(remove_stream_handle)](grpc_chttp2_transport* t) mutable {
grpc_slice hdr;
grpc_slice status_hdr;
grpc_slice http_status_hdr;
grpc_slice content_type_hdr;
grpc_slice message_pfx;
uint8_t* p;
uint32_t len = 0;
// Hand roll a header block.
// This is unnecessarily ugly - at some point we should find a more
// elegant solution.
// It's complicated by the fact that our send machinery would be dead
// by the time we got around to sending this, so instead we ignore
// HPACK compression and just write the uncompressed bytes onto the
// wire.
if (!sent_initial_metadata) {
http_status_hdr = GRPC_SLICE_MALLOC(13);
p = GRPC_SLICE_START_PTR(http_status_hdr);
*p++ = 0x00;
*p++ = 7;
*p++ = ':';
*p++ = 's';
*p++ = 't';
*p++ = 'a';
*p++ = 't';
*p++ = 'u';
*p++ = 's';
*p++ = 3;
*p++ = '2';
*p++ = '0';
*p++ = '0';
GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
content_type_hdr = GRPC_SLICE_MALLOC(31);
p = GRPC_SLICE_START_PTR(content_type_hdr);
*p++ = 0x00;
*p++ = 12;
*p++ = 'c';
*p++ = 'o';
*p++ = 'n';
*p++ = 't';
*p++ = 'e';
*p++ = 'n';
*p++ = 't';
*p++ = '-';
*p++ = 't';
*p++ = 'y';
*p++ = 'p';
*p++ = 'e';
*p++ = 16;
*p++ = 'a';
*p++ = 'p';
*p++ = 'p';
*p++ = 'l';
*p++ = 'i';
*p++ = 'c';
*p++ = 'a';
*p++ = 't';
*p++ = 'i';
*p++ = 'o';
*p++ = 'n';
*p++ = '/';
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
}
status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
p = GRPC_SLICE_START_PTR(status_hdr);
*p++ = 0x00; // literal header, not indexed
*p++ = 11; // len(grpc-status)
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
*p++ = 's';
*p++ = 't';
*p++ = 'a';
*p++ = 't';
*p++ = 'u';
*p++ = 's';
if (grpc_status < 10) {
*p++ = 1;
*p++ = static_cast<uint8_t>('0' + grpc_status);
} else {
*p++ = 2;
*p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
*p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
}
GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
size_t msg_len = message.length();
GPR_ASSERT(msg_len <= UINT32_MAX);
grpc_core::VarintWriter<1> msg_len_writer(
static_cast<uint32_t>(msg_len));
message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
p = GRPC_SLICE_START_PTR(message_pfx);
*p++ = 0x00; // literal header, not indexed
*p++ = 12; // len(grpc-message)
*p++ = 'g';
*p++ = 'r';
*p++ = 'p';
*p++ = 'c';
*p++ = '-';
*p++ = 'm';
*p++ = 'e';
*p++ = 's';
*p++ = 's';
*p++ = 'a';
*p++ = 'g';
*p++ = 'e';
msg_len_writer.Write(0, p);
p += msg_len_writer.length();
GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
len += static_cast<uint32_t>(msg_len);
hdr = GRPC_SLICE_MALLOC(9);
p = GRPC_SLICE_START_PTR(hdr);
*p++ = static_cast<uint8_t>(len >> 16);
*p++ = static_cast<uint8_t>(len >> 8);
*p++ = static_cast<uint8_t>(len);
*p++ = GRPC_CHTTP2_FRAME_HEADER;
*p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM |
GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
*p++ = static_cast<uint8_t>(id >> 24);
*p++ = static_cast<uint8_t>(id >> 16);
*p++ = static_cast<uint8_t>(id >> 8);
*p++ = static_cast<uint8_t>(id);
GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
grpc_slice_buffer_add(&t->qbuf, hdr);
if (!sent_initial_metadata) {
grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
}
grpc_slice_buffer_add(&t->qbuf, status_hdr);
grpc_slice_buffer_add(&t->qbuf, message_pfx);
grpc_slice_buffer_add(&t->qbuf,
grpc_slice_from_cpp_string(std::move(message)));
grpc_chttp2_reset_ping_clock(t);
grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_NO_ERROR,
nullptr);
grpc_chttp2_initiate_write(t,
GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
});
}
static void end_all_the_calls(grpc_chttp2_transport* t,
@ -2472,13 +2554,13 @@ static void end_all_the_calls(grpc_chttp2_transport* t,
error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
GRPC_STATUS_UNAVAILABLE);
}
cancel_unstarted_streams(t, error);
cancel_unstarted_streams(t, error, false);
std::vector<grpc_chttp2_stream*> to_cancel;
for (auto id_stream : t->stream_map) {
to_cancel.push_back(id_stream.second);
}
for (auto s : to_cancel) {
grpc_chttp2_cancel_stream(t, s, error);
grpc_chttp2_cancel_stream(t, s, error, false);
}
}
@ -3029,7 +3111,8 @@ static void destructive_reclaimer_locked(
t.get(), s,
grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
grpc_core::StatusIntProperty::kHttp2Error,
GRPC_HTTP2_ENHANCE_YOUR_CALM));
GRPC_HTTP2_ENHANCE_YOUR_CALM),
false);
if (!t->stream_map.empty()) {
// Since we cancel one stream per destructive reclamation, if
// there are more streams left, we can immediately post a new

@ -25,6 +25,7 @@
#include <stdint.h>
#include <memory>
#include <utility>
#include "absl/container/flat_hash_map.h"
#include "absl/meta/type_traits.h"
@ -274,6 +275,31 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// maps stream id to grpc_chttp2_stream objects
absl::flat_hash_map<uint32_t, grpc_chttp2_stream*> stream_map;
// Count of streams that should be counted against max concurrent streams but
// are not in stream_map (due to tarpitting).
size_t extra_streams = 0;
class RemovedStreamHandle {
public:
RemovedStreamHandle() = default;
explicit RemovedStreamHandle(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t)
: transport_(std::move(t)) {
++transport_->extra_streams;
}
~RemovedStreamHandle() {
if (transport_ != nullptr) {
--transport_->extra_streams;
}
}
RemovedStreamHandle(const RemovedStreamHandle&) = delete;
RemovedStreamHandle& operator=(const RemovedStreamHandle&) = delete;
RemovedStreamHandle(RemovedStreamHandle&&) = default;
RemovedStreamHandle& operator=(RemovedStreamHandle&&) = default;
private:
grpc_core::RefCountedPtr<grpc_chttp2_transport> transport_;
};
grpc_closure write_action_begin_locked;
grpc_closure write_action;
@ -378,6 +404,10 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
uint32_t expect_continuation_stream_id = 0;
uint32_t incoming_frame_size = 0;
int min_tarpit_duration_ms;
int max_tarpit_duration_ms;
bool allow_tarpit;
grpc_chttp2_stream* incoming_stream = nullptr;
// active parser
struct Parser {
@ -755,9 +785,9 @@ void grpc_chttp2_settings_timeout(
void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
grpc_chttp2_stream* stream,
grpc_error_handle error);
void grpc_chttp2_mark_stream_closed(grpc_chttp2_transport* t,
grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error);
grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
int close_writes, grpc_error_handle error);
void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
#ifndef NDEBUG
@ -793,7 +823,7 @@ void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);
void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle due_to_error);
grpc_error_handle due_to_error, bool tarpit);
void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
grpc_chttp2_stream* s);

@ -645,7 +645,7 @@ static grpc_error_handle init_header_frame_parser(grpc_chttp2_transport* t,
t->incoming_stream_id));
return init_header_skip_frame_parser(t, priority_type, is_eoh);
} else if (GPR_UNLIKELY(
t->stream_map.size() >=
t->stream_map.size() + t->extra_streams >=
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS])) {
return GRPC_ERROR_CREATE("Max stream count exceeded");
@ -880,7 +880,7 @@ static grpc_error_handle parse_frame_slice(grpc_chttp2_transport* t,
&unused)) {
grpc_chttp2_parsing_become_skip_parser(t);
if (s) {
grpc_chttp2_cancel_stream(t, s, err);
grpc_chttp2_cancel_stream(t, s, err, true);
}
return absl::OkStatus();
}

@ -2047,7 +2047,8 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
!batch->recv_initial_metadata && !batch->recv_message &&
!batch->recv_trailing_metadata);
PollContext poll_ctx(this, &flusher);
Completed(batch->payload->cancel_stream.cancel_error, &flusher);
Completed(batch->payload->cancel_stream.cancel_error,
batch->payload->cancel_stream.tarpit, &flusher);
if (is_last()) {
batch.CompleteWith(&flusher);
} else {
@ -2166,7 +2167,8 @@ void ServerCallData::StartBatch(grpc_transport_stream_op_batch* b) {
}
// Handle cancellation.
void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) {
void ServerCallData::Completed(grpc_error_handle error,
bool tarpit_cancellation, Flusher* flusher) {
if (grpc_trace_channel.enabled()) {
gpr_log(
GPR_DEBUG,
@ -2196,6 +2198,7 @@ void ServerCallData::Completed(grpc_error_handle error, Flusher* flusher) {
}));
batch->cancel_stream = true;
batch->payload->cancel_stream.cancel_error = error;
batch->payload->cancel_stream.tarpit = tarpit_cancellation;
flusher->Resume(batch);
}
break;
@ -2331,7 +2334,8 @@ void ServerCallData::RecvTrailingMetadataReady(grpc_error_handle error) {
}
Flusher flusher(this);
PollContext poll_ctx(this, &flusher);
Completed(error, &flusher);
Completed(error, recv_trailing_metadata_->get(GrpcTarPit()).has_value(),
&flusher);
flusher.AddClosure(original_recv_trailing_metadata_ready_, std::move(error),
"continue recv trailing");
}
@ -2551,7 +2555,8 @@ void ServerCallData::WakeInsideCombiner(Flusher* flusher) {
break;
case SendTrailingState::kInitial: {
GPR_ASSERT(*md->get_pointer(GrpcStatusMetadata()) != GRPC_STATUS_OK);
Completed(StatusFromMetadata(*md), flusher);
Completed(StatusFromMetadata(*md), md->get(GrpcTarPit()).has_value(),
flusher);
} break;
case SendTrailingState::kCancelled:
// Nothing to do.

@ -739,7 +739,8 @@ class ServerCallData : public BaseCallData {
struct SendInitialMetadata;
// Shut things down when the call completes.
void Completed(grpc_error_handle error, Flusher* flusher);
void Completed(grpc_error_handle error, bool tarpit_cancellation,
Flusher* flusher);
// Construct a promise that will "call" the next filter.
// Effectively:
// - put the modified initial metadata into the batch being sent up.

@ -130,6 +130,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_tarpit =
"If set, tarpit invalid requests for some amount of time";
const char* const additional_constraints_tarpit = "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
@ -227,6 +230,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true},
{"tarpit", description_tarpit, additional_constraints_tarpit, true, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache",
@ -356,6 +360,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_tarpit =
"If set, tarpit invalid requests for some amount of time";
const char* const additional_constraints_tarpit = "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
@ -453,6 +460,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true},
{"tarpit", description_tarpit, additional_constraints_tarpit, true, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache",
@ -582,6 +590,9 @@ const char* const description_call_status_override_on_cancellation =
"with cancellation.";
const char* const additional_constraints_call_status_override_on_cancellation =
"{}";
const char* const description_tarpit =
"If set, tarpit invalid requests for some amount of time";
const char* const additional_constraints_tarpit = "{}";
const char* const description_settings_timeout =
"If set, use the settings timeout to send settings frame to the peer.";
const char* const additional_constraints_settings_timeout = "{}";
@ -679,6 +690,7 @@ const ExperimentMetadata g_experiment_metadata[] = {
description_call_status_override_on_cancellation,
additional_constraints_call_status_override_on_cancellation,
kDefaultForDebugOnly, true},
{"tarpit", description_tarpit, additional_constraints_tarpit, true, true},
{"settings_timeout", description_settings_timeout,
additional_constraints_settings_timeout, true, true},
{"work_serializer_clears_time_cache",

@ -103,6 +103,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true;
#endif
}
#define GRPC_EXPERIMENT_IS_INCLUDED_TARPIT
inline bool IsTarpitEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
@ -163,6 +165,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true;
#endif
}
#define GRPC_EXPERIMENT_IS_INCLUDED_TARPIT
inline bool IsTarpitEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
@ -223,6 +227,8 @@ inline bool IsCallStatusOverrideOnCancellationEnabled() {
return true;
#endif
}
#define GRPC_EXPERIMENT_IS_INCLUDED_TARPIT
inline bool IsTarpitEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() { return true; }
#define GRPC_EXPERIMENT_IS_INCLUDED_WORK_SERIALIZER_CLEARS_TIME_CACHE
@ -268,6 +274,7 @@ enum ExperimentIds {
kExperimentIdMultiping,
kExperimentIdRegisteredMethodLookupInTransport,
kExperimentIdCallStatusOverrideOnCancellation,
kExperimentIdTarpit,
kExperimentIdSettingsTimeout,
kExperimentIdWorkSerializerClearsTimeCache,
kExperimentIdChttp2BatchRequests,
@ -392,6 +399,10 @@ inline bool IsRegisteredMethodLookupInTransportEnabled() {
inline bool IsCallStatusOverrideOnCancellationEnabled() {
return IsExperimentEnabled(kExperimentIdCallStatusOverrideOnCancellation);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_TARPIT
inline bool IsTarpitEnabled() {
return IsExperimentEnabled(kExperimentIdTarpit);
}
#define GRPC_EXPERIMENT_IS_INCLUDED_SETTINGS_TIMEOUT
inline bool IsSettingsTimeoutEnabled() {
return IsExperimentEnabled(kExperimentIdSettingsTimeout);

@ -222,6 +222,12 @@
expiry: 2024/01/01
owner: vigneshbabu@google.com
test_tags: []
- name: tarpit
description:
If set, tarpit invalid requests for some amount of time
expiry: 2024/03/03
owner: ctiller@google.com
test_tags: [bad_client_test]
- name: settings_timeout
description:
If set, use the settings timeout to send settings frame to the peer.

@ -106,6 +106,8 @@
default: debug
- name: work_serializer_clears_time_cache
default: true
- name: tarpit
default: true
- name: settings_timeout
default: true
- name: chttp2_batch_requests

@ -45,6 +45,7 @@
#include "src/core/lib/gprpp/packed_table.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/gprpp/type_list.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/transport/custom_metadata.h"
@ -519,6 +520,15 @@ struct GrpcRegisteredMethod {
static std::string DisplayValue(void* x);
};
// Annotation added by filters to inform the transport to tarpit this
// response: add some random delay to thwart certain kinds of attacks.
struct GrpcTarPit {
static absl::string_view DebugKey() { return "GrpcTarPit"; }
static constexpr bool kRepeatable = false;
using ValueType = Empty;
static absl::string_view DisplayValue(Empty) { return "tarpit"; }
};
namespace metadata_detail {
// Build a key/value formatted debug string.
@ -1496,7 +1506,7 @@ using grpc_metadata_batch_base = grpc_core::MetadataMap<
grpc_core::GrpcStreamNetworkState, grpc_core::PeerString,
grpc_core::GrpcStatusContext, grpc_core::GrpcStatusFromWire,
grpc_core::GrpcCallWasCancelled, grpc_core::WaitForReady,
grpc_core::GrpcTrailersOnly,
grpc_core::GrpcTrailersOnly, grpc_core::GrpcTarPit,
grpc_core::GrpcRegisteredMethod GRPC_CUSTOM_CLIENT_METADATA
GRPC_CUSTOM_SERVER_METADATA>;

@ -495,6 +495,12 @@ struct grpc_transport_stream_op_batch_payload {
// Error contract: the transport that gets this op must cause cancel_error
// to be unref'ed after processing it
grpc_error_handle cancel_error;
// If true the transport should endeavor to delay sending the cancellation
// notification for some small amount of time, in order to foil certain
// exploits.
// This should be set for cancellations that result from malformed client
// initial metadata.
bool tarpit = false;
} cancel_stream;
// Indexes correspond to grpc_context_index enum values

Loading…
Cancel
Save