Spark 技术
Apache Spark 是一个用于大规模数据处理的统一分析引擎,提供高效的批处理和流处理能力。
Spark Core
概述
Spark Core 是 Spark 的核心组件,提供分布式任务调度和基本的数据抽象。
RDD 操作
python
from pyspark import SparkContext
sc = SparkContext("local", "SparkCoreExample")
# 创建 RDD
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
# 转换操作
squared_rdd = rdd.map(lambda x: x * x)
filtered_rdd = squared_rdd.filter(lambda x: x > 10)
# 行动操作
result = filtered_rdd.collect()
print(result) # [16, 25]
sc.stop()持久化
python
# 缓存到内存
rdd.cache()
# 缓存到磁盘
rdd.persist(pyspark.StorageLevel.DISK_ONLY)Spark SQL
概述
Spark SQL 提供了用于处理结构化数据的高级 API。
DataFrame 操作
python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("SparkSQLExample") \
.getOrCreate()
# 读取数据
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# 显示数据
df.show()
# 数据查询
result = df.filter(df["age"] > 18).select("name", "age")
result.show()
# SQL 查询
df.createOrReplaceTempView("users")
sql_result = spark.sql("SELECT name, age FROM users WHERE age > 18")
sql_result.show()
spark.stop()Spark Streaming
概述
Spark Streaming 提供实时流数据处理能力。
DStream 操作
python
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1) # 1秒批处理间隔
# 创建 DStream
lines = ssc.socketTextStream("localhost", 9999)
# 处理流数据
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# 输出结果
word_counts.pprint()
# 启动流处理
ssc.start()
ssc.awaitTermination()Structured Streaming
python
from pyspark.sql.functions import split, count
# 读取流数据
stream_df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 处理数据
words = stream_df.select(split(stream_df.value, " ").alias("words"))
word_counts = words.select(explode("words").alias("word")).groupBy("word").count()
# 输出到控制台
query = word_counts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()