Flink Core
Flink Core 提供了 Flink 的基础运行时和核心 API。
概述
Flink Core 是 Flink 的核心模块,包括:
- 执行引擎: 分布式流处理引擎
- 状态管理: 高效的状态管理机制
- 容错机制: 精确一次语义保证
编程模型
环境配置
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
env.setParallelism(4);
// 启用检查点
env.enableCheckpointing(1000);数据源
java
// 从 Socket 读取
DataStream<String> stream = env.socketTextStream("localhost", 9999);
// 从文件读取
DataStream<String> stream = env.readTextFile("data.txt");
// 从 Kafka 读取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));转换操作
java
// map
DataStream<Integer> numbers = stream.map(s -> Integer.parseInt(s));
// filter
DataStream<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
// flatMap
DataStream<String> words = stream.flatMap((String line, Collector<String> out) -> {
for (String word : line.split(" ")) {
out.collect(word);
}
});输出操作
java
// 打印到控制台
stream.print();
// 写入文件
stream.writeAsText("output");
// 写入 Kafka
stream.addSink(new FlinkKafkaProducer<>("topic", new SimpleStringSchema(), properties));状态管理
键控状态
java
// 定义状态描述符
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("count", Integer.class);
// 获取状态
ValueState<Integer> count = getRuntimeContext().getState(descriptor);
// 更新状态
count.update(count.value() + 1);列表状态
java
ListStateDescriptor<String> descriptor = new ListStateDescriptor<>("history", String.class);
ListState<String> history = getRuntimeContext().getListState(descriptor);
// 添加元素
history.add(element);
// 遍历状态
for (String item : history.get()) {
// 处理
}容错机制
检查点
java
// 启用检查点
env.enableCheckpointing(1000);
// 设置检查点模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置检查点存储
env.getCheckpointConfig().setCheckpointStorage("hdfs://path");执行模式
流模式
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();批模式
java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();自动模式
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);