Skip to content

Spark 技术

Apache Spark 是一个用于大规模数据处理的统一分析引擎,提供高效的批处理和流处理能力。

Spark Core

概述

Spark Core 是 Spark 的核心组件,提供分布式任务调度和基本的数据抽象。

RDD 操作

python
from pyspark import SparkContext

sc = SparkContext("local", "SparkCoreExample")

# 创建 RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

# 转换操作
squared_rdd = rdd.map(lambda x: x * x)
filtered_rdd = squared_rdd.filter(lambda x: x > 10)

# 行动操作
result = filtered_rdd.collect()
print(result)  # [16, 25]

sc.stop()

持久化

python
# 缓存到内存
rdd.cache()

# 缓存到磁盘
rdd.persist(pyspark.StorageLevel.DISK_ONLY)

Spark SQL

概述

Spark SQL 提供了用于处理结构化数据的高级 API。

DataFrame 操作

python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkSQLExample") \
    .getOrCreate()

# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# 显示数据
df.show()

# 数据查询
result = df.filter(df["age"] > 18).select("name", "age")
result.show()

# SQL 查询
df.createOrReplaceTempView("users")
sql_result = spark.sql("SELECT name, age FROM users WHERE age > 18")
sql_result.show()

spark.stop()

Spark Streaming

概述

Spark Streaming 提供实时流数据处理能力。

DStream 操作

python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)  # 1秒批处理间隔

# 创建 DStream
lines = ssc.socketTextStream("localhost", 9999)

# 处理流数据
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# 输出结果
word_counts.pprint()

# 启动流处理
ssc.start()
ssc.awaitTermination()

Structured Streaming

python
from pyspark.sql.functions import split, count

# 读取流数据
stream_df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# 处理数据
words = stream_df.select(split(stream_df.value, " ").alias("words"))
word_counts = words.select(explode("words").alias("word")).groupBy("word").count()

# 输出到控制台
query = word_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()