MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

使用 Kafka Connect 开发数据集成管道

2021-06-282.3k 阅读

Kafka Connect 简介

Kafka Connect 是 Apache Kafka 生态系统中的一个重要组件,它提供了一种可扩展且可靠的方式来在 Kafka 与其他系统之间传输数据。其设计目标是简化数据集成流程,使得将数据从各种数据源(如数据库、文件系统等)导入到 Kafka 以及将数据从 Kafka 导出到各种数据接收器(如其他数据库、搜索索引等)变得更加容易。

Kafka Connect 基于插件架构,这意味着用户可以通过开发自定义插件来支持特定的数据源和数据接收器。它提供了一组标准的接口和约定,使得开发人员能够专注于实现数据读取和写入的逻辑,而无需关心 Kafka 连接的底层细节。这种架构不仅提高了开发效率,还使得 Kafka Connect 能够适应各种不同的数据集成场景。

Kafka Connect 架构

  1. Connectors:Connectors 是 Kafka Connect 的核心组件,负责定义数据的来源或去向。分为两种类型:
    • Source Connectors:用于从外部系统读取数据,并将其发送到 Kafka 主题。例如,一个 JDBC Source Connector 可以从关系型数据库中读取数据,并将其发布到 Kafka 主题。
    • Sink Connectors:负责从 Kafka 主题读取数据,并将其写入到外部系统。比如,一个 Elasticsearch Sink Connector 可以将 Kafka 主题中的数据写入到 Elasticsearch 索引中。
  2. Tasks:每个 Connector 可以配置多个 Tasks。Tasks 是实际执行数据传输的工作单元。例如,一个 Source Connector 可能会启动多个 Tasks,每个 Task 负责从数据源的一部分读取数据并发送到 Kafka。这样可以实现并行的数据处理,提高数据传输的效率。
  3. Cluster:Kafka Connect 可以以独立模式或分布式模式运行。在分布式模式下,多个 Kafka Connect 实例组成一个集群。集群中的每个实例都可以运行 Connectors 和 Tasks。这种分布式架构提供了高可用性和可扩展性,当某个实例出现故障时,其他实例可以接管其工作。
  4. Config Server:在分布式模式下,Kafka Connect 使用一个配置服务器来存储和管理 Connectors 的配置。这样,所有的 Kafka Connect 实例都可以从配置服务器获取最新的配置信息,确保整个集群的一致性。

安装与配置 Kafka Connect

  1. 下载 Kafka:首先,需要从 Apache Kafka 官方网站下载 Kafka 安装包。假设下载的是 kafka_2.13 - 2.8.0.tgz
    wget https://downloads.apache.org/kafka/2.8.0/kafka_2.13 - 2.8.0.tgz
    tar -xzf kafka_2.13 - 2.8.0.tgz
    cd kafka_2.13 - 2.8.0
    
  2. 配置 Kafka Connect
    • 独立模式:编辑 config/connect - standalone.properties 文件。
      bootstrap.servers=localhost:9092
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      
    • 分布式模式:编辑 config/connect - distributed.properties 文件。
      bootstrap.servers=localhost:9092
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      group.id=connect - cluster
      config.storage.topic=connect - configs
      offset.storage.topic=connect - offsets
      status.storage.topic=connect - status
      config.storage.replication.factor=1
      offset.storage.replication.factor=1
      status.storage.replication.factor=1
      
  3. 启动 Kafka Connect
    • 独立模式
      bin/connect - standalone.sh config/connect - standalone.properties config/source - connector.properties config/sink - connector.properties
      
    • 分布式模式
      bin/connect - distributed.sh config/connect - distributed.properties
      

开发自定义 Source Connector

  1. 创建项目:使用 Maven 创建一个新的 Java 项目。在 pom.xml 文件中添加以下依赖:
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka - clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect - api</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
    
  2. 实现 Connector 类
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.source.SourceConnector;
    
    import java.util.Map;
    
    public class CustomSourceConnector extends SourceConnector {
        @Override
        public String version() {
            return "1.0.0";
        }
    
        @Override
        public Class<? extends Task> taskClass() {
            return CustomSourceTask.class;
        }
    
        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            List<Map<String, String>> taskConfigs = new ArrayList<>();
            Map<String, String> config = new HashMap<>();
            // 可以在这里设置任务的配置
            taskConfigs.add(config);
            return taskConfigs;
        }
    
        @Override
        public void start(Map<String, String> props) {
            // 初始化连接器,例如建立数据库连接
        }
    
        @Override
        public void stop() {
            // 清理资源,例如关闭数据库连接
        }
    }
    
  3. 实现 Task 类
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class CustomSourceTask extends SourceTask {
        @Override
        public String version() {
            return "1.0.0";
        }
    
        @Override
        public void start(Map<String, String> props) {
            // 初始化任务,例如获取数据库连接
        }
    
        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            List<SourceRecord> records = new ArrayList<>();
            // 从数据源读取数据
            // 假设从文件读取数据
            try (BufferedReader br = new BufferedReader(new FileReader("data.txt"))) {
                String line;
                while ((line = br.readLine()) != null) {
                    Map<String, Object> key = null;
                    Map<String, Object> value = new HashMap<>();
                    value.put("data", line);
                    SourceRecord record = new SourceRecord(
                            Collections.singletonMap("connector", "custom - source"),
                            Collections.singletonMap("partition", 0),
                            "custom - topic",
                            Schema.STRING_SCHEMA, key,
                            Schema.STRING_SCHEMA, value
                    );
                    records.add(record);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            return records;
        }
    
        @Override
        public void stop() {
            // 清理任务资源,例如关闭数据库连接
        }
    }
    
  4. 打包与部署:使用 mvn clean package 命令打包项目。将生成的 JAR 文件复制到 Kafka Connect 的 libs 目录下。然后创建一个配置文件 custom - source - connector.properties
    name=custom - source - connector
    connector.class=com.example.CustomSourceConnector
    tasks.max=1
    
    最后,在独立模式下启动 Kafka Connect 时,将此配置文件作为参数传入:
    bin/connect - standalone.sh config/connect - standalone.properties custom - source - connector.properties
    

开发自定义 Sink Connector

  1. 创建项目与依赖:同样使用 Maven 创建项目,并在 pom.xml 中添加依赖:
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka - clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect - api</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
    
  2. 实现 Connector 类
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.sink.SinkConnector;
    
    import java.util.Map;
    
    public class CustomSinkConnector extends SinkConnector {
        @Override
        public String version() {
            return "1.0.0";
        }
    
        @Override
        public Class<? extends Task> taskClass() {
            return CustomSinkTask.class;
        }
    
        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            List<Map<String, String>> taskConfigs = new ArrayList<>();
            Map<String, String> config = new HashMap<>();
            // 设置任务配置
            taskConfigs.add(config);
            return taskConfigs;
        }
    
        @Override
        public void start(Map<String, String> props) {
            // 初始化连接器,例如建立数据库连接
        }
    
        @Override
        public void stop() {
            // 清理资源,例如关闭数据库连接
        }
    }
    
  3. 实现 Task 类
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;
    
    import java.util.Collection;
    import java.util.Map;
    
    public class CustomSinkTask extends SinkTask {
        @Override
        public String version() {
            return "1.0.0";
        }
    
        @Override
        public void start(Map<String, String> props) {
            // 初始化任务,例如获取数据库连接
        }
    
        @Override
        public void put(Collection<SinkRecord> records) {
            // 将数据写入目标系统
            // 假设写入文件
            try (BufferedWriter bw = new BufferedWriter(new FileWriter("output.txt", true))) {
                for (SinkRecord record : records) {
                    bw.write(record.value().toString());
                    bw.newLine();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void stop() {
            // 清理任务资源,例如关闭数据库连接
        }
    }
    
  4. 打包与部署:使用 mvn clean package 命令打包项目。将生成的 JAR 文件复制到 Kafka Connect 的 libs 目录下。创建配置文件 custom - sink - connector.properties
    name=custom - sink - connector
    connector.class=com.example.CustomSinkConnector
    tasks.max=1
    topics=custom - topic
    
    在启动 Kafka Connect 时传入此配置文件。

使用 Kafka Connect 与 JDBC 集成

  1. JDBC Source Connector
    • 下载 JDBC 驱动:假设使用 MySQL 数据库,下载 MySQL JDBC 驱动 mysql - connector - java - 8.0.26.jar,并将其复制到 Kafka Connect 的 libs 目录。
    • 创建配置文件jdbc - source - connector.properties
      name=jdbc - source - connector
      connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
      tasks.max=1
      connection.url=jdbc:mysql://localhost:3306/mydb
      connection.user=root
      connection.password=password
      table.whitelist=mytable
      mode=incrementing
      incrementing.column.name=id
      topic.prefix=jdbc - topic -
      
    • 启动连接器:在 Kafka Connect 中启动此连接器。
  2. JDBC Sink Connector
    • 配置文件jdbc - sink - connector.properties
      name=jdbc - sink - connector
      connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
      tasks.max=1
      connection.url=jdbc:mysql://localhost:3306/mydb
      connection.user=root
      connection.password=password
      auto.create=true
      auto.evolve=true
      topics=jdbc - topic - mytable
      insert.mode=upsert
      pk.fields=id
      
    • 启动连接器:在 Kafka Connect 中启动此连接器,将 Kafka 主题中的数据写入到 MySQL 数据库。

Kafka Connect 中的数据转换

  1. Converter:Kafka Connect 使用 Converter 将数据转换为 Kafka 能够理解的格式。常见的 Converter 有 JsonConverterAvroConverter 等。
    • JsonConverter:将数据转换为 JSON 格式。在配置文件中可以如下配置:
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=true
      value.converter.schemas.enable=true
      
    • AvroConverter:将数据转换为 Avro 格式,Avro 提供了高效的序列化和模式管理。需要添加 Avro 相关依赖,并在配置文件中配置:
      key.converter=io.confluent.connect.avro.AvroConverter
      value.converter=io.confluent.connect.avro.AvroConverter
      key.converter.schema.registry.url=http://localhost:8081
      value.converter.schema.registry.url=http://localhost:8081
      
  2. Transformations:Kafka Connect 支持多种数据转换,如 InsertFieldRegexRouter 等。
    • InsertField:可以在数据记录中插入一个新的字段。例如,在配置文件中添加:
      transforms=InsertField
      transforms.InsertField.type=org.apache.kafka.connect.transforms.InsertField$Value
      transforms.InsertField.static.field=timestamp
      transforms.InsertField.static.value=${date:yyyy - MM - dd HH:mm:ss}
      
    • RegexRouter:根据正则表达式将数据路由到不同的主题。配置如下:
      transforms=RegexRouter
      transforms.RegexRouter.type=org.apache.kafka.connect.transforms.RegexRouter
      transforms.RegexRouter.regex=(.*)_(.*)
      transforms.RegexRouter.replacement=$1 - topic
      

Kafka Connect 的监控与调优

  1. 监控指标
    • Connector 状态:可以通过 Kafka Connect 提供的 REST API 获取 Connector 的状态,如运行状态、错误信息等。例如,发送 GET 请求到 http://localhost:8083/connectors/{connector - name}/status
    • Task 指标:可以监控每个 Task 的数据读取和写入速率、延迟等指标。这些指标可以通过 JMX 或 Kafka Connect 自带的指标系统获取。
  2. 调优参数
    • 任务并行度:通过调整 tasks.max 参数,可以控制每个 Connector 启动的任务数量,从而提高数据处理的并行度。
    • 数据批处理大小:在 Task 中,可以调整每次读取或写入的数据批处理大小。例如,在 JDBC Sink Connector 中,可以通过 batch.size 参数控制每次写入数据库的记录数。
    • 内存管理:合理设置 Kafka Connect 进程的堆内存大小,避免内存溢出问题。可以通过 -Xmx-Xms 参数来设置。

与其他系统集成案例

  1. 与 Elasticsearch 集成
    • Sink Connector:使用 Elasticsearch Sink Connector 将 Kafka 主题中的数据写入到 Elasticsearch 索引。首先下载 Elasticsearch Sink Connector 插件,然后创建配置文件 elasticsearch - sink - connector.properties
      name=elasticsearch - sink - connector
      connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
      tasks.max=1
      topics=my - topic
      connection.url=http://localhost:9200
      type.name=type1
      key.ignore=true
      
    • 启动连接器:在 Kafka Connect 中启动此连接器,将 Kafka 数据导入到 Elasticsearch。
  2. 与 Hadoop 集成
    • Sink Connector:使用 HDFS Sink Connector 将 Kafka 数据写入到 Hadoop 分布式文件系统。创建配置文件 hdfs - sink - connector.properties
      name=hdfs - sink - connector
      connector.class=org.apache.kafka.connect.hdfs.HdfsSinkConnector
      tasks.max=1
      topics=my - topic
      hdfs.url=hdfs://localhost:9000
      flush.size=1000
      
    • 启动连接器:在 Kafka Connect 中启动此连接器,实现 Kafka 与 Hadoop 的数据集成。

通过以上对 Kafka Connect 的深入介绍、开发自定义 Connector 的示例以及与其他系统的集成案例,开发者可以充分利用 Kafka Connect 构建高效、可靠的数据集成管道,满足各种复杂的数据处理需求。在实际应用中,还需要根据具体的业务场景和性能要求,对 Kafka Connect 进行合理的配置和调优。