Issue #1
问题描述
使用有限流补数据,出现数据丢失
原因分析
最后的 Sink 开启了两阶段提交,Source 在进入 Finish 状态之后, 最后没有触发一次 checkpoint,导致最后的数据没有被写入到下游,出现了数据丢失
解决方式
使用 Flink1.14 版本, 设置参数 execution.checkpointing.checkpoints-after-tasks-finish.enable=true; 作业进入 finish 状态之后,会主动触发一次 checkpoint,确保数据被正确写入到下游。 https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
Issue #2
问题描述
Flink Job Application 中同时运行了两个 Job
大概逻辑为:
DataStream<Pojo> sourceDatastream = xxxx;
// 基于 Table API 创建了 Table 对象
Table table = tbEnv.fromDataStram(sourceDataStream);
// 基于 SQL API 创建了 Sink 表
tbEnv.executeSql("create table sinkTable xxx () with ()")
// 调用 execute
table.executeInto("selectTable");
// 调用 env.execute
tbEnv.execute("jobxxxx");
原因 分析
用户混用了的 DataStream API 和 table API, 在调用
table.executeInto("selectTable");
时提交了一个作业,在调 tbEnv.execute("jobxxx")
时又提交了一个作业, 但这个作业的逻辑为 DataStream 到 Table 的转换, 没有 Sink 的逻辑.
修复方式
table.sqlUpdate("insert into selectTable select * from " + table);
说明
以上的 修复方式针对于 Flink1.12 版本.