MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kafka 架构下的消息过滤机制

2023-05-125.6k 阅读

Kafka 消息过滤机制概述

在 Kafka 架构中,消息过滤机制是一个重要的功能组件,它允许用户根据特定的规则对消息进行筛选,从而使消费者能够只获取到符合条件的消息。这在实际应用场景中非常有用,例如,在一个电商系统中,可能有各种类型的订单消息,包括普通订单、促销订单、团购订单等。通过消息过滤机制,不同的服务(如订单处理服务、财务结算服务等)可以只关注并获取与自身业务相关的订单消息,提高系统的处理效率和资源利用率。

Kafka 本身并没有像一些企业级消息队列那样提供内置的、开箱即用的丰富消息过滤功能。然而,通过合理利用 Kafka 的特性和一些编程技巧,我们可以实现灵活有效的消息过滤机制。常见的消息过滤方式主要基于消息的属性(如消息头、消息键等)和消息体内容进行过滤。

基于消息属性的过滤

  1. 消息头(Headers)过滤 Kafka 从 0.11.0.0 版本开始支持为消息添加自定义的消息头。消息头是一个键值对集合,可以携带额外的元数据信息。通过在生产者端添加合适的消息头,在消费者端就可以根据这些消息头来过滤消息。

在生产者端添加消息头示例代码(以 Java 为例):

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;

public class KafkaProducerWithHeaders {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        String topic = "test-topic";
        String key = "message-key";
        String value = "message-value";

        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("message-type", "order".getBytes()));

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value, headers);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
                }
            }
        });

        producer.close();
    }
}

在消费者端根据消息头过滤消息示例代码:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

public class KafkaConsumerWithHeadersFilter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                boolean isOrderMessage = false;
                for (Header header : record.headers()) {
                    if ("message-type".equals(new String(header.key())) && "order".equals(new String(header.value()))) {
                        isOrderMessage = true;
                        break;
                    }
                }
                if (isOrderMessage) {
                    System.out.println("Received order message: key = " + record.key() + ", value = " + record.value());
                }
            }
        }
    }
}
  1. 消息键(Key)过滤 消息键在 Kafka 中有多种用途,其中之一就是可以作为过滤的依据。例如,在一个分布式系统中,消息键可能代表不同的业务实体 ID。如果某些消费者只关心特定业务实体相关的消息,就可以根据消息键进行过滤。

生产者端设置消息键示例代码(以 Python 为例,使用 kafka-python 库):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'test-topic'
key = b'user123'
value = b'User 123 related message'
producer.send(topic, key=key, value=value)
producer.close()

消费者端根据消息键过滤消息示例代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')
for message in consumer:
    if message.key == b'user123':
        print(f"Received message for user123: {message.value.decode('utf-8')}")

基于消息体内容的过滤

简单文本匹配过滤

如果消息体是简单的文本格式,我们可以通过字符串匹配的方式进行过滤。例如,在一个日志收集系统中,消息体可能是日志文本,我们可以根据特定的关键字(如错误信息关键字)来过滤出相关的日志消息。

以 Java 为例,假设消息体是字符串,在消费者端进行关键字匹配过滤:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

public class KafkaConsumerTextFilter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                if (record.value().contains("error")) {
                    System.out.println("Received error message: key = " + record.key() + ", value = " + record.value());
                }
            }
        }
    }
}

基于 JSON 消息体的过滤

在现代的微服务架构中,很多消息体采用 JSON 格式来传递复杂的数据结构。对于 JSON 格式的消息体,我们可以利用 JSON 解析库来提取特定的字段,并根据这些字段的值进行过滤。

假设消息体是如下 JSON 格式:

{
    "order_id": "12345",
    "order_status": "paid",
    "customer_name": "John Doe"
}

在消费者端使用 Jackson 库(Java)进行 JSON 消息体过滤示例代码:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

public class KafkaConsumerJsonFilter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        ObjectMapper objectMapper = new ObjectMapper();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    JsonNode jsonNode = objectMapper.readTree(record.value());
                    if ("paid".equals(jsonNode.get("order_status").asText())) {
                        System.out.println("Received paid order message: key = " + record.key() + ", value = " + record.value());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

复杂条件过滤与表达式语言

对于更复杂的过滤需求,我们可以引入表达式语言(Expression Language)。例如,在一个数据处理管道中,可能需要根据多个字段的逻辑关系进行过滤,如“订单金额大于 100 且订单状态为已发货”。

以 SpEL(Spring Expression Language)为例,假设消息体是一个 Java 对象,在消费者端进行复杂条件过滤: 首先定义消息类:

public class Order {
    private String orderId;
    private double amount;
    private String status;

    // getters and setters
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

消费者端使用 SpEL 进行过滤示例代码:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.util.*;

public class KafkaConsumerSpELFilter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        String topic = "test-topic";
        consumer.subscribe(Collections.singletonList(topic));

        ExpressionParser parser = new SpelExpressionParser();
        Expression expression = parser.parseExpression("amount > 100 and status == 'shipped'");

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    Order order = new ObjectMapper().readValue(record.value(), Order.class);
                    StandardEvaluationContext context = new StandardEvaluationContext(order);
                    if (expression.getValue(context, Boolean.class)) {
                        System.out.println("Received qualified order message: key = " + record.key() + ", value = " + record.value());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Kafka Streams 中的消息过滤

Kafka Streams 是 Kafka 提供的一个流处理库,它基于 Kafka 的主题和分区模型构建,提供了丰富的流处理功能,其中也包括消息过滤。Kafka Streams 允许用户通过 DSL(Domain - Specific Language)来定义消息处理逻辑,使得消息过滤的实现更加简洁和直观。

使用 Kafka Streams DSL 进行过滤

假设我们有一个主题 input - topic,其中的消息是用户的操作记录,格式为 JSON,包含 user_idoperation_typetimestamp 等字段。我们希望过滤出 operation_typepurchase 的消息,并将其发送到 output - topic

以下是使用 Java 和 Kafka Streams DSL 实现的示例代码:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;

public class KafkaStreamsFilterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter - application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("input - topic");

        KStream<String, String> filteredStream = inputStream.filter((key, value) -> {
            try {
                JsonNode jsonNode = new ObjectMapper().readTree(value);
                return "purchase".equals(jsonNode.get("operation_type").asText());
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        });

        filteredStream.to("output - topic", Produced.with(Serdes.String(), Serdes.String()));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

在上述代码中,首先通过 builder.stream("input - topic") 获取输入流,然后使用 filter 方法定义过滤逻辑。在 filter 方法的闭包中,对 JSON 格式的消息体进行解析,并根据 operation_type 字段的值进行过滤。最后,将过滤后的消息发送到 output - topic

状态化过滤

Kafka Streams 还支持状态化过滤,即根据之前处理的消息状态来过滤当前消息。例如,在一个用户登录跟踪系统中,我们可能只关心某个用户首次登录之后的操作消息。

假设消息体包含 user_idoperation_type 字段,并且 operation_type 可能的值为 loginaction。我们可以使用 Kafka Streams 的 StateStore 来实现状态化过滤。

首先定义一个 UserLoginState 类来表示用户的登录状态:

import java.io.Serializable;

public class UserLoginState implements Serializable {
    private boolean isLoggedIn;

    public UserLoginState() {
        this.isLoggedIn = false;
    }

    public boolean isLoggedIn() {
        return isLoggedIn;
    }

    public void setLoggedIn(boolean loggedIn) {
        isLoggedIn = loggedIn;
    }
}

然后使用 Kafka Streams 实现状态化过滤:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;

public class KafkaStreamsStatefulFilterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful - filter - application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> inputStream = builder.stream("input - topic");

        Materialized<String, UserLoginState, KeyValueStore<Bytes, byte[]>> materialized =
                Materialized.<String, UserLoginState>as("user - login - state - store")
                       .withKeySerde(Serdes.String())
                       .withValueSerde(Serdes.serdeFrom(new UserLoginStateSerializer(), new UserLoginStateDeserializer()));

        KStream<String, String> filteredStream = inputStream.transformValues(() -> new UserLoginStateTransformer(), materialized);

        filteredStream.to("output - topic", Produced.with(Serdes.String(), Serdes.String()));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

class UserLoginStateTransformer implements Transformer<String, String, String> {
    private KeyValueStore<String, UserLoginState> stateStore;

    @Override
    public void init(ProcessorContext context) {
        stateStore = (KeyValueStore<String, UserLoginState>) context.getStateStore("user - login - state - store");
    }

    @Override
    public String transform(String key, String value) {
        try {
            JsonNode jsonNode = new ObjectMapper().readTree(value);
            String operationType = jsonNode.get("operation_type").asText();
            UserLoginState state = stateStore.get(key);
            if (state == null) {
                state = new UserLoginState();
            }
            if ("login".equals(operationType)) {
                state.setLoggedIn(true);
                stateStore.put(key, state);
                return null;
            } else if (state.isLoggedIn()) {
                return value;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {
        // 关闭资源
    }
}

class UserLoginStateSerializer implements Serializer<UserLoginState> {
    @Override
    public byte[] serialize(String topic, UserLoginState data) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

class UserLoginStateDeserializer implements Deserializer<UserLoginState> {
    @Override
    public UserLoginState deserialize(String topic, byte[] data) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            return objectMapper.readValue(data, UserLoginState.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

在上述代码中,首先定义了一个 UserLoginStateTransformer 类,它实现了 Transformer 接口。在 init 方法中获取 StateStore,在 transform 方法中根据用户的登录状态和当前操作类型来决定是否过滤消息。如果是登录操作,更新用户登录状态并过滤掉该消息;如果用户已登录且当前操作不是登录操作,则保留该消息。通过这种方式实现了状态化过滤。

自定义消息过滤组件

在一些复杂的业务场景中,Kafka 现有的功能和简单的编程实现可能无法满足需求,这时我们可以考虑开发自定义的消息过滤组件。

自定义过滤组件架构设计

自定义消息过滤组件通常需要与 Kafka 的生产者和消费者进行集成。一种常见的架构设计是在生产者和消费者之间添加一个过滤层。这个过滤层可以是一个独立的服务,也可以是一个嵌入到应用程序中的模块。

  1. 独立服务架构 在独立服务架构中,生产者将消息发送到 Kafka 之前,先将消息发送到自定义过滤服务。过滤服务根据预定义的规则对消息进行过滤,然后将符合条件的消息发送到 Kafka。在消费者端,消费者直接从 Kafka 消费经过过滤的消息。

这种架构的优点是解耦性强,过滤逻辑的修改和扩展不会影响到生产者和消费者的业务逻辑。缺点是增加了系统的复杂性和网络开销。

  1. 嵌入模块架构 在嵌入模块架构中,过滤逻辑以模块的形式嵌入到生产者或消费者应用程序中。例如,在生产者应用程序中,添加一个过滤模块,在消息发送到 Kafka 之前进行过滤。这种架构的优点是性能较高,因为不需要额外的网络通信。缺点是耦合度较高,过滤逻辑的修改可能会影响到整个应用程序的代码。

自定义过滤组件实现示例

以 Python 为例,假设我们要开发一个简单的自定义过滤组件,用于过滤掉消息体中包含敏感词汇的消息。

首先定义敏感词汇列表:

sensitive_words = ['敏感词1', '敏感词2', '敏感词3']

然后实现过滤函数:

def filter_message(message):
    for word in sensitive_words:
        if word in message:
            return False
    return True

在生产者端使用自定义过滤组件示例代码(假设使用 kafka - python 库):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'test - topic'
messages = ['普通消息', '包含敏感词1的消息', '另一条普通消息']

for message in messages:
    if filter_message(message):
        producer.send(topic, value=message.encode('utf - 8'))
producer.close()

在这个示例中,通过定义 filter_message 函数来检查消息是否包含敏感词汇。在生产者端,在发送消息之前调用该函数进行过滤,只有不包含敏感词汇的消息才会被发送到 Kafka。

消息过滤机制的性能与优化

性能影响因素

  1. 过滤逻辑复杂度 过滤逻辑越复杂,对性能的影响越大。例如,基于 JSON 解析和复杂表达式语言的过滤,需要更多的 CPU 和内存资源来处理消息。相比之下,简单的消息头或消息键过滤性能较高。
  2. 消息量 随着消息量的增加,过滤操作的累计开销也会增大。如果在高吞吐量的场景下进行复杂的过滤,可能会导致消息处理延迟增加。
  3. 过滤位置 在 Kafka 架构中,过滤位置也会影响性能。如果在生产者端进行过滤,可以减少发送到 Kafka 的消息量,从而降低 Kafka 集群的负载。但如果过滤逻辑依赖于 Kafka 中的某些状态(如消息的顺序等),则可能需要在消费者端进行过滤,这可能会增加消费者的处理负担。

性能优化策略

  1. 简化过滤逻辑 尽量使用简单的过滤条件,例如优先使用消息头和消息键过滤,避免不必要的复杂 JSON 解析和表达式计算。如果必须进行复杂过滤,可以考虑在消息量相对较小的阶段(如在生产者端对少量关键消息进行预过滤)进行。
  2. 缓存与状态复用 在状态化过滤场景中,合理使用缓存和复用状态信息可以提高性能。例如,在 Kafka Streams 的状态化过滤中,通过 StateStore 来缓存用户状态,避免重复计算。
  3. 并行处理 在消费者端,可以通过增加消费者实例的数量来并行处理消息,提高整体的过滤效率。同时,Kafka Streams 也支持通过分区并行处理消息,通过合理设置分区数和并行度,可以充分利用系统资源。

例如,在 Kafka Streams 中,可以通过设置 StreamsConfig.NUM_STREAM_THREADS_CONFIG 属性来调整并行处理的线程数:

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

通过以上性能优化策略,可以在保证消息过滤功能的同时,尽可能减少对系统性能的影响,提高 Kafka 架构下消息处理的整体效率。