2023-Q1-W9 oncall 记录

Issue #1 问题描述 用户在提交 FlinkSQL 时, 始终处于 codegen 阶段,整体耗时甚至超过了 3-4 个小时 问题分析 通过排查执行栈,主要执行栈 codesplit 上, 说明生成的代码太长,需要花非常多的时间用于代码切割上 "main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable [0x00007f4813b2e000] java.lang.Thread.State: RUNNABLE at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:414) at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:360) at org.apache.flink.table.codesplit.ReturnValueRewriter.rewrite(ReturnValueRewriter.java:86) at org.apache.flink.table.codesplit.JavaCodeSplitter.splitImpl(JavaCodeSplitter.java:48) at org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:35) at org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58) at org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43) at org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:124) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59) at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) at org.apache.flink.table.planner.delegation.StreamPlanner$$Lambda$1420/309438139.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.TraversableLike$$Lambda$338/537524656.apply(Unknown Source) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala....

March 10, 2023

2023-Q1-W8 oncall 记录

Issue #1 问题描述 Flink StreamLoad 导入 Doris 报错 401 解决方式 一般来说, 报错 401 就是鉴权的问题, 排查账号密码是否正确即可。 最终排查, 是集群写错了, 因此除了排除账号密码的正确性外, 还需要确保集群没有填错。。

March 1, 2023

2023-Q1-W7 oncall 记录

Issue #1 问题描述 触发 STOP_WITH_SAVEPOINT 之后,作业完成 savepoint 之后, 作业未正常退出 根因分析 报错原因: 通过查看 task 的运行状态,发现存在部分的 task 依旧处于 RUNNING 状态. 对应 task 的执行栈 "Legacy Source Thread - Source: Custom File Source (1/1)#4" #3929 prio=5 os_prio=0 tid=0x00007f05f020a800 nid=0x2dcdf waiting on condition [0x00007f058a851000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:226) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:138) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) File Source Task 一直处于 sleep 状态, 未退出,导致整体未退出 File Source 对应的代码: env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.minutes(30).toMilliseconds(), xxxxxx) 基本可以断定,由于 monitor-interval 设置的过长,导致 FileSource 长时间处于 sleep 状态, 未能及时响应 cancel 的逻辑。...

February 15, 2023

2023-Q1-W6 oncall 记录

Issue #1 问题描述 使用 Rocksdb statebackend Flink 作业频繁 oom 导致作业失败 报错原因 OOM 报错,本质是内存不够用导致, 通过 heap Dump 分析, 主要占用在 broadcaststate 的占用上 解决方案 调整堆内内存比例, 增大堆内内存比例, 或者增加作业 TaskManager 的内存, 进而增大堆内内存. 小结 当 Flink 作业用到了 broadcast 操作时,要注意多分配一些内存给堆内, 避免 broadcast 导致堆内内存占用过多,导致 OOM 反思 作为平台方,如何及时预警和发现这些可能存在问题的作业

February 9, 2023

2023-Q1-W5 oncall 记录

Issue #1 问题描述 使用 Flink SQL 写 hive 表报错 org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive_zjyprc_hadoop`.`dw`.`dwd_mif_edge_prod_smt_print_di` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind at org.apache.flink.connectors.hive.HiveTableSink.createStreamSink(HiveTableSink.java:257) at org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:196) at org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:147) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:317) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:158) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) at org....

February 2, 2023

2023-Q1-W3 oncall 记录

Issue #1 问题描述 使用有限流补数据,出现数据丢失 原因分析 最后的 Sink 开启了两阶段提交,Source 在进入 Finish 状态之后, 最后没有触发一次 checkpoint,导致最后的数据没有被写入到下游,出现了数据丢失 解决方式 使用 Flink1.14 版本, 设置参数 execution.checkpointing.checkpoints-after-tasks-finish.enable=true; 作业进入 finish 状态之后,会主动触发一次 checkpoint,确保数据被正确写入到下游。 https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams Issue #2 问题描述 Flink Job Application 中同时运行了两个 Job 大概逻辑为: DataStream<Pojo> sourceDatastream = xxxx; // 基于 Table API 创建了 Table 对象 Table table = tbEnv.fromDataStram(sourceDataStream); // 基于 SQL API 创建了 Sink 表 tbEnv.executeSql("create table sinkTable xxx () with ()") // 调用 execute table.executeInto("selectTable"); // 调用 env.execute tbEnv.execute("jobxxxx"); 原因 分析 用户混用了的 DataStream API 和 table API, 在调用 table....

January 16, 2023

2023-Q1-W1 oncall 记录

问题1: 使用 over window 聚合,发现同一个时间窗口会产生多条数据 ? ANS: Over Window 的开窗模式是每条数据过来会产生一个新的窗口,并不会删除原来的窗口产生的数据, 如果要保证每个时间窗口的数据只有一条, 应该使用 group window 问题2: 在 FlinkSQL 中使用 Hive 作为维表时,报错: The only supported ‘streaming-source.partition.include’ is ‘all’ in hive table scan, but is ’latest’) ANS: SQL 写法的问题, 当使用 latest 时,必须要使用 temporal join 的写法, 如果使用了普通的 join,或者使用了子查询, Hive 表实际会当作一个 source 表,因此无法支持 latest 的选项 问题3: Hive 函数中的 collect_list ,Flink SQL 中应该用什么函数? ANS: Flink SQL 中提供了 collect 函数用于聚合 可以使用 hive module, 使用 hive 函数直接引用即可 问题4: 使用 Hive 维度表,非常容易 OOM,实际的维度表可能就几十 M ANS:...

January 4, 2023