From aa48c45682fad21d03a2750b0a3ec16b62c5971c Mon Sep 17 00:00:00 2001 From: "data-plane-api(Azure Pipelines)" Date: Thu, 9 Mar 2023 16:25:52 +0000 Subject: [PATCH] kafka: provide Fetch support, giving mesh-filter stateful consumer proxy (#25611) Signed-off-by: Adam Kotwasinski Mirrored from https://github.com/envoyproxy/envoy @ 5e83af5042ec4ff87cad9d3baf476fbd57a7d048 --- .../kafka_mesh/v3alpha/kafka_mesh.proto | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto b/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto index 4a906cde..68c71f29 100644 --- a/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto +++ b/contrib/envoy/extensions/filters/network/kafka_mesh/v3alpha/kafka_mesh.proto @@ -18,7 +18,17 @@ option (xds.annotations.v3.file_status).work_in_progress = true; // Kafka Mesh :ref:`configuration overview `. // [#extension: envoy.filters.network.kafka_mesh] +// [#next-free-field: 6] message KafkaMesh { + enum ConsumerProxyMode { + // Records received are going to be distributed amongst downstream consumer connections. + // In this mode Envoy uses librdkafka consumers pointing at upstream Kafka clusters, what means that these + // consumers' position is meaningful and affects what records are received from upstream. + // Users might want to take a look into these consumers' custom configuration to manage their auto-committing + // capabilities, as it will impact Envoy's behaviour in case of restarts. + StatefulConsumerProxy = 0; + } + // Envoy's host that's advertised to clients. // Has the same meaning as corresponding Kafka broker properties. // Usually equal to filter chain's listener config, but needs to be reachable by clients @@ -33,8 +43,12 @@ message KafkaMesh { // Rules that will decide which cluster gets which request. repeated ForwardingRule forwarding_rules = 4; + + // How the consumer proxying should behave - this relates mostly to Fetch request handling. + ConsumerProxyMode consumer_proxy_mode = 5; } +// [#next-free-field: 6] message KafkaClusterDefinition { // Cluster name. string cluster_name = 1 [(validate.rules).string = {min_len: 1}]; @@ -44,10 +58,14 @@ message KafkaClusterDefinition { // Default number of partitions present in this cluster. // This is especially important for clients that do not specify partition in their payloads and depend on this value for hashing. + // The same number of partitions is going to be used by upstream-pointing Kafka consumers for consumer proxying scenarios. int32 partition_count = 3 [(validate.rules).int32 = {gt: 0}]; // Custom configuration passed to Kafka producer. map producer_config = 4; + + // Custom configuration passed to Kafka consumer. + map consumer_config = 5; } message ForwardingRule {