[core] Add a channel argument to set DSCP on streams (#28322)

This adds a new channel argument `GRPC_ARG_DSCP` which allows users to
create classified gRPC streams with a
Differentiated Services Code Point (DSCP) marking on the IP frames.

The channel argument is handled on both clients and servers, but
currently only on posix based systems.

Fixes #17225

**Background**:
In addition to what is already described in #17225, when gRPC is used in
telco systems there is often a need to classify streams of importance.
There can be multiple hops between two endpoints (e.g. between 2 telecom
operators) and some streams that are more important than others (e.g.
emergency call related or similar). By marking the IP packets using DSCP
the aware routers can make a sound decision of the prioritization.

This PR proposes using DSCP as the configuration value since it is common
to both IPv4 and IPv6; an alternative would be to use a config name that
includes TOS and Traffic Class.
There might be more needed regarding documentation and end2end testing,
but there I need some advice.

**References**
https://datatracker.ietf.org/doc/html/rfc2474
https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml

<!--

Your pull request will be routed to the following person by default for
triaging.
If you know who should review your pull request, please remove the
mentioning below.

-->

@yashykt
pull/33588/head
Bjorn Svensson 1 year ago committed by GitHub
parent 189acd82dc
commit ac874c2c83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      doc/qos-dscp.md
  2. 3
      include/grpc/impl/grpc_types.h
  3. 40
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.cc
  4. 6
      src/core/lib/event_engine/posix_engine/tcp_socket_utils.h
  5. 29
      src/core/lib/iomgr/socket_utils_common_posix.cc
  6. 2
      src/core/lib/iomgr/socket_utils_posix.cc
  7. 6
      src/core/lib/iomgr/socket_utils_posix.h
  8. 2
      src/core/lib/iomgr/tcp_client_posix.cc
  9. 2
      src/core/lib/iomgr/tcp_server_utils_posix_common.cc
  10. 50
      test/core/iomgr/socket_utils_test.cc
  11. 1
      tools/doxygen/Doxyfile.c++
  12. 1
      tools/doxygen/Doxyfile.c++.internal
  13. 1
      tools/doxygen/Doxyfile.core
  14. 1
      tools/doxygen/Doxyfile.core.internal
  15. 1
      tools/doxygen/Doxyfile.objc
  16. 1
      tools/doxygen/Doxyfile.objc.internal
  17. 1
      tools/doxygen/Doxyfile.php

@ -0,0 +1,12 @@
# Quality of Service (QoS) using Differentiated services
Differentiated services or DiffServ is a mechanism for classifying network traffic and providing quality of service on IP networks.
DiffServ uses dedicated fields in the IP header for packet classification purposes.
By marking outgoing packets using a Differentiated Services Code Point (DSCP) the network can prioritize accordingly.
The DSCP value on outgoing packets is controlled by the following channel argument:
* **GRPC_ARG_DSCP**
* This channel argument accepts integer values 0 to 63. See [dscp-registry](https://www.iana.org/assignments/dscp-registry/dscp-registry.xhtml) for details.
* Default value is to use system default, i.e. not set.
* Only applies to POSIX systems.

@ -481,6 +481,9 @@ typedef struct {
* channel arg. Int valued, milliseconds. Defaults to 10 minutes.*/
#define GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS \
"grpc.experimental.server_config_change_drain_grace_time_ms"
/** Configure the Differentiated Services Code Point used on outgoing packets.
* Integer value ranging from 0 to 63. */
#define GRPC_ARG_DSCP "grpc.dscp"
/** \} */
/** Result of a grpc call. If the caller satisfies the prerequisites of a

@ -132,6 +132,7 @@ absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
// If its not a unix socket or vsock address.
GRPC_RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
GRPC_RETURN_IF_ERROR(sock.SetSocketReuseAddr(1));
GRPC_RETURN_IF_ERROR(sock.SetSocketDscp(options.dscp));
sock.TrySetSocketTcpUserTimeout(options, true);
}
GRPC_RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible());
@ -184,6 +185,8 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
options.expand_wildcard_addrs =
(AdjustValue(0, 1, INT_MAX,
config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
options.dscp = AdjustValue(PosixTcpOptions::kDscpNotSet, 0, 63,
config.GetInt(GRPC_ARG_DSCP));
options.allow_reuse_port = PosixSocketWrapper::IsSocketReusePortSupported();
auto allow_reuse_port_value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
if (allow_reuse_port_value.has_value()) {
@ -519,6 +522,39 @@ absl::Status PosixSocketWrapper::SetSocketLowLatency(int low_latency) {
return absl::OkStatus();
}
// Set Differentiated Services Code Point (DSCP).
//
// `dscp` is either PosixTcpOptions::kDscpNotSet, in which case nothing is
// changed and OK is returned, or the code point to place in the upper six
// bits of the IPv4 TOS byte and the IPv6 Traffic Class byte.
//
// Each address family is handled independently: if the corresponding
// getsockopt() fails, that family is skipped without error; if the
// getsockopt() succeeds but the following setsockopt() fails, an kInternal
// status describing the failed call is returned.
absl::Status PosixSocketWrapper::SetSocketDscp(int dscp) {
  if (dscp == PosixTcpOptions::kDscpNotSet) {
    return absl::OkStatus();
  }
  // The TOS/TrafficClass byte consists of following bits:
  // | 7 6 5 4 3 2 | 1 0 |
  // |    DSCP     | ECN |
  const int dscp_bits = dscp << 2;
  int current = 0;
  socklen_t current_len = sizeof(current);
  // IPv4: keep the ECN bits of the current IP_TOS value. A failing
  // getsockopt() means the option is not applicable to this socket.
  if (0 == getsockopt(fd_, IPPROTO_IP, IP_TOS, &current, &current_len)) {
    // Built from dscp_bits so that nothing leaks into the IPv6 value below.
    const int tos = dscp_bits | (current & 0x3);
    if (0 != setsockopt(fd_, IPPROTO_IP, IP_TOS, &tos, sizeof(tos))) {
      return absl::Status(
          absl::StatusCode::kInternal,
          absl::StrCat("setsockopt(IP_TOS): ", grpc_core::StrError(errno)));
    }
  }
  // The length argument is value-result; restore it before reusing it.
  current = 0;
  current_len = sizeof(current);
  // IPv6: keep the ECN bits of the current Traffic Class value, if available.
  if (0 == getsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &current, &current_len)) {
    const int tclass = dscp_bits | (current & 0x3);
    if (0 !=
        setsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &tclass, sizeof(tclass))) {
      return absl::Status(absl::StatusCode::kInternal,
                          absl::StrCat("setsockopt(IPV6_TCLASS): ",
                                       grpc_core::StrError(errno)));
    }
  }
  return absl::OkStatus();
}
#if GPR_LINUX == 1
// For Linux, it will be detected to support TCP_USER_TIMEOUT
#ifndef TCP_USER_TIMEOUT
@ -799,6 +835,10 @@ absl::Status PosixSocketWrapper::SetSocketReusePort(int /*reuse*/) {
grpc_core::Crash("unimplemented");
}
// Stub variant of SetSocketDscp: the argument is ignored and the call
// terminates through grpc_core::Crash("unimplemented").
// NOTE(review): presumably this definition is only compiled on platforms
// without the POSIX socket utilities - confirm against the enclosing #if.
absl::Status PosixSocketWrapper::SetSocketDscp(int /*dscp*/) {
grpc_core::Crash("unimplemented");
}
void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool /*enable*/,
int /*timeout*/,
bool /*is_client*/) {}

@ -60,6 +60,7 @@ struct PosixTcpOptions {
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
// Let the system decide the proper buffer size.
static constexpr int kReadBufferSizeUnset = -1;
static constexpr int kDscpNotSet = -1;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
@ -71,6 +72,7 @@ struct PosixTcpOptions {
int keep_alive_timeout_ms = 0;
bool expand_wildcard_addrs = false;
bool allow_reuse_port = false;
int dscp = kDscpNotSet;
grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
struct grpc_socket_mutator* socket_mutator = nullptr;
PosixTcpOptions() = default;
@ -135,6 +137,7 @@ struct PosixTcpOptions {
keep_alive_timeout_ms = other.keep_alive_timeout_ms;
expand_wildcard_addrs = other.expand_wildcard_addrs;
allow_reuse_port = other.allow_reuse_port;
dscp = other.dscp;
}
};
@ -182,6 +185,9 @@ class PosixSocketWrapper {
// Set SO_REUSEPORT
absl::Status SetSocketReusePort(int reuse);
// Set Differentiated Services Code Point (DSCP)
absl::Status SetSocketDscp(int dscp);
// Override default Tcp user timeout values if necessary.
void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options,
bool is_client);

@ -249,6 +249,35 @@ grpc_error_handle grpc_set_socket_low_latency(int fd, int low_latency) {
return absl::OkStatus();
}
/* Set Differentiated Services Code Point (DSCP).
 *
 * `fd` is the socket to configure. `dscp` is either
 * grpc_core::PosixTcpOptions::kDscpNotSet, in which case nothing is changed
 * and OK is returned, or the code point to place in the upper six bits of the
 * IPv4 TOS byte and the IPv6 Traffic Class byte.
 *
 * Each address family is handled independently: a failing getsockopt() skips
 * that family without error, while a failing setsockopt() after a successful
 * getsockopt() is reported through GRPC_OS_ERROR. */
grpc_error_handle grpc_set_socket_dscp(int fd, int dscp) {
  if (dscp == grpc_core::PosixTcpOptions::kDscpNotSet) {
    return absl::OkStatus();
  }
  // The TOS/TrafficClass byte consists of following bits:
  // | 7 6 5 4 3 2 | 1 0 |
  // |    DSCP     | ECN |
  const int dscp_bits = dscp << 2;
  int current = 0;
  socklen_t current_len = sizeof(current);
  // IPv4: keep the ECN bits of the current IP_TOS value. A failing
  // getsockopt() means the option is not applicable to this socket.
  if (0 == getsockopt(fd, IPPROTO_IP, IP_TOS, &current, &current_len)) {
    // Built from dscp_bits so that nothing leaks into the IPv6 value below.
    const int tos = dscp_bits | (current & 0x3);
    if (0 != setsockopt(fd, IPPROTO_IP, IP_TOS, &tos, sizeof(tos))) {
      return GRPC_OS_ERROR(errno, "setsockopt(IP_TOS)");
    }
  }
  // The length argument is value-result; restore it before reusing it.
  current = 0;
  current_len = sizeof(current);
  // IPv6: keep the ECN bits of the current Traffic Class value, if available.
  if (0 == getsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &current, &current_len)) {
    const int tclass = dscp_bits | (current & 0x3);
    if (0 !=
        setsockopt(fd, IPPROTO_IPV6, IPV6_TCLASS, &tclass, sizeof(tclass))) {
      return GRPC_OS_ERROR(errno, "setsockopt(IPV6_TCLASS)");
    }
  }
  return absl::OkStatus();
}
// The default values for TCP_USER_TIMEOUT are currently configured to be in
// line with the default values of KEEPALIVE_TIMEOUT as proposed in
// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md

@ -93,6 +93,8 @@ PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
options.allow_reuse_port =
(AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
0);
options.dscp = AdjustValue(PosixTcpOptions::kDscpNotSet, 0, 63,
config.GetInt(GRPC_ARG_DSCP));
if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;

@ -51,6 +51,7 @@ struct PosixTcpOptions {
static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
// Let the system decide the proper buffer size.
static constexpr int kReadBufferSizeUnset = -1;
static constexpr int kDscpNotSet = -1;
int tcp_read_chunk_size = kDefaultReadChunkSize;
int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
@ -60,6 +61,7 @@ struct PosixTcpOptions {
bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
int keep_alive_time_ms = 0;
int keep_alive_timeout_ms = 0;
int dscp = kDscpNotSet;
bool expand_wildcard_addrs = false;
bool allow_reuse_port = false;
RefCountedPtr<ResourceQuota> resource_quota;
@ -126,6 +128,7 @@ struct PosixTcpOptions {
keep_alive_timeout_ms = other.keep_alive_timeout_ms;
expand_wildcard_addrs = other.expand_wildcard_addrs;
allow_reuse_port = other.allow_reuse_port;
dscp = other.dscp;
}
};
@ -159,6 +162,9 @@ grpc_error_handle grpc_set_socket_low_latency(int fd, int low_latency);
// set SO_REUSEPORT
grpc_error_handle grpc_set_socket_reuse_port(int fd, int reuse);
/* Set Differentiated Services Code Point (DSCP) */
grpc_error_handle grpc_set_socket_dscp(int fd, int dscp);
// Configure the default values for TCP_USER_TIMEOUT
void config_default_tcp_user_timeout(bool enable, int timeout, bool is_client);

@ -117,6 +117,8 @@ static grpc_error_handle prepare_socket(
if (!err.ok()) goto error;
err = grpc_set_socket_reuse_addr(fd, 1);
if (!err.ok()) goto error;
err = grpc_set_socket_dscp(fd, options.dscp);
if (!err.ok()) goto error;
err = grpc_set_socket_tcp_user_timeout(fd, options, true /* is_client */);
if (!err.ok()) goto error;
}

@ -212,6 +212,8 @@ grpc_error_handle grpc_tcp_server_prepare_socket(
if (!err.ok()) goto error;
err = grpc_set_socket_reuse_addr(fd, 1);
if (!err.ok()) goto error;
err = grpc_set_socket_dscp(fd, s->options.dscp);
if (!err.ok()) goto error;
err =
grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */);
if (!err.ok()) goto error;

@ -135,6 +135,40 @@ static void test_with_vtable(const grpc_socket_mutator_vtable* vtable) {
ASSERT_FALSE(err.ok());
}
static void test_set_socket_dscp(int sock, int dscp) {
// Get the initial IP_TOS byte that consists of following bits:
// | 7 6 5 4 3 2 | 1 0 |
// | DSCP | ECN |
int optval;
socklen_t optlen = sizeof(optval);
ASSERT_TRUE(getsockopt(sock, IPPROTO_IP, IP_TOS, &optval, &optlen) == 0);
ASSERT_TRUE((optval >> 2) != dscp);
ASSERT_TRUE(
GRPC_LOG_IF_ERROR("set_socket_dscp", grpc_set_socket_dscp(sock, dscp)));
// Verify that value was changed
ASSERT_TRUE(getsockopt(sock, IPPROTO_IP, IP_TOS, &optval, &optlen) == 0);
ASSERT_TRUE((optval >> 2) == dscp);
}
static void test_set_socket_dscp_ipv6(int sock, int dscp) {
int optval;
socklen_t optlen = sizeof(optval);
// Get the initial IPPROTO_IPV6, same bit layout as IP_TOS above.
ASSERT_TRUE(getsockopt(sock, IPPROTO_IPV6, IPV6_TCLASS, &optval, &optlen) ==
0);
ASSERT_TRUE((optval >> 2) != dscp);
ASSERT_TRUE(
GRPC_LOG_IF_ERROR("set_socket_dscp", grpc_set_socket_dscp(sock, dscp)));
// Verify that value was changed
ASSERT_TRUE(getsockopt(sock, IPPROTO_IPV6, IPV6_TCLASS, &optval, &optlen) ==
0);
ASSERT_TRUE((optval >> 2) == dscp);
}
TEST(SocketUtilsTest, MainTest) {
int sock;
@ -157,11 +191,23 @@ TEST(SocketUtilsTest, MainTest) {
grpc_set_socket_low_latency(sock, 1)));
ASSERT_TRUE(GRPC_LOG_IF_ERROR("set_socket_low_latency",
grpc_set_socket_low_latency(sock, 0)));
test_set_socket_dscp(sock, 8 /*CS1*/);
test_set_socket_dscp(sock, 16 /*CS2*/);
close(sock);
if (grpc_ipv6_loopback_available()) {
sock = socket(AF_INET6, SOCK_STREAM, 0);
GPR_ASSERT(sock > 0);
test_set_socket_dscp_ipv6(sock, 8 /*CS1*/);
test_set_socket_dscp_ipv6(sock, 16 /*CS2*/);
close(sock);
}
test_with_vtable(&mutator_vtable);
test_with_vtable(&mutator_vtable2);
close(sock);
}
int main(int argc, char** argv) {

@ -788,6 +788,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -788,6 +788,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -795,6 +795,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -795,6 +795,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -786,6 +786,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -786,6 +786,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

@ -786,6 +786,7 @@ doc/interop-test-descriptions.md \
doc/keepalive.md \
doc/load-balancing.md \
doc/naming.md \
doc/qos-dscp.md \
doc/security_audit.md \
doc/server-reflection.md \
doc/server_reflection_tutorial.md \

Loading…
Cancel
Save