Spark Streaming

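Spark Streaming provides real-time processing of streaming data.

Overview

Spark Streaming processes live data streams with a batch model: the incoming stream is divided into small batches at a fixed interval (micro-batches), and the Spark engine processes each batch as an ordinary batch job. The stream is represented as a DStream (discretized stream), which is a sequence of RDDs, one per batch.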
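
As a rough illustration of the micro-batch model, the plain-Python sketch below (not Spark code) groups `(arrival_time_seconds, line)` records into fixed batches, similar to how `StreamingContext(sc, 1)` cuts its input into 1-second batches. The record format and the `discretize` helper are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def discretize(records: Iterable[Tuple[float, str]],
               batch_interval: float) -> Dict[int, List[str]]:
    # Hypothetical helper illustrating micro-batching; not part of PySpark.
    # Batch i collects records that arrived in [i * interval, (i + 1) * interval).
    batches: Dict[int, List[str]] = defaultdict(list)
    for arrival_time, line in records:
        batches[int(arrival_time // batch_interval)].append(line)
    return dict(batches)


records = [(0.2, "a b"), (0.9, "b c"), (1.1, "c"), (2.5, "a a")]
batches = discretize(records, batch_interval=1.0)
for batch_id in sorted(batches):
    print(batch_id, batches[batch_id])
```

Each batch is then handed to the same processing logic, which is why DStream code reads like batch code.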

DStream Operations

Creating a DStream

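```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Use at least 2 local threads: one runs the receiver, the other processes data.
# In the pyspark shell, `sc` already exists; skip this line there.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # batch interval: 1 second

# Create a DStream from a TCP socket
lines = ssc.socketTextStream("localhost", 9999)

# Create a DStream from Kafka (receiver-based API, available only in Spark 2.x;
# removed in Spark 3.0, where Structured Streaming's Kafka source replaces it)
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createStream(ssc, "zk:2181", "consumer-group", {"topic": 1})
```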

Transformations

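```python
# flatMap: split each line into words (one input line -> many output elements)
words = lines.flatMap(lambda line: line.split(" "))

# filter: keep only words longer than 3 characters
filtered = words.filter(lambda word: len(word) > 3)

# map + reduceByKey: count occurrences of each word within a batch
# (words.countByValue() produces the same per-batch counts)
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
```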
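
Within a single batch, these transformations compute the same thing as ordinary list operations. The plain-Python sketch below mimics the pipeline above for one batch of lines; `word_count_batch` is a hypothetical helper for illustration, not part of PySpark.

```python
from typing import Dict, List, Tuple


def word_count_batch(lines: List[str]) -> Tuple[List[str], List[str], Dict[str, int]]:
    # Hypothetical helper mirroring the DStream pipeline on one batch.
    # flatMap: each line may produce several words, flattened into one list
    words = [word for line in lines for word in line.split(" ")]
    # filter: keep only words longer than 3 characters
    filtered = [word for word in words if len(word) > 3]
    # map(word -> (word, 1)) followed by reduceByKey(a + b): sum the 1s per key
    pairs = [(word, 1) for word in words]
    counts: Dict[str, int] = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return words, filtered, counts


words, filtered, counts = word_count_batch(["spark is fast", "spark is fun"])
print(filtered)  # ['spark', 'fast', 'spark']
print(counts)    # {'spark': 2, 'is': 2, 'fast': 1, 'fun': 1}
```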

Output Operations

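```python
# Print the first 10 elements of each batch to the driver console
word_counts.pprint()

# Save each batch as text files under a directory named prefix-<batch time in ms>
word_counts.saveAsTextFiles("output/prefix")

# Save to HDFS: the Python DStream API has no saveAsHadoopFiles,
# so pass an HDFS URI to saveAsTextFiles instead
word_counts.saveAsTextFiles("hdfs://path/prefix")
```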

Starting the Stream

```python
ssc.start()             # start receiving and processing data
ssc.awaitTermination()  # block until the computation is stopped or fails
```

Structured Streaming

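Overview

Structured Streaming is a stream processing engine built on the Spark SQL engine and used through the DataFrame/Dataset API. A live stream is treated as an unbounded table to which new rows are continuously appended, so streaming queries are written the same way as batch queries.

Creating a Streaming DataFrame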

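```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

# Read lines from a TCP socket; each row has a single string column "value"
stream_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
```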

Processing Streaming Data

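```python
# Word count: split each line into an array of words, explode it into
# one row per word, then count rows per word
words = stream_df.select(split(stream_df.value, " ").alias("words"))
word_counts = words.select(explode("words").alias("word")).groupBy("word").count()
```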

Writing Streaming Output

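```python
# Console sink: "complete" mode prints the entire updated count table on every trigger
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# File sink: supports only "append" mode, and an aggregation such as word_counts
# cannot run in append mode without a watermark, so write the raw input lines instead
file_query = stream_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output") \
    .option("checkpointLocation", "checkpoint") \
    .start()

# Block until either query stops or fails
spark.streams.awaitAnyTermination()
```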
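
The output mode decides which rows of the result table are written at each trigger. The plain-Python sketch below (not Spark code; `run_word_count` and the batch data are hypothetical) keeps a running word count across micro-batches and shows what `complete` mode and `update` mode would emit. `append` mode is left out because Spark allows it for an aggregation like this only together with a watermark, once rows can no longer change.

```python
from typing import Dict, List, Tuple


def run_word_count(batches: List[List[str]]) -> List[Tuple[Dict[str, int], Dict[str, int]]]:
    # Hypothetical simulation: returns (complete_output, update_output) per trigger.
    state: Dict[str, int] = {}  # the running result table, kept across triggers
    outputs = []
    for batch in batches:
        changed: Dict[str, int] = {}
        for word in batch:
            state[word] = state.get(word, 0) + 1
            changed[word] = state[word]
        # complete: emit a copy of the whole result table every trigger
        complete = dict(state)
        # update: emit only the rows whose value changed in this trigger
        outputs.append((complete, changed))
    return outputs


for complete, update in run_word_count([["a", "b", "a"], ["b", "c"]]):
    print("complete:", complete, "update:", update)
```

On the second trigger, `complete` still contains `a` even though no new `a` arrived, while `update` contains only `b` and `c`.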

Window Operations

DStream Windows

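```python
# Sliding window over the (word, count) DStream: a 30-second window recomputed
# every 10 seconds. Both durations must be multiples of the batch interval, and
# supplying an inverse function requires checkpointing (ssc.checkpoint(...)).
windowed_counts = word_counts.reduceByKeyAndWindow(
    lambda a, b: a + b,  # reduce: add counts from batches entering the window
    lambda a, b: a - b,  # inverse reduce: subtract counts from batches leaving it
    30,  # window duration (seconds)
    10   # slide interval (seconds)
)
```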
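
Passing an inverse function lets each window be updated incrementally instead of re-reducing every batch it covers: the new window value is the previous value, plus the batches that entered, minus the batches that slid out. The plain-Python sketch below shows that bookkeeping for a single key; it is an illustration of the idea, not Spark's implementation, and `windowed_sums` with its batch-unit parameters is hypothetical (at a 1-second batch interval, the example above corresponds to `window=30, slide=10`).

```python
from typing import List


def windowed_sums(batch_counts: List[int], window: int, slide: int) -> List[int]:
    # Hypothetical helper: incremental sliding-window sums for one key.
    # window and slide are measured in batches; a result is produced every
    # `slide` batches and covers the last `window` batches (fewer at the start).
    results: List[int] = []
    current = 0
    prev_end = 0  # exclusive end index of the previous window
    for end in range(slide, len(batch_counts) + 1, slide):
        # reduce function (a + b): add batches that arrived since the last window
        for i in range(prev_end, end):
            current += batch_counts[i]
        # inverse function (a - b): subtract batches that slid out of the window
        for i in range(max(prev_end - window, 0), max(end - window, 0)):
            current -= batch_counts[i]
        results.append(current)
        prev_end = end
    return results


print(windowed_sums([1, 2, 3, 4, 5, 6], window=3, slide=1))  # [1, 3, 6, 9, 12, 15]
```

Each step touches only the batches at the edges of the window, which is why the inverse form pays off for long windows with short slides.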

Structured Streaming Windows

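```python
from pyspark.sql.functions import window

# Hypothetical input: events_df is a streaming DataFrame with an event-time column
# "timestamp" and a "user" column (the socket source above only provides "value").
# Count events per user in 5-minute windows that slide every 1 minute
windowed_df = events_df.groupBy(
    window(events_df.timestamp, "5 minutes", "1 minute"),
    events_df.user
).count()
```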
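
With a 5-minute window sliding every minute, every event falls into five overlapping windows and is therefore counted in five result rows. The plain-Python sketch below lists the windows that contain a given timestamp. It is an illustration rather than Spark's code, and assumes windows are aligned to the Unix epoch, which is Spark's behavior when `window()` is given no start-time offset; `windows_for` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def windows_for(ts: datetime, size: timedelta,
                slide: timedelta) -> List[Tuple[datetime, datetime]]:
    # Hypothetical helper: every [start, end) sliding window containing ts,
    # with window starts at multiples of `slide` counted from the Unix epoch.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Latest window start that is <= ts
    start = epoch + ((ts - epoch) // slide) * slide
    windows = []
    # A window [start, start + size) contains ts while start > ts - size
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event_time = datetime(2024, 1, 1, 12, 3, 30, tzinfo=timezone.utc)
for start, end in windows_for(event_time, timedelta(minutes=5), timedelta(minutes=1)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# prints five windows, from 11:59 - 12:04 up to 12:03 - 12:08
```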

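Fault Tolerance

Spark Streaming uses checkpointing for fault tolerance: it periodically saves metadata (configuration, the DStream operations, and incomplete batches) and the state of stateful transformations to reliable storage such as HDFS, so that processing can recover after a driver failure.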

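```python
# Enable checkpointing to a fault-tolerant directory; this is required by stateful
# operations such as updateStateByKey and reduceByKeyAndWindow with an inverse function
ssc.checkpoint("hdfs://path/to/checkpoint")
```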
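
To actually recover after a restart, a driver is usually built with `StreamingContext.getOrCreate(checkpointPath, setupFunc)`, which rebuilds the context from the checkpoint if one exists and otherwise calls `setupFunc`. The plain-Python sketch below imitates only that create-or-recover pattern for a running word count stored as JSON; the file name `state.json` and the helper names are hypothetical, and real Spark checkpoints hold far more (the DStream graph, pending batches, state RDDs).

```python
import json
import os
import tempfile
from typing import Dict, List


def get_or_create_state(checkpoint_dir: str) -> Dict[str, int]:
    # Hypothetical helper: load saved counts if a checkpoint exists, else start empty
    path = os.path.join(checkpoint_dir, "state.json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}


def process_batch(state: Dict[str, int], words: List[str], checkpoint_dir: str) -> None:
    # Hypothetical helper: update running counts, then persist them
    for word in words:
        state[word] = state.get(word, 0) + 1
    path = os.path.join(checkpoint_dir, "state.json")
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)  # swap in the new file so no half-written checkpoint remains


checkpoint_dir = tempfile.mkdtemp()
state = get_or_create_state(checkpoint_dir)  # first run: no checkpoint, empty state
process_batch(state, ["a", "b"], checkpoint_dir)
# Simulate a driver restart: drop in-memory state and recover it from disk
state = get_or_create_state(checkpoint_dir)
process_batch(state, ["a"], checkpoint_dir)
print(state)  # {'a': 2, 'b': 1}
```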

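Comparison

| Feature | DStream | Structured Streaming |
|---|---|---|
| API style | RDD-based | DataFrame/Dataset-based |
| Type safety | | |
| Optimization | Manual | Automatic (Catalyst optimizer) |
| Ease of use | Lower | Higher |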