MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

基于 Kafka Streams 开发实时流处理应用

2021-10-062.5k 阅读

Kafka Streams 简介

Kafka Streams 是一个轻量级的流处理库,构建在 Apache Kafka 之上。它提供了一种简单且可扩展的方式来处理实时流数据,允许开发人员使用熟悉的 Java 或 Scala 语言编写流处理应用程序。

与其他流处理框架(如 Apache Flink、Spark Streaming)不同,Kafka Streams 紧密集成 Kafka 生态系统。它利用 Kafka 的分区、副本和持久化特性来实现容错、可扩展的流处理。这意味着 Kafka Streams 应用程序可以直接使用 Kafka 的主题(Topic)作为输入和输出源,而无需额外的复杂数据传输机制。

Kafka Streams 基于流处理的一些核心概念,如记录(Record)、流(Stream)和表(Table)。记录是流中的最小数据单元,通常由键值对组成。流是一个无界的、按时间顺序排列的记录序列,而表则是一个随时间变化的键值对集合,反映了最新的状态。

环境搭建

  1. 安装 Kafka:首先需要下载并安装 Apache Kafka。可以从 Kafka 官方网站(https://kafka.apache.org/downloads)下载最新版本。解压下载的文件后,按照官方文档中的说明启动 Kafka 服务器和 Zookeeper(Kafka 依赖 Zookeeper 进行集群管理和元数据存储)。
  2. 引入 Kafka Streams 依赖:如果使用 Maven 管理项目依赖,可以在 pom.xml 文件中添加以下依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.0.0</version>
</dependency>

如果使用 Gradle,则在 build.gradle 文件中添加:

implementation 'org.apache.kafka:kafka-streams:3.0.0'

基本概念与 API

  1. KafkaStreams 类:这是 Kafka Streams 应用程序的入口点。通过 KafkaStreams 类,我们可以构建拓扑(Topology)并启动流处理应用。以下是一个简单的创建 KafkaStreams 实例的示例:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

public class BasicStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = new Topology();
        // 这里可以添加拓扑的具体定义,暂时为空

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // 添加关闭钩子,以便在接收到终止信号时正确关闭流处理应用
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  1. 拓扑(Topology):拓扑定义了流处理应用的处理逻辑。它由一系列的处理器(Processor)和源(Source)、汇(Sink)节点组成。源节点从 Kafka 主题读取数据,汇节点将处理后的数据写回 Kafka 主题。处理器则对数据进行转换、聚合等操作。例如,我们可以创建一个简单的拓扑,将一个主题的消息直接转发到另一个主题:
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class SimpleTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic")
               .to("output-topic");
        return builder.build();
    }
}

然后在 main 方法中使用这个拓扑:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

public class SimpleStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = SimpleTopology.buildTopology();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  1. KStream 和 KTableKStream 代表一个无界的流数据,而 KTable 代表一个随时间变化的键值对集合,通常用于表示状态。KStream 支持各种流处理操作,如过滤(filter)、映射(map)、分组(groupBy)等。例如,过滤出特定条件的记录:
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FilteringTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        stream.filter((key, value) -> value.length() > 10)
              .to("filtered-topic");
        return builder.build();
    }
}

KTable 则常用于聚合操作。例如,统计每个键出现的次数:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class CountingTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        KTable<String, Long> countTable = stream.groupByKey()
                                               .count("count-store");
        countTable.toStream()
                  .to("count-output-topic");
        return builder.build();
    }
}

这里 count-store 是一个用于存储中间聚合结果的状态存储(State Store)。

实际应用场景与代码示例

  1. 实时数据清洗:假设我们有一个包含用户行为数据的主题,数据格式为 JSON 字符串。其中有些记录可能格式不正确,我们需要过滤掉这些无效记录。 首先,定义一个简单的 JSON 解析工具类:
import com.google.gson.JsonSyntaxException;
import com.google.gson.JsonObject;
import com.google.gson.Gson;

public class JsonUtils {
    private static final Gson gson = new Gson();

    public static boolean isValidJson(String json) {
        try {
            gson.fromJson(json, JsonObject.class);
            return true;
        } catch (JsonSyntaxException e) {
            return false;
        }
    }
}

然后构建拓扑进行数据清洗:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class DataCleaningTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("user - behavior - topic");
        stream.filter((key, value) -> JsonUtils.isValidJson(value))
              .to("cleaned - user - behavior - topic");
        return builder.build();
    }
}

main 方法中启动流处理应用:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

public class DataCleaningApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data - cleaning - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = DataCleaningTopology.buildTopology();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  1. 实时聚合分析:假设我们有一个电子商务网站的订单主题,记录了每笔订单的金额和商品类别。我们需要实时统计每个商品类别在过去一段时间内的总销售额。 定义订单数据的 Java 类:
import com.google.gson.Gson;

public class Order {
    private String category;
    private double amount;

    public String getCategory() {
        return category;
    }

    public double getAmount() {
        return amount;
    }

    @Override
    public String toString() {
        Gson gson = new Gson();
        return gson.toJson(this);
    }
}

构建拓扑进行聚合分析:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class AggregationTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("orders - topic");
        KTable<String, Double> categoryTotalAmountTable = stream
               .map((key, value) -> {
                    Order order = new Gson().fromJson(value, Order.class);
                    return new KeyValue<>(order.getCategory(), order.getAmount());
                })
               .groupByKey()
               .aggregate(
                        () -> 0.0,
                        (category, amount, total) -> total + amount,
                        "category - total - amount - store"
                );
        categoryTotalAmountTable.toStream()
                               .to("category - total - amount - topic");
        return builder.build();
    }
}

main 方法中启动流处理应用:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

public class AggregationApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = AggregationTopology.buildTopology();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
  1. 窗口化处理:在实时聚合分析的基础上,我们可能需要按照时间窗口进行统计。例如,每 5 分钟统计一次每个商品类别在该时间段内的总销售额。 构建拓扑进行窗口化聚合分析:
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

public class WindowedAggregationTopology {
    public static Topology buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("orders - topic");
        KTable<Windowed<String>, Double> windowedCategoryTotalAmountTable = stream
               .map((key, value) -> {
                    Order order = new Gson().fromJson(value, Order.class);
                    return new KeyValue<>(order.getCategory(), order.getAmount());
                })
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               .aggregate(
                        () -> 0.0,
                        (category, amount, total) -> total + amount,
                        "windowed - category - total - amount - store"
                );
        windowedCategoryTotalAmountTable.toStream()
                                       .to("windowed - category - total - amount - topic");
        return builder.build();
    }
}

main 方法中启动流处理应用:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;

public class WindowedAggregationApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed - aggregation - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        Topology topology = WindowedAggregationTopology.buildTopology();

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

状态管理与容错

  1. 状态存储(State Store):在 Kafka Streams 中,状态存储用于保存流处理过程中的中间结果,如聚合状态、窗口化聚合结果等。状态存储分为本地存储和分布式存储。本地存储使用 RocksDB,它是一个高性能的嵌入式键值存储。分布式存储则通过 Kafka 主题来实现,适用于需要跨节点共享状态的场景。 例如,在前面的聚合分析示例中,category - total - amount - store 就是一个状态存储,用于保存每个商品类别对应的总销售额。
  2. 容错机制:Kafka Streams 利用 Kafka 的副本机制来实现容错。当某个流处理实例发生故障时,Kafka 可以自动将其负责的分区重新分配给其他实例。同时,状态存储也会进行备份,确保故障恢复后能够恢复到故障前的状态。例如,如果一个使用状态存储进行聚合的实例崩溃,新接管该分区的实例可以从备份的状态存储中恢复聚合状态,继续进行处理。

性能优化

  1. 并行处理:Kafka Streams 支持并行处理,通过合理设置分区数,可以充分利用多台机器的资源。在创建 Kafka 主题时,可以根据预期的负载和处理能力设置合适的分区数。例如,如果预计有大量的订单数据需要处理,可以将 orders - topic 的分区数设置为较高的值,如 10 或 20。
  2. 优化状态存储:合理配置状态存储的参数可以提高性能。例如,可以调整 RocksDB 的块缓存大小、写缓冲区大小等参数。可以通过 StreamsConfig 中的相关配置项来设置这些参数:
props.put(StreamsConfig.STATE_DIR_CONFIG, "/path/to/state/store");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

这里 STATE_DIR_CONFIG 设置了状态存储的目录,COMMIT_INTERVAL_MS_CONFIG 设置了状态存储的提交间隔。 3. 减少数据传输:尽量在本地处理数据,减少跨网络的数据传输。例如,在进行聚合操作时,尽量在同一台机器上完成相关数据的聚合,避免不必要的网络开销。

部署与监控

  1. 部署:Kafka Streams 应用可以像普通的 Java 应用一样进行部署。可以将其打包成 JAR 文件,然后在服务器上运行。如果是在集群环境中,可以使用容器化技术(如 Docker)来简化部署过程。可以创建一个 Docker 镜像,将 Kafka Streams 应用和相关的依赖打包进去,然后在 Kubernetes 等容器编排平台上进行部署。
  2. 监控:Kafka Streams 提供了一些内置的监控指标,可以通过 JMX(Java Management Extensions)进行监控。例如,可以监控流处理应用的处理延迟、吞吐量、状态存储的大小等指标。可以使用工具如 JConsole、VisualVM 来连接到运行中的 Kafka Streams 应用,查看这些指标。此外,也可以将这些指标发送到监控系统(如 Prometheus、Grafana)进行可视化和长期存储。

在实际生产环境中,还需要考虑安全性、资源管理等方面的问题。例如,通过 SSL/TLS 对 Kafka 通信进行加密,通过 SASL 进行身份验证;合理分配服务器资源,避免资源竞争等。

通过以上内容,我们对基于 Kafka Streams 开发实时流处理应用有了较为全面的了解,从基本概念、环境搭建、实际应用到性能优化、部署监控等方面都进行了详细的介绍和代码示例展示,希望能帮助开发者更好地利用 Kafka Streams 构建高效、可靠的实时流处理应用。