MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

利用 Kafka Streams 进行实时计算的技巧

2023-12-015.6k 阅读

Kafka Streams 简介

Kafka Streams 是 Apache Kafka 提供的一个轻量级流处理库,用于在 Kafka 之上构建实时流处理应用程序。它允许开发人员使用简单的 Java 或 Scala 代码对 Kafka 主题中的数据流进行处理、转换和聚合等操作。Kafka Streams 基于 Kafka 的分区和副本机制,提供了可扩展、容错的流处理能力。

Kafka Streams 的核心概念

  1. 流(Stream)
    • 流是一个持续的、无界的数据记录序列。在 Kafka Streams 中,流的数据来源通常是 Kafka 主题。例如,一个记录用户行为的主题,每一条用户行为记录就是流中的一个数据单元。流中的数据是顺序到达的,并且会随着时间不断增长。
  2. 表(Table)
    • 表是一个随时间变化的键值对集合。它与流不同,表可以被视为一种特殊的流,其数据是基于键进行更新的。例如,一个存储用户最新信息的表,当有新的用户信息更新时,对应的键值对会被修改。表在 Kafka Streams 中通常用于存储和查询状态数据。
  3. 拓扑(Topology)
    • 拓扑定义了流处理应用程序的逻辑结构。它由一系列的处理器节点组成,这些节点通过流或表相互连接。例如,一个简单的拓扑可能包括从一个 Kafka 主题读取数据的源节点,对数据进行转换的处理节点,以及将处理后的数据写入另一个 Kafka 主题的 sink 节点。拓扑描述了数据如何在应用程序中流动和转换。

Kafka Streams 的实时计算基础

流处理模式

  1. 过滤(Filtering)
    • 过滤是根据特定条件从流中选择或排除数据记录的操作。例如,假设我们有一个记录用户登录事件的流,其中包含用户名、登录时间等信息。我们可能只对特定用户的登录事件感兴趣,比如只关注名为“admin”的用户登录事件。在 Kafka Streams 中,可以通过以下代码实现过滤:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Properties;

public class FilterExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - login - topic");

        KStream<String, String> filteredStream = stream.filter((key, value) -> {
            // 假设 value 格式为 "username:loginTime"
            String[] parts = value.split(":");
            return "admin".equals(parts[0]);
        });

        filteredStream.to("admin - login - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 在上述代码中,首先从“user - login - topic”主题读取流数据,然后使用`filter`方法对数据进行过滤,只保留用户名是“admin”的数据记录,最后将过滤后的数据写入“admin - login - topic”主题。

2. 映射(Mapping): - 映射是对流中的每个数据记录进行转换的操作。例如,我们有一个记录用户年龄的流,现在需要将年龄加倍。在 Kafka Streams 中,可以这样实现:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Properties;

public class MapExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - age - topic");

        KStream<String, String> mappedStream = stream.mapValues((ValueMapper<String, String>) value -> {
            int age = Integer.parseInt(value);
            return String.valueOf(age * 2);
        });

        mappedStream.to("doubled - age - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 代码从“user - age - topic”主题读取流数据,使用`mapValues`方法对每个值(用户年龄)进行转换,将年龄加倍后写入“doubled - age - topic”主题。

窗口化操作

  1. 时间窗口(Time Windows)
    • 时间窗口是按时间划分的一段数据范围。在实时计算中,常常需要对一段时间内的数据进行聚合操作。例如,我们要统计每 5 分钟内用户的登录次数。Kafka Streams 提供了方便的时间窗口支持:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class TimeWindowExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "time - window - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - login - topic");

        KTable<Windowed<String>, Long> windowedTable = stream
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               .count();

        windowedTable.toStream()
               .to("login - count - per - 5min - topic", Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class),
                        Serdes.Long()
                ));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 上述代码从“user - login - topic”主题读取流数据,先按用户(键)进行分组,然后使用 5 分钟的时间窗口,对每个窗口内的登录事件进行计数,最后将结果写入“login - count - per - 5min - topic”主题。

2. 会话窗口(Session Windows): - 会话窗口是基于数据记录之间的时间间隔来定义的。如果数据记录之间的时间间隔超过一定阈值,就会开启新的会话窗口。例如,我们可以用会话窗口来统计用户在一次“会话”内的操作次数。假设用户操作间隔超过 10 分钟视为新的会话:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class SessionWindowExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session - window - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - action - topic");

        KTable<Windowed<String>, Long> sessionWindowedTable = stream
               .groupByKey()
               .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
               .count();

        sessionWindowedTable.toStream()
               .to("action - count - per - session - topic", Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class),
                        Serdes.Long()
                ));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 这里从“user - action - topic”主题读取流数据,按用户(键)分组,使用 10 分钟间隔的会话窗口,统计每个会话窗口内的用户操作次数,并将结果写入“action - count - per - session - topic”主题。

状态管理与容错

本地状态存储

  1. 概念
    • Kafka Streams 使用本地状态存储来保存应用程序的中间状态。例如,在聚合操作中,需要保存部分聚合结果。本地状态存储是基于 RocksDB 实现的,它是一个嵌入式的、持久化的键值存储。当 Kafka Streams 应用程序处理流数据时,会将相关的状态数据存储在本地的 RocksDB 实例中。这使得应用程序可以快速访问和更新状态,而无需频繁地与外部存储进行交互。
  2. 应用场景
    • 以计算用户的累计登录次数为例,每次用户登录事件到达时,应用程序需要在本地状态存储中查找该用户当前的登录次数,然后将其加 1 并更新回状态存储。这样,即使在处理大量并发登录事件时,也能高效地管理每个用户的登录次数状态。
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class LocalStateStoreExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "local - state - store - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - login - topic");

        KTable<String, Long> loginCountTable = stream
               .groupByKey()
               .count(Materialized.as("login - count - store"));

        loginCountTable.toStream()
               .to("user - login - count - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 在上述代码中,`count(Materialized.as("login - count - store"))`指定了使用名为“login - count - store”的本地状态存储来保存用户登录次数的中间结果。

容错机制

  1. 副本与故障恢复
    • Kafka Streams 通过 Kafka 的分区和副本机制来实现容错。每个 Kafka Streams 应用程序实例处理一个或多个 Kafka 分区的数据。当某个实例发生故障时,Kafka 会重新分配这些分区给其他可用的实例。同时,本地状态存储也会进行复制,确保故障恢复后,新的实例可以从之前的状态继续处理。
    • 例如,假设一个 Kafka Streams 应用程序有 3 个实例,分别处理 3 个 Kafka 分区的数据。如果其中一个实例崩溃,Kafka 会将其负责的分区重新分配给另外两个实例。由于本地状态存储也有副本,新接手分区的实例可以从副本中恢复状态,继续处理流数据,保证计算的连续性和准确性。
  2. Exactly - Once 语义
    • Kafka Streams 支持 Exactly - Once 语义,确保每条数据在流处理过程中仅被处理一次,即使在发生故障的情况下也不会重复处理。这是通过将流处理的状态与 Kafka 主题的偏移量进行关联实现的。当应用程序处理完一条数据并更新状态后,会将对应的 Kafka 主题偏移量提交。如果发生故障并恢复,应用程序会从上次提交的偏移量继续处理,避免重复处理已处理过的数据。

复杂实时计算场景与解决方案

多流联合处理

  1. 流连接(Stream Join)
    • 流连接是将两个或多个流中的数据根据某个共同的键进行关联的操作。例如,我们有一个记录用户基本信息的流,另一个记录用户购买行为的流,现在要将用户购买行为与对应的用户基本信息关联起来。在 Kafka Streams 中,可以使用以下代码实现流连接:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class StreamJoinExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream - join - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> userInfoStream = builder.stream("user - info - topic");
        KStream<String, String> purchaseStream = builder.stream("purchase - topic");

        KStream<String, String> joinedStream = userInfoStream.join(
                purchaseStream,
                (userInfo, purchase) -> userInfo + ":" + purchase,
                JoinWindows.of(Duration.ofMinutes(5)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
        );

        joinedStream.to("joined - user - purchase - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 上述代码从“user - info - topic”和“purchase - topic”主题分别读取用户信息流和购买行为流,使用`join`方法进行连接。连接条件是两个流具有相同的用户键,连接窗口设置为 5 分钟,连接结果将用户信息和购买行为拼接后写入“joined - user - purchase - topic”主题。

2. 流合并(Stream Merge): - 流合并是将多个流的数据合并为一个流。例如,我们有两个不同来源但格式相同的用户行为流,现在要将它们合并为一个统一的用户行为流。在 Kafka Streams 中,可以这样实现:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamMergeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream - merge - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream1 = builder.stream("user - action - source1 - topic");
        KStream<String, String> stream2 = builder.stream("user - action - source2 - topic");

        KStream<String, String> mergedStream = stream1.merge(stream2);

        mergedStream.to("unified - user - action - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 这里从“user - action - source1 - topic”和“user - action - source2 - topic”主题读取两个用户行为流,使用`merge`方法将它们合并,然后将合并后的流写入“unified - user - action - topic”主题。

复杂事件处理(CEP)

  1. 模式匹配
    • 在复杂事件处理中,模式匹配是识别流数据中特定模式的操作。例如,我们要识别用户在短时间内连续进行登录和支付操作的模式。Kafka Streams 可以通过自定义处理器来实现这种模式匹配:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.*;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CEPExample {
    public static class PatternMatcherProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private Map<String, String> userActions = new HashMap<>();

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            // 假设 value 格式为 "action:timestamp"
            String[] parts = value.split(":");
            String action = parts[0];
            long timestamp = Long.parseLong(parts[1]);

            if ("login".equals(action)) {
                userActions.put(key, value);
            } else if ("payment".equals(action)) {
                String loginAction = userActions.get(key);
                if (loginAction!= null) {
                    long loginTimestamp = Long.parseLong(loginAction.split(":")[1]);
                    if (timestamp - loginTimestamp < 60 * 1000) { // 1 分钟内
                        context.forward(key, "Login and Payment in 1 minute");
                    }
                }
            }
        }

        @Override
        public void close() {
            // 清理资源
        }
    }

    public static class PatternMatcherSupplier implements ProcessorSupplier<String, String> {
        @Override
        public Processor<String, String> get() {
            return new PatternMatcherProcessor();
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cep - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - action - topic");

        stream.transform(new PatternMatcherSupplier())
               .to("pattern - match - result - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 上述代码通过自定义`PatternMatcherProcessor`处理器,在处理用户行为流数据时,检查是否存在登录后 1 分钟内进行支付的模式,匹配到模式的结果会被发送到“pattern - match - result - topic”主题。

2. 事件关联: - 事件关联是将不同类型的事件关联起来以获取更完整的信息。例如,在一个电商系统中,将订单创建事件、订单支付事件和订单发货事件关联起来,以跟踪订单的完整生命周期。这可以通过在 Kafka Streams 中使用状态存储和流处理逻辑来实现。首先,每个事件流都按订单号进行分组,然后使用状态存储来保存每个订单的当前状态。当新的事件到达时,根据订单号从状态存储中获取订单的当前状态,并更新状态。例如:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class EventCorrelationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event - correlation - example - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orderCreateStream = builder.stream("order - create - topic");
        KStream<String, String> orderPaymentStream = builder.stream("order - payment - topic");
        KStream<String, String> orderShipmentStream = builder.stream("order - shipment - topic");

        KTable<String, String> orderStateTable = orderCreateStream
               .groupByKey()
               .mapValues(value -> "CREATED")
               .merge(
                        orderPaymentStream.groupByKey().mapValues(value -> "PAID"),
                        (prevState, newState) -> "PAID"
                )
               .merge(
                        orderShipmentStream.groupByKey().mapValues(value -> "SHIPPED"),
                        (prevState, newState) -> "SHIPPED"
                )
               .toTable(Materialized.as("order - state - store"));

        orderStateTable.toStream()
               .to("order - state - result - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 代码从“order - create - topic”、“order - payment - topic”和“order - shipment - topic”主题读取不同类型的订单事件流,通过`merge`方法依次更新订单状态,并使用名为“order - state - store”的本地状态存储保存订单状态,最终将订单状态结果写入“order - state - result - topic”主题。

Kafka Streams 的性能优化

资源分配与调优

  1. CPU 与内存优化
    • Kafka Streams 应用程序的性能与 CPU 和内存的使用密切相关。在 CPU 方面,合理设置 Kafka Streams 实例的并行度可以充分利用多核 CPU 的性能。例如,如果服务器有 8 个 CPU 核心,可以将 Kafka Streams 应用程序的并行度设置为 8,让每个实例处理不同的 Kafka 分区,从而提高整体处理能力。
    • 在内存方面,需要根据应用程序的需求合理分配本地状态存储的内存大小。如果状态数据量较大,需要增加 RocksDB 的内存缓存大小,以减少磁盘 I/O。可以通过rocksdb.block.cache.size等配置参数来调整 RocksDB 的内存使用。例如:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "performance - tuned - app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8); // 设置并行度为 8
props.put("rocksdb.block.cache.size", "256m"); // 设置 RocksDB 缓存大小为 256MB
  1. 网络优化
    • Kafka Streams 应用程序与 Kafka 集群之间的网络通信对性能也有重要影响。可以通过优化网络拓扑,减少网络延迟和带宽消耗。例如,将 Kafka Streams 实例部署在与 Kafka 集群同一数据中心内,或者使用高速网络连接。此外,合理设置 Kafka 生产者和消费者的网络相关配置参数,如batch.sizelinger.ms等,可以提高数据传输效率。batch.size控制生产者一次发送的数据量,linger.ms控制生产者在发送数据前等待的时间,适当增大这两个值可以减少网络请求次数,但可能会增加数据发送的延迟。例如:
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 设置 batch 大小为 16KB
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 设置等待时间为 10 毫秒

代码优化

  1. 避免不必要的转换
    • 在编写 Kafka Streams 代码时,应尽量避免进行不必要的数据转换。例如,如果只是对数据进行过滤操作,就不需要先进行复杂的映射操作再过滤。每一次转换操作都会增加计算开销,所以应直接在原始数据上进行必要的处理。例如,之前的过滤示例,如果原始数据格式已经满足过滤条件判断,就不需要额外的映射转换:
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class AvoidUnnecessaryTransformationExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avoid - unnecessary - transform - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - login - topic");

        KStream<String, String> filteredStream = stream.filter((key, value) -> {
            // 假设 value 格式直接可用于判断
            return value.startsWith("admin");
        });

        filteredStream.to("admin - login - topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 这里直接在原始流数据上进行过滤,避免了不必要的转换操作,提高了性能。

2. 优化窗口操作: - 在进行窗口化操作时,合理选择窗口大小和窗口类型对性能有重要影响。如果窗口过大,会导致内存占用增加和处理延迟;如果窗口过小,可能无法准确捕捉到需要的数据模式。例如,在统计用户行为频率时,如果行为频率较低,可以适当增大窗口大小。同时,对于一些实时性要求不高但需要更准确统计结果的场景,可以选择会话窗口,因为它可以根据实际数据间隔动态调整窗口,避免了固定时间窗口可能出现的边界问题。例如,在之前的会话窗口示例中,如果发现用户操作间隔普遍较长,可以适当增大会话窗口的间隔时间,如从 10 分钟调整为 30 分钟:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class OptimizedSessionWindowExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized - session - window - app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("user - action - topic");

        KTable<Windowed<String>, Long> sessionWindowedTable = stream
               .groupByKey()
               .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
               .count();

        sessionWindowedTable.toStream()
               .to("action - count - per - session - topic", Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class),
                        Serdes.Long()
                ));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
- 通过这种方式,可以在满足业务需求的同时,优化窗口操作的性能。