Kafka 架构下的消息过滤机制
Kafka 消息过滤机制概述
在 Kafka 架构中,消息过滤机制是一个重要的功能组件,它允许用户根据特定的规则对消息进行筛选,从而使消费者能够只获取到符合条件的消息。这在实际应用场景中非常有用,例如,在一个电商系统中,可能有各种类型的订单消息,包括普通订单、促销订单、团购订单等。通过消息过滤机制,不同的服务(如订单处理服务、财务结算服务等)可以只关注并获取与自身业务相关的订单消息,提高系统的处理效率和资源利用率。
Kafka 本身并没有像一些企业级消息队列那样提供内置的、开箱即用的丰富消息过滤功能。然而,通过合理利用 Kafka 的特性和一些编程技巧,我们可以实现灵活有效的消息过滤机制。常见的消息过滤方式主要基于消息的属性(如消息头、消息键等)和消息体内容进行过滤。
基于消息属性的过滤
- 消息头(Headers)过滤 Kafka 从 0.11.0.0 版本开始支持为消息添加自定义的消息头。消息头是一个键值对集合,可以携带额外的元数据信息。通过在生产者端添加合适的消息头,在消费者端就可以根据这些消息头来过滤消息。
在生产者端添加消息头示例代码(以 Java 为例):
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;
public class KafkaProducerWithHeaders {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);
String topic = "test-topic";
String key = "message-key";
String value = "message-value";
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("message-type", "order".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value, headers);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset());
}
}
});
producer.close();
}
}
在消费者端根据消息头过滤消息示例代码:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class KafkaConsumerWithHeadersFilter {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
boolean isOrderMessage = false;
for (Header header : record.headers()) {
if ("message-type".equals(new String(header.key())) && "order".equals(new String(header.value()))) {
isOrderMessage = true;
break;
}
}
if (isOrderMessage) {
System.out.println("Received order message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
}
- 消息键(Key)过滤 消息键在 Kafka 中有多种用途,其中之一就是可以作为过滤的依据。例如,在一个分布式系统中,消息键可能代表不同的业务实体 ID。如果某些消费者只关心特定业务实体相关的消息,就可以根据消息键进行过滤。
生产者端设置消息键示例代码(以 Python 为例,使用 kafka-python
库):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'test-topic'
key = b'user123'
value = b'User 123 related message'
producer.send(topic, key=key, value=value)
producer.close()
消费者端根据消息键过滤消息示例代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer('test-topic', bootstrap_servers='localhost:9092')
for message in consumer:
if message.key == b'user123':
print(f"Received message for user123: {message.value.decode('utf-8')}")
基于消息体内容的过滤
简单文本匹配过滤
如果消息体是简单的文本格式,我们可以通过字符串匹配的方式进行过滤。例如,在一个日志收集系统中,消息体可能是日志文本,我们可以根据特定的关键字(如错误信息关键字)来过滤出相关的日志消息。
以 Java 为例,假设消息体是字符串,在消费者端进行关键字匹配过滤:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class KafkaConsumerTextFilter {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.value().contains("error")) {
System.out.println("Received error message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
}
基于 JSON 消息体的过滤
在现代的微服务架构中,很多消息体采用 JSON 格式来传递复杂的数据结构。对于 JSON 格式的消息体,我们可以利用 JSON 解析库来提取特定的字段,并根据这些字段的值进行过滤。
假设消息体是如下 JSON 格式:
{
"order_id": "12345",
"order_status": "paid",
"customer_name": "John Doe"
}
在消费者端使用 Jackson 库(Java)进行 JSON 消息体过滤示例代码:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
public class KafkaConsumerJsonFilter {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
ObjectMapper objectMapper = new ObjectMapper();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
JsonNode jsonNode = objectMapper.readTree(record.value());
if ("paid".equals(jsonNode.get("order_status").asText())) {
System.out.println("Received paid order message: key = " + record.key() + ", value = " + record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
复杂条件过滤与表达式语言
对于更复杂的过滤需求,我们可以引入表达式语言(Expression Language)。例如,在一个数据处理管道中,可能需要根据多个字段的逻辑关系进行过滤,如“订单金额大于 100 且订单状态为已发货”。
以 SpEL(Spring Expression Language)为例,假设消息体是一个 Java 对象,在消费者端进行复杂条件过滤: 首先定义消息类:
public class Order {
private String orderId;
private double amount;
private String status;
// getters and setters
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
消费者端使用 SpEL 进行过滤示例代码:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.util.*;
public class KafkaConsumerSpELFilter {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
ExpressionParser parser = new SpelExpressionParser();
Expression expression = parser.parseExpression("amount > 100 and status == 'shipped'");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
Order order = new ObjectMapper().readValue(record.value(), Order.class);
StandardEvaluationContext context = new StandardEvaluationContext(order);
if (expression.getValue(context, Boolean.class)) {
System.out.println("Received qualified order message: key = " + record.key() + ", value = " + record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Kafka Streams 中的消息过滤
Kafka Streams 是 Kafka 提供的一个流处理库,它基于 Kafka 的主题和分区模型构建,提供了丰富的流处理功能,其中也包括消息过滤。Kafka Streams 允许用户通过 DSL(Domain - Specific Language)来定义消息处理逻辑,使得消息过滤的实现更加简洁和直观。
使用 Kafka Streams DSL 进行过滤
假设我们有一个主题 input - topic
,其中的消息是用户的操作记录,格式为 JSON,包含 user_id
、operation_type
和 timestamp
等字段。我们希望过滤出 operation_type
为 purchase
的消息,并将其发送到 output - topic
。
以下是使用 Java 和 Kafka Streams DSL 实现的示例代码:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
public class KafkaStreamsFilterExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter - application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("input - topic");
KStream<String, String> filteredStream = inputStream.filter((key, value) -> {
try {
JsonNode jsonNode = new ObjectMapper().readTree(value);
return "purchase".equals(jsonNode.get("operation_type").asText());
} catch (Exception e) {
e.printStackTrace();
return false;
}
});
filteredStream.to("output - topic", Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
在上述代码中,首先通过 builder.stream("input - topic")
获取输入流,然后使用 filter
方法定义过滤逻辑。在 filter
方法的闭包中,对 JSON 格式的消息体进行解析,并根据 operation_type
字段的值进行过滤。最后,将过滤后的消息发送到 output - topic
。
状态化过滤
Kafka Streams 还支持状态化过滤,即根据之前处理的消息状态来过滤当前消息。例如,在一个用户登录跟踪系统中,我们可能只关心某个用户首次登录之后的操作消息。
假设消息体包含 user_id
和 operation_type
字段,并且 operation_type
可能的值为 login
和 action
。我们可以使用 Kafka Streams 的 StateStore
来实现状态化过滤。
首先定义一个 UserLoginState
类来表示用户的登录状态:
import java.io.Serializable;
public class UserLoginState implements Serializable {
private boolean isLoggedIn;
public UserLoginState() {
this.isLoggedIn = false;
}
public boolean isLoggedIn() {
return isLoggedIn;
}
public void setLoggedIn(boolean loggedIn) {
isLoggedIn = loggedIn;
}
}
然后使用 Kafka Streams 实现状态化过滤:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;
public class KafkaStreamsStatefulFilterExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stateful - filter - application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("input - topic");
Materialized<String, UserLoginState, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<String, UserLoginState>as("user - login - state - store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.serdeFrom(new UserLoginStateSerializer(), new UserLoginStateDeserializer()));
KStream<String, String> filteredStream = inputStream.transformValues(() -> new UserLoginStateTransformer(), materialized);
filteredStream.to("output - topic", Produced.with(Serdes.String(), Serdes.String()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
class UserLoginStateTransformer implements Transformer<String, String, String> {
private KeyValueStore<String, UserLoginState> stateStore;
@Override
public void init(ProcessorContext context) {
stateStore = (KeyValueStore<String, UserLoginState>) context.getStateStore("user - login - state - store");
}
@Override
public String transform(String key, String value) {
try {
JsonNode jsonNode = new ObjectMapper().readTree(value);
String operationType = jsonNode.get("operation_type").asText();
UserLoginState state = stateStore.get(key);
if (state == null) {
state = new UserLoginState();
}
if ("login".equals(operationType)) {
state.setLoggedIn(true);
stateStore.put(key, state);
return null;
} else if (state.isLoggedIn()) {
return value;
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Override
public void close() {
// 关闭资源
}
}
class UserLoginStateSerializer implements Serializer<UserLoginState> {
@Override
public byte[] serialize(String topic, UserLoginState data) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
class UserLoginStateDeserializer implements Deserializer<UserLoginState> {
@Override
public UserLoginState deserialize(String topic, byte[] data) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(data, UserLoginState.class);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
在上述代码中,首先定义了一个 UserLoginStateTransformer
类,它实现了 Transformer
接口。在 init
方法中获取 StateStore
,在 transform
方法中根据用户的登录状态和当前操作类型来决定是否过滤消息。如果是登录操作,更新用户登录状态并过滤掉该消息;如果用户已登录且当前操作不是登录操作,则保留该消息。通过这种方式实现了状态化过滤。
自定义消息过滤组件
在一些复杂的业务场景中,Kafka 现有的功能和简单的编程实现可能无法满足需求,这时我们可以考虑开发自定义的消息过滤组件。
自定义过滤组件架构设计
自定义消息过滤组件通常需要与 Kafka 的生产者和消费者进行集成。一种常见的架构设计是在生产者和消费者之间添加一个过滤层。这个过滤层可以是一个独立的服务,也可以是一个嵌入到应用程序中的模块。
- 独立服务架构 在独立服务架构中,生产者将消息发送到 Kafka 之前,先将消息发送到自定义过滤服务。过滤服务根据预定义的规则对消息进行过滤,然后将符合条件的消息发送到 Kafka。在消费者端,消费者直接从 Kafka 消费经过过滤的消息。
这种架构的优点是解耦性强,过滤逻辑的修改和扩展不会影响到生产者和消费者的业务逻辑。缺点是增加了系统的复杂性和网络开销。
- 嵌入模块架构 在嵌入模块架构中,过滤逻辑以模块的形式嵌入到生产者或消费者应用程序中。例如,在生产者应用程序中,添加一个过滤模块,在消息发送到 Kafka 之前进行过滤。这种架构的优点是性能较高,因为不需要额外的网络通信。缺点是耦合度较高,过滤逻辑的修改可能会影响到整个应用程序的代码。
自定义过滤组件实现示例
以 Python 为例,假设我们要开发一个简单的自定义过滤组件,用于过滤掉消息体中包含敏感词汇的消息。
首先定义敏感词汇列表:
sensitive_words = ['敏感词1', '敏感词2', '敏感词3']
然后实现过滤函数:
def filter_message(message):
for word in sensitive_words:
if word in message:
return False
return True
在生产者端使用自定义过滤组件示例代码(假设使用 kafka - python
库):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'test - topic'
messages = ['普通消息', '包含敏感词1的消息', '另一条普通消息']
for message in messages:
if filter_message(message):
producer.send(topic, value=message.encode('utf - 8'))
producer.close()
在这个示例中,通过定义 filter_message
函数来检查消息是否包含敏感词汇。在生产者端,在发送消息之前调用该函数进行过滤,只有不包含敏感词汇的消息才会被发送到 Kafka。
消息过滤机制的性能与优化
性能影响因素
- 过滤逻辑复杂度 过滤逻辑越复杂,对性能的影响越大。例如,基于 JSON 解析和复杂表达式语言的过滤,需要更多的 CPU 和内存资源来处理消息。相比之下,简单的消息头或消息键过滤性能较高。
- 消息量 随着消息量的增加,过滤操作的累计开销也会增大。如果在高吞吐量的场景下进行复杂的过滤,可能会导致消息处理延迟增加。
- 过滤位置 在 Kafka 架构中,过滤位置也会影响性能。如果在生产者端进行过滤,可以减少发送到 Kafka 的消息量,从而降低 Kafka 集群的负载。但如果过滤逻辑依赖于 Kafka 中的某些状态(如消息的顺序等),则可能需要在消费者端进行过滤,这可能会增加消费者的处理负担。
性能优化策略
- 简化过滤逻辑 尽量使用简单的过滤条件,例如优先使用消息头和消息键过滤,避免不必要的复杂 JSON 解析和表达式计算。如果必须进行复杂过滤,可以考虑在消息量相对较小的阶段(如在生产者端对少量关键消息进行预过滤)进行。
- 缓存与状态复用
在状态化过滤场景中,合理使用缓存和复用状态信息可以提高性能。例如,在 Kafka Streams 的状态化过滤中,通过
StateStore
来缓存用户状态,避免重复计算。 - 并行处理 在消费者端,可以通过增加消费者实例的数量来并行处理消息,提高整体的过滤效率。同时,Kafka Streams 也支持通过分区并行处理消息,通过合理设置分区数和并行度,可以充分利用系统资源。
例如,在 Kafka Streams 中,可以通过设置 StreamsConfig.NUM_STREAM_THREADS_CONFIG
属性来调整并行处理的线程数:
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
通过以上性能优化策略,可以在保证消息过滤功能的同时,尽可能减少对系统性能的影响,提高 Kafka 架构下消息处理的整体效率。