Spark Streaming
Spark Streaming 提供实时流数据处理能力。
概述
Spark Streaming 允许使用批处理的方式处理实时数据流。
DStream 操作
创建 DStream
python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1) # 1秒批处理间隔
# 从 Socket 创建
lines = ssc.socketTextStream("localhost", 9999)
# 从 Kafka 创建
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createStream(ssc, "zk:2181", "consumer-group", {"topic": 1})转换操作
python
# map: 转换每个元素
words = lines.flatMap(lambda line: line.split(" "))
# filter: 过滤元素
filtered = words.filter(lambda word: len(word) > 3)
# countByValue: 统计词频
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)输出操作
python
# 打印到控制台
word_counts.pprint()
# 保存到文件
word_counts.saveAsTextFiles("output/prefix")
# 保存到 HDFS
word_counts.saveAsHadoopFiles("hdfs://path", "prefix")启动流处理
python
ssc.start()
ssc.awaitTermination()Structured Streaming
概述
Structured Streaming 是基于 DataFrame/Dataset API 的流处理接口。
创建流 DataFrame
python
from pyspark.sql.functions import split, count
# 读取流数据
stream_df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()处理流数据
python
# 词频统计
words = stream_df.select(split(stream_df.value, " ").alias("words"))
word_counts = words.select(explode("words").alias("word")).groupBy("word").count()输出流数据
python
# 输出到控制台
query = word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# 输出到文件
query = word_counts.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "output") \
.option("checkpointLocation", "checkpoint") \
.start()
query.awaitTermination()窗口操作
DStream 窗口
python
# 滑动窗口
windowed_counts = word_counts.reduceByKeyAndWindow(
lambda a, b: a + b, # 窗口内聚合
lambda a, b: a - b, # 窗口外移除
30, # 窗口大小(秒)
10 # 滑动间隔(秒)
)Structured Streaming 窗口
python
from pyspark.sql.functions import window
# 5分钟窗口,1分钟滑动
windowed_df = stream_df.groupBy(
window(stream_df.timestamp, "5 minutes", "1 minute"),
stream_df.user
).count()容错机制
Spark Streaming 使用 checkpoint 来实现容错:
python
ssc.checkpoint("hdfs://path/to/checkpoint")对比
| 特性 | DStream | Structured Streaming |
|---|---|---|
| API 风格 | 基于 RDD | 基于 DataFrame |
| 类型安全 | 否 | 是 |
| 优化 | 手动 | 自动 |
| 易用性 | 较低 | 较高 |