Kafka 消息序列化与反序列化详解
Kafka 消息序列化与反序列化概述
在 Kafka 中,消息在生产者端被序列化后发送到 Kafka 集群,在消费者端又需要反序列化才能被正确处理。序列化和反序列化的过程确保了消息能够以一种紧凑、高效且可在不同语言和系统间通用的格式在网络中传输,以及在 Kafka 的存储体系中存储。
序列化的主要目的有:
- 数据压缩:将对象转换为字节数组的过程中,可以对数据进行压缩,减少网络传输带宽和 Kafka 存储的空间占用。
- 跨语言兼容性:Kafka 客户端可以使用多种编程语言实现,通过特定的序列化格式,如 Avro、Protobuf 等,可以确保不同语言编写的生产者和消费者能够正确地理解和处理消息。
- 数据一致性:序列化格式通常会有一定的规范,按照规范进行序列化和反序列化可以保证数据在传输和存储过程中的一致性,避免数据丢失或损坏。
反序列化则是将接收到的字节数组还原为可处理的对象形式,使得消费者能够对消息进行业务逻辑处理。
Kafka 内置的序列化器与反序列化器
Kafka 提供了几种内置的序列化器和反序列化器:
- ByteArraySerializer 和 ByteArrayDeserializer:这是最基本的序列化器和反序列化器,直接将数据作为字节数组处理。适用于已经是字节数组形式的数据,或者不需要对数据进行结构化解析的场景。例如,如果你要发送一些二进制数据,如图片、视频片段等,使用这种方式就很合适。
- StringSerializer 和 StringDeserializer:用于将字符串类型的数据进行序列化和反序列化。这对于处理文本消息非常方便,生产者将字符串消息序列化为字节数组发送,消费者再将字节数组反序列化为字符串。
- IntegerSerializer 和 IntegerDeserializer:专门用于整数类型数据的序列化和反序列化。如果你的消息数据是整数,如计数、ID 等,使用这对序列化器和反序列化器可以确保数据的正确处理。
代码示例:使用内置序列化器与反序列化器
以下是使用 Java 编写的简单示例,展示如何使用 Kafka 内置的序列化器和反序列化器。
生产者示例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaStringProducer {
public static void main(String[] args) {
// 设置生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
String topic = "test-topic";
String key = "message-key";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record);
// 关闭生产者
producer.close();
}
}
在上述代码中,我们配置了 StringSerializer
作为键和值的序列化器。通过 ProducerRecord
将字符串类型的键和值封装成消息发送到 Kafka 集群。
消费者示例
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaStringConsumer {
public static void main(String[] args) {
// 设置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: key = " + record.key() + ", value = " + record.value());
}
}
}
}
这里我们配置了 StringDeserializer
来反序列化接收到的消息。消费者从 Kafka 集群拉取消息,并将键和值反序列化为字符串后进行打印。
自定义序列化器与反序列化器
虽然 Kafka 内置的序列化器和反序列化器能满足一些基本需求,但在实际应用中,我们常常需要处理复杂的数据结构,这时就需要自定义序列化器和反序列化器。
自定义序列化器
自定义序列化器需要实现 org.apache.kafka.common.serialization.Serializer
接口。该接口包含三个方法:configure
、serialize
和 close
。
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomUserSerializer implements Serializer<CustomUser> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 初始化配置,例如设置编码格式等
}
@Override
public byte[] serialize(String topic, CustomUser data) {
if (data == null) {
return null;
}
byte[] nameBytes = data.getName().getBytes();
int nameLength = nameBytes.length;
int age = data.getAge();
ByteBuffer buffer = ByteBuffer.allocate(4 + nameLength + 4);
buffer.putInt(nameLength);
buffer.put(nameBytes);
buffer.putInt(age);
return buffer.array();
}
@Override
public void close() {
// 清理资源
}
}
在上述代码中,我们定义了一个 CustomUserSerializer
来序列化 CustomUser
对象。CustomUser
类包含一个字符串类型的名字和一个整数类型的年龄。在 serialize
方法中,我们将名字的长度、名字字节数组和年龄依次写入 ByteBuffer
,最后返回字节数组。
自定义反序列化器
自定义反序列化器需要实现 org.apache.kafka.common.serialization.Deserializer
接口,同样包含 configure
、deserialize
和 close
方法。
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomUserDeserializer implements Deserializer<CustomUser> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 初始化配置
}
@Override
public CustomUser deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int nameLength = buffer.getInt();
byte[] nameBytes = new byte[nameLength];
buffer.get(nameBytes);
int age = buffer.getInt();
String name = new String(nameBytes);
return new CustomUser(name, age);
}
@Override
public void close() {
// 清理资源
}
}
在 CustomUserDeserializer
中,deserialize
方法从接收到的字节数组中按照之前序列化的顺序读取名字长度、名字字节数组和年龄,构建并返回 CustomUser
对象。
使用自定义序列化器与反序列化器的生产者和消费者示例
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomUserProducer {
public static void main(String[] args) {
// 设置生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomUserSerializer.class.getName());
// 创建 Kafka 生产者
KafkaProducer<String, CustomUser> producer = new KafkaProducer<>(props);
// 发送消息
String topic = "custom-user-topic";
String key = "user-key";
CustomUser user = new CustomUser("Alice", 25);
ProducerRecord<String, CustomUser> record = new ProducerRecord<>(topic, key, user);
producer.send(record);
// 关闭生产者
producer.close();
}
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class CustomUserConsumer {
public static void main(String[] args) {
// 设置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-user-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomUserDeserializer.class.getName());
// 创建 Kafka 消费者
KafkaConsumer<String, CustomUser> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("custom-user-topic"));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, CustomUser> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, CustomUser> record : records) {
System.out.println("Received user: name = " + record.value().getName() + ", age = " + record.value().getAge());
}
}
}
}
在上述生产者和消费者示例中,我们分别使用了自定义的 CustomUserSerializer
和 CustomUserDeserializer
来处理 CustomUser
对象的序列化和反序列化。
常用第三方序列化框架
除了自定义序列化和使用 Kafka 内置序列化器外,还有一些第三方序列化框架在 Kafka 应用中也非常流行。
Avro
Avro 是一种数据序列化系统,它具有丰富的数据类型、紧凑的二进制格式、对动态语言友好以及支持模式演化等特点。
在 Kafka 中使用 Avro,需要引入相关依赖。以 Maven 为例:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.2.0</version>
</dependency>
定义 Avro 模式文件(例如 user.avsc
):
{
"namespace": "com.example",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
]
}
使用 Avro Maven 插件生成 Java 类:
mvn clean install
生产者示例:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
public class AvroProducer {
public static void main(String[] args) throws IOException {
// 读取 Avro 模式
Schema schema = new Schema.Parser().parse(Files.newInputStream(Paths.get("user.avsc")));
// 设置生产者属性
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
// 创建 Kafka 生产者
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// 创建 Avro 记录
GenericRecord user = new GenericData.Record(schema);
user.put("name", "Bob");
user.put("age", 30);
// 发送消息
String topic = "avro-topic";
String key = "avro-key";
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, user);
producer.send(record);
// 关闭生产者
producer.close();
}
}
消费者示例:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class AvroConsumer {
public static void main(String[] args) throws IOException {
// 读取 Avro 模式
Schema schema = new Schema.Parser().parse(Files.newInputStream(Paths.get("user.avsc")));
// 设置消费者属性
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
// 创建 Kafka 消费者
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("avro-topic"));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
System.out.println("Received user: name = " + user.get("name") + ", age = " + user.get("age"));
}
}
}
}
在上述示例中,我们使用了 Confluent 的 Kafka Avro 序列化器和反序列化器,并通过 Schema Registry 来管理 Avro 模式。
Protobuf
Protobuf(Protocol Buffers)是 Google 开发的一种数据序列化格式,具有高效、轻便的特点。
引入 Protobuf 依赖(Maven):
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.17.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-protobuf-serde</artifactId>
<version>3.2.0</version>
</dependency>
定义 Protobuf 消息文件(例如 user.proto
):
syntax = "proto3";
package com.example;
message User {
string name = 1;
int32 age = 2;
}
使用 Protobuf 插件生成 Java 类:
protoc --java_out=. user.proto
生产者示例:
import com.example.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class ProtobufProducer {
public static void main(String[] args) {
// 设置生产者属性
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.protobuf.ProtobufSerializer");
props.put("proto.value.type", "com.example.User");
// 创建 Kafka 生产者
KafkaProducer<String, User> producer = new KafkaProducer<>(props);
// 创建 Protobuf 消息
User user = User.newBuilder()
.setName("Charlie")
.setAge(35)
.build();
// 发送消息
String topic = "protobuf-topic";
String key = "protobuf-key";
ProducerRecord<String, User> record = new ProducerRecord<>(topic, key, user);
producer.send(record);
// 关闭生产者
producer.close();
}
}
消费者示例:
import com.example.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class ProtobufConsumer {
public static void main(String[] args) {
// 设置消费者属性
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.protobuf.ProtobufDeserializer");
props.put("proto.value.type", "com.example.User");
// 创建 Kafka 消费者
KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("protobuf-topic"));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.println("Received user: name = " + user.getName() + ", age = " + user.getAge());
}
}
}
}
在这个 Protobuf 的示例中,我们使用了 Kafka 自带的 Protobuf 序列化器和反序列化器,并指定了 Protobuf 消息的类型。
序列化与反序列化中的性能与优化
在 Kafka 消息处理中,序列化和反序列化的性能对整个系统的吞吐量和延迟有重要影响。
- 选择合适的序列化格式:不同的序列化格式在性能、空间占用和功能上各有优劣。例如,Avro 适合需要模式演化和对动态语言友好的场景,但它的序列化开销相对较大;Protobuf 则以高效紧凑著称,适合对性能要求极高的场景。根据实际业务需求选择合适的序列化格式可以显著提升性能。
- 批量处理:生产者端可以通过设置
batch.size
等参数来批量发送消息,减少网络请求次数。消费者端也可以通过设置max.poll.records
来批量拉取消息,一次性处理多个消息,减少反序列化的次数。 - 缓存与复用:在自定义序列化器和反序列化器中,可以考虑缓存一些常用的对象或配置,避免重复创建和初始化。例如,在序列化器中缓存
ByteBuffer
对象,在反序列化器中缓存模式对象等。 - 优化网络传输:合理设置 Kafka 客户端的网络参数,如
linger.ms
等,控制消息的发送时机,减少网络拥塞。同时,确保网络带宽足够,避免因网络瓶颈导致序列化和反序列化后的消息传输延迟。
序列化与反序列化中的错误处理
在序列化和反序列化过程中,可能会出现各种错误,需要进行适当的处理。
- 序列化错误:例如,当对象无法正确序列化为字节数组时,如数据类型不匹配、对象状态异常等,生产者应该捕获异常并进行适当处理。可以选择重试发送消息、记录错误日志并跳过当前消息等策略。
- 反序列化错误:消费者在反序列化消息时,如果字节数组格式不正确、版本不兼容等原因导致反序列化失败,同样需要处理。可以选择跳过当前消息、将错误消息发送到专门的死信队列(DLQ),或者尝试使用备用的反序列化逻辑。
序列化与反序列化的版本兼容性
在 Kafka 应用的长期运行过程中,数据结构可能会发生变化,这就涉及到序列化与反序列化的版本兼容性问题。
- 模式演化:对于使用 Avro、Protobuf 等带模式的序列化框架,模式演化是解决版本兼容性的关键。例如,Avro 支持新增字段、删除字段、修改字段类型等操作,但需要遵循一定的规则。在生产者端更新模式后,消费者端也需要相应地更新模式以确保能够正确反序列化。
- 版本标识:可以在消息中添加版本标识字段,生产者在序列化时写入当前数据结构的版本号,消费者在反序列化时根据版本号选择合适的反序列化逻辑。这样即使数据结构发生较大变化,也能通过版本标识进行兼容处理。
通过合理地处理序列化与反序列化的版本兼容性,可以保证 Kafka 系统在数据结构变化时的稳定性和可靠性。