Flink Technology
Apache Flink is a distributed stream processing framework for low-latency, high-throughput real-time data processing.
Flink Core
Overview
Flink Core provides Flink's foundational runtime and core APIs.
Execution Modes
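- Streaming mode (STREAMING): processes unbounded data streams
- Batch mode (BATCH): processes bounded data sets
- Automatic mode (AUTOMATIC): lets Flink choose streaming or batch execution depending on whether all sources are bounded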
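The mode can be selected in code with `env.setRuntimeMode(RuntimeExecutionMode.BATCH)` or through the `execution.runtime-mode` configuration option. Flink's execution-mode documentation describes the visible difference for rolling operations such as `reduce()` or `sum()`, which the WordCount example uses. In streaming mode they emit an updated result for every incoming record. In batch mode they emit only the final result per key. The following plain-Java sketch simulates that output behavior without Flink. The class and method names (`RollingSumModes`, `streaming`, `batch`) are hypothetical, and the sketch does not model Flink's runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of keyBy(word).sum(count) output in the two modes
class RollingSumModes {
    // Streaming-style: emit an updated (word,count) after every input record
    static List<String> streaming(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>(); // per-key running count
        List<String> out = new ArrayList<>();
        for (String w : words) {
            int count = state.merge(w, 1, Integer::sum);
            out.add("(" + w + "," + count + ")");
        }
        return out;
    }

    // Batch-style: consume the whole bounded input, then emit one final result per key
    static List<String> batch(List<String> words) {
        // LinkedHashMap keeps first-seen key order so this sketch's output is deterministic;
        // real Flink makes no such ordering guarantee across keys
        Map<String, Integer> state = new LinkedHashMap<>();
        for (String w : words) {
            state.merge(w, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : state.entrySet()) {
            out.add("(" + e.getKey() + "," + e.getValue() + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a");
        System.out.println(streaming(words)); // [(a,1), (b,1), (a,2)]
        System.out.println(batch(words));     // [(a,2), (b,1)]
    }
}
```

In streaming mode, downstream operators therefore see intermediate counts such as `(a,1)`. In batch mode they see only `(a,2)`.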
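Programming Model
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a socket source (start one with e.g. `nc -lk 9999`)
        DataStream<String> stream = env.socketTextStream("localhost", 9999);
        // Split each line into words, emit (word, 1), then keep a running count per word
        DataStream<Tuple2<String, Integer>> wordCounts = stream
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            // Java erases the lambda's generic output type, so declare it explicitly
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(value -> value.f0)
            .sum(1);
        // Print results to stdout
        wordCounts.print();
        env.execute("Flink WordCount");
    }
}
```
Flink SQL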
Overview
Flink SQL provides a declarative SQL interface for processing both streaming and batch data.
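Streaming SQL aggregations usually group rows by time. The `TUMBLE(order_time, INTERVAL '5' MINUTE)` clause in the example that follows assigns each row to a fixed, non-overlapping 5-minute window aligned to the epoch. The following plain-Java sketch shows that assignment rule. It assumes epoch-millisecond timestamps and no window offset. `TumblingWindowSketch` and its methods are hypothetical names. The sketch also ignores watermarks and late data, which Flink uses to decide when a window's result can be emitted.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of tumbling-window assignment and per-window summing
class TumblingWindowSketch {
    static final long FIVE_MINUTES_MS = 5 * 60 * 1000L;

    // Start of the tumbling window containing timestampMs (zero offset, epoch-aligned);
    // floorMod keeps the result correct for negative timestamps too
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Sum amounts per window start, analogous to GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE)
    static Map<Long, Double> sumPerWindow(long[] timestampsMs, double[] amounts, long sizeMs) {
        Map<Long, Double> totals = new TreeMap<>(); // sorted by window start
        for (int i = 0; i < timestampsMs.length; i++) {
            totals.merge(windowStart(timestampsMs[i], sizeMs), amounts[i], Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        long[] ts = {60_000L, 299_999L, 300_000L, 420_000L}; // 1:00, 4:59.999, 5:00, 7:00 after epoch
        double[] amounts = {10.0, 20.0, 5.0, 7.5};
        System.out.println(sumPerWindow(ts, amounts, FIVE_MINUTES_MS)); // {0=30.0, 300000=12.5}
    }
}
```

A window's start is inclusive and its end is exclusive. A row at exactly 5:00 therefore opens the second window and does not close the first.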
SQL Query Example
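```sql
-- Create a Kafka-backed source table
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    -- Declare order_time as the event-time attribute; group windows such as TUMBLE require one.
    -- The 5-second out-of-orderness bound is an illustrative value.
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Total order amount per 5-minute tumbling window
SELECT
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
    SUM(amount) AS total_amount
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
```
Python API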
```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Register a Kafka-backed source table (the Kafka SQL connector JAR must be on the classpath)
table_env.execute_sql("""
    CREATE TABLE orders (
        order_id BIGINT,
        user_id BIGINT,
        amount DECIMAL(10, 2),
        order_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# Run a continuous query that keeps only orders above 100 and print the results
result = table_env.sql_query("SELECT * FROM orders WHERE amount > 100")
result.execute().print()
```
CEP
Overview
CEP (Complex Event Processing) detects and processes complex patterns of events in a data stream.
Pattern Definition
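```java
import java.util.List;
import java.util.Map;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

// Pattern: two consecutive failed login attempts
// (next() requires the second event to immediately follow the first)
Pattern<String, String> pattern = Pattern.<String>begin("first")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String event) {
            return event.contains("login_failed");
        }
    })
    .next("second")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String event) {
            return event.contains("login_failed");
        }
    });
// Apply the pattern to `stream`, a DataStream<String> of event descriptions
PatternStream<String> patternStream = CEP.pattern(stream, pattern);
// Handle each match; the map holds the matched events under their pattern names
patternStream.select(new PatternSelectFunction<String, String>() {
    @Override
    public String select(Map<String, List<String>> match) {
        return "Detected suspicious login pattern: "
            + match.get("first").get(0) + ", " + match.get("second").get(0);
    }
}).print();
```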
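Because the pattern uses `next()`, Flink CEP applies strict contiguity. The second `login_failed` event must directly follow the first, with no other event in between. `followedBy()` would relax this requirement. The following plain-Java sketch reproduces that matching rule outside Flink. It assumes the default no-skip after-match strategy, so overlapping matches are all reported. `StrictContiguityMatcher` and `matchPairs` are hypothetical names. The sketch also treats the input as a single sequence. A real job would usually key the stream, for example by user, first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of begin("first").where(cond).next("second").where(cond)
class StrictContiguityMatcher {
    // Return index pairs of directly adjacent events that both satisfy cond.
    // Overlapping matches are kept, as with a no-skip strategy.
    static List<int[]> matchPairs(List<String> events, Predicate<String> cond) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (cond.test(events.get(i)) && cond.test(events.get(i + 1))) {
                matches.add(new int[] {i, i + 1});
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Predicate<String> failed = e -> e.contains("login_failed");
        List<String> events = Arrays.asList(
            "login_failed", "login_failed", "login_failed", "login_ok", "login_failed");
        for (int[] m : matchPairs(events, failed)) {
            // Prints matches for event pairs (0,1) and (1,2); "login_ok" breaks contiguity
            System.out.println("Detected suspicious login pattern at events " + m[0] + "," + m[1]);
        }
    }
}
```

Under `followedBy()`, the last `login_failed` could also pair with an earlier failure across the intervening `login_ok`. Strict contiguity rules that pairing out.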