Flink SQL
Flink SQL 提供了声明式的 SQL 接口来处理流数据和批数据。
概述
Flink SQL 基于 Apache Calcite 实现，支持标准（ANSI）SQL：同一条查询既可以在流模式下处理无界数据，也可以在批模式下处理有界数据，且语义保持一致。
SQL 查询示例
创建表
sql
-- Kafka source table for order events encoded as JSON.
-- The WATERMARK clause declares order_time as an event-time attribute.
-- Without it, group windows (TUMBLE/HOP/SESSION) over order_time fail with
-- "Window aggregate can only be defined over a time attribute column".
-- The 5-second bound tolerates that much out-of-order data; tune as needed.
CREATE TABLE orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- Older connector versions require a consumer group id for sources.
    'properties.group.id' = 'flink-sql-demo',
    -- Read from the beginning of the topic instead of relying on
    -- committed group offsets, which do not exist on a first run.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- Filesystem table backed by CSV files under data/sales.
-- DATE is a reserved keyword in Flink SQL, so the column name must be
-- quoted with backticks everywhere it is referenced.
CREATE TABLE sales (
    `date` DATE,
    amount DECIMAL(10, 2)
) WITH (
    'connector' = 'filesystem',
    'path' = 'data/sales',
    'format' = 'csv'
);
查询数据
sql
-- Total order amount per 5-minute tumbling (fixed, non-overlapping) window.
-- This emits one row per window; it is not a rolling "last 5 minutes" total.
-- Requires order_time to be a time attribute (a WATERMARK in the orders DDL).
SELECT
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
    SUM(amount) AS total_amount
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
-- Attach the user's name to each order.
-- NOTE: the users table is not defined in this document; this assumes it
-- has columns id and name.
-- A regular join between two unbounded streams keeps both inputs in state
-- indefinitely unless a state TTL is configured (see the state TTL section).
SELECT
    o.order_id,
    u.name,
    o.amount
FROM orders AS o
INNER JOIN users AS u
    ON o.user_id = u.id;
插入数据
sql
-- Write one total per date from the sales table into sales_summary.
-- `date` is backtick-quoted because DATE is a reserved keyword.
-- NOTE: sales_summary is not defined in this document. Columns are matched
-- by position, so it needs a DATE column followed by a DECIMAL column.
-- In streaming mode a GROUP BY produces an updating result, so the sink must
-- accept updates; in batch mode the result is final.
INSERT INTO sales_summary
SELECT
    `date`,
    SUM(amount) AS total_amount
FROM sales
GROUP BY `date`;
Python API
环境配置
python
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)执行 SQL
python
# 创建表
table_env.execute_sql("""
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
""")
# 查询
result = table_env.sql_query("SELECT * FROM orders WHERE amount > 100")
result.execute().print()混合 API
python
from pyflink.table.expressions import col

# Table API: per-user total of orders whose amount exceeds 100.
# The aggregate is aliased so SQL can refer to it as sum_amount; without
# the alias Flink assigns a generated column name and the SQL below fails.
table = table_env.from_path("orders")
result = table \
    .filter(col("amount") > 100) \
    .group_by(col("user_id")) \
    .select(col("user_id"), col("amount").sum.alias("sum_amount"))

# Hand the Table API result to SQL by registering it as a temporary view
# and querying it by name, instead of interpolating the Table object into
# the SQL string.
table_env.create_temporary_view("user_amount_totals", result)
result = table_env.sql_query(
    "SELECT user_id, sum_amount FROM user_amount_totals WHERE sum_amount > 1000"
)
窗口函数
滚动窗口
sql
-- Per-user event count in 1-hour tumbling windows.
-- `count` is backtick-quoted because COUNT is a reserved keyword in Flink SQL.
-- NOTE: the events table is not defined in this document; event_time must
-- be a time attribute (e.g. declared with a WATERMARK) for TUMBLE to work.
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    COUNT(*) AS `count`
FROM events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '1' HOUR);
滑动窗口
sql
-- Per-user order total over 1-hour windows that slide every 15 minutes.
-- HOP arguments are (time attribute, slide, size), so every order falls
-- into 60 / 15 = 4 overlapping windows.
-- The orders table declares order_time (there is no event_time column),
-- and order_time must be a time attribute (a WATERMARK in the orders DDL).
SELECT
    user_id,
    HOP_START(order_time, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) AS window_start,
    SUM(amount) AS total
FROM orders
GROUP BY
    user_id,
    HOP(order_time, INTERVAL '15' MINUTE, INTERVAL '1' HOUR);
会话窗口
sql
-- Per-user event count per session; a user's session closes once there is
-- a 30-minute gap with no events for that user.
-- `count` is backtick-quoted because COUNT is a reserved keyword in Flink SQL.
-- NOTE: the events table is not defined here; event_time must be a time attribute.
SELECT
    user_id,
    SESSION_START(event_time, INTERVAL '30' MINUTE) AS window_start,
    COUNT(*) AS `count`
FROM events
GROUP BY
    user_id,
    SESSION(event_time, INTERVAL '30' MINUTE);
性能优化
状态 TTL
sql
-- Expire operator state (e.g. group-aggregate keys and regular-join inputs)
-- that has not been updated for at least 1 hour. This bounds state size,
-- at the cost of possibly inaccurate results if a key reappears after its
-- state has expired.
-- State TTL is a job-level option (table.exec.state.ttl), not a table option.
-- 'state.ttl' is not a connector option, so ALTER TABLE ... SET with it would
-- be rejected as an unsupported option when the table is used.
SET 'table.exec.state.ttl' = '1 h';
并行度
sql
-- Default parallelism (4) for operators of jobs submitted after this
-- statement in the current SQL client session.
SET 'parallelism.default' = '4';