2023-Q1-W9 oncall 记录
Issue #1 问题描述 用户在提交 FlinkSQL 时, 始终处于 codegen 阶段,整体耗时甚至超过了 3-4 个小时 问题分析 通过排查执行栈,主要执行栈 codesplit 上, 说明生成的代码太长,需要花非常多的时间用于代码切割上 "main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable [0x00007f4813b2e000] java.lang.Thread.State: RUNNABLE at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:414) at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:360) at org.apache.flink.table.codesplit.ReturnValueRewriter.rewrite(ReturnValueRewriter.java:86) at org.apache.flink.table.codesplit.JavaCodeSplitter.splitImpl(JavaCodeSplitter.java:48) at org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:35) at org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58) at org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43) at org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:124) at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59) at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) at org.apache.flink.table.planner.delegation.StreamPlanner$$Lambda$1420/309438139.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.TraversableLike$$Lambda$338/537524656.apply(Unknown Source) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala....
2023-Q1-W8 oncall 记录
Issue #1 问题描述 Flink StreamLoad 导入 Doris 报错 401 解决方式 一般来说, 报错 401 就是鉴权的问题, 排查账号密码是否正确即可。 最终排查, 是集群写错了, 因此除了排除账号密码的正确性外, 还需要确保集群没有填错。。
2023-Q1-W7 oncall 记录
Issue #1 问题描述 触发 STOP_WITH_SAVEPOINT 之后,作业完成 savepoint 之后, 作业未正常退出 根因分析 报错原因: 通过查看 task 的运行状态,发现存在部分的 task 依旧处于 RUNNING 状态. 对应 task 的执行栈 "Legacy Source Thread - Source: Custom File Source (1/1)#4" #3929 prio=5 os_prio=0 tid=0x00007f05f020a800 nid=0x2dcdf waiting on condition [0x00007f058a851000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:226) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:138) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) File Source Task 一直处于 sleep 状态, 未退出,导致整体未退出 File Source 对应的代码: env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.minutes(30).toMilliseconds(), xxxxxx) 基本可以断定,由于 monitor-interval 设置的过长,导致 FileSource 长时间处于 sleep 状态, 未能及时响应 cancel 的逻辑。...
Flink 批流不一致场景整理
说明 在 Flink 社区的努力下, Flink StreamSQL 和 BatchSQL 的语法可以做到基本一致,但在有些场景下, 两者还是有语义上的区别,因此有些情况下,不能按照离线处理的思维去写 StreamSQL,有可能会出问题。 Batch VS Stream 不一致 case 以下是目前在实际使用中常见的不一致的 case 乱序问题 在业务支持中, 最常见的就是乱序问题了, 在 Batch SQL 转化为 Stream SQL 时, 由于顺序的问题, 导致实时作业产生的结果和离线不一致。 Flink SQL 中 task 和 task 之间常见的传输方式就是 hash。 无论是 source -> group -> over window -> join -> sink 乱序场景的产生: hash-key 和下一计算阶段的主键不一一对应 如上图所示: 实时表和另外两张维度表进行 join, join 的字段分别为 id1, id2 实际数据流向变化: 经过 join2, 数据按 id2 进行 hash,导致存在相同的 id1 数据会被 hash 到不同的节点,因此就无法保证 id1 的顺序性, 在 Sink 到外部系统时,大部分系统是直接基于主键进行更新,最终结果不符合预期...
Flink 积压问题排查
Flink 作业运行时,最常见的问题就是积压问题, 当作业出现积压时,如何才能快速定位到积压原因,并针对性解决呢? 积压的发现 通过我们会通过配置作业的积压报警来及时发现作用的积压情况,下面是一些常用的积压监控指标: freshness freshness 一般代表当前消费的消息体时间和当前时刻的差值,如果差值越大,说明积压也就越严重。 无论是消息队列还是数据湖,消息体本身就带有时间戳,可以较为方便的进行积压的计算 offsetLag 积压的条目数,适用于消息队列, 一般指当前消费的位点和相比消息队列的头节点的 offset 差 snapshotLag snapshot 积压的个数,适用于数据湖,代表当前消费的 snapshot 和最新的 snapshot 版本的差异数量 splitLag 剩余的 split 数量,适用于数据湖, 代表生于待消费的分片数 积压问题的排查 反压排查时,一般分为两步: 发现存在积压的 task 结合 jstack 造成积压的具体原因 发现积压的 task 通常有多种手段来找到积压的 task inpoolUsage/outpoolUsage An estimate of the input/output buffers usage. (ignores LocalInputChannels) 一般情况下, 有这个一个原则: inpool 高的 task 处理比较慢 (可能原因:1. 自身处理慢 2. 下游处理慢导致反压) outpool 高的 task 下游处理比较慢 反压 If you see a back pressure warning (e.g. High) for a task, this means that it is producing data faster than the downstream operators can consume....
2023-Q1-W6 oncall 记录
Issue #1 问题描述 使用 Rocksdb statebackend Flink 作业频繁 oom 导致作业失败 报错原因 OOM 报错,本质是内存不够用导致, 通过 heap Dump 分析, 主要占用在 broadcaststate 的占用上 解决方案 调整堆内内存比例, 增大堆内内存比例, 或者增加作业 TaskManager 的内存, 进而增大堆内内存. 小结 当 Flink 作业用到了 broadcast 操作时,要注意多分配一些内存给堆内, 避免 broadcast 导致堆内内存占用过多,导致 OOM 反思 作为平台方,如何及时预警和发现这些可能存在问题的作业
2023-Q1-W5 oncall 记录
Issue #1 问题描述 使用 Flink SQL 写 hive 表报错 org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive_zjyprc_hadoop`.`dw`.`dwd_mif_edge_prod_smt_print_di` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind at org.apache.flink.connectors.hive.HiveTableSink.createStreamSink(HiveTableSink.java:257) at org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:196) at org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:147) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:317) at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:158) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70) at org....
FlinkSQL 时间类型转化使用小结
Flink SQL的时间类型 在 Flink SQL 中,存在两种时间类型, 分别是 TIMESTAMP 和 TIMESTAMP_LTZ. 以下示例所用的字段: TIMESTAMP_FIELD, TIMESTAMP_LTZ_FIELD, BIGINT_FIELD, STRING_FIELD 分别代表对应类型的字段. TIMESTAMP TIMESTAMP <-> BIGINT -- 在 Flink 1.14 之前 -- TIMESTAMP TO BIGINT -- 在 Flink 1.14 之前 直接使用 cast 将 TIMESTAMP 转换为 BIGINT CAST(timestamp_field as BIGINT) as bigint_field -- BIGINT TO TIMESTAMP CAST(bigint_field as TIMESTAMP) as timestamp_field ------------------------------------ -- 在 Flink 1.14 之后 -- TIMESTAMP TO BIGINT -- 1. TIMESTAMP TO BIGINT cast(TIMESTAMP as STRING) as string_field -- 2....
2023-Q1-W3 oncall 记录
Issue #1 问题描述 使用有限流补数据,出现数据丢失 原因分析 最后的 Sink 开启了两阶段提交,Source 在进入 Finish 状态之后, 最后没有触发一次 checkpoint,导致最后的数据没有被写入到下游,出现了数据丢失 解决方式 使用 Flink1.14 版本, 设置参数 execution.checkpointing.checkpoints-after-tasks-finish.enable=true; 作业进入 finish 状态之后,会主动触发一次 checkpoint,确保数据被正确写入到下游。 https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams Issue #2 问题描述 Flink Job Application 中同时运行了两个 Job 大概逻辑为: DataStream<Pojo> sourceDatastream = xxxx; // 基于 Table API 创建了 Table 对象 Table table = tbEnv.fromDataStram(sourceDataStream); // 基于 SQL API 创建了 Sink 表 tbEnv.executeSql("create table sinkTable xxx () with ()") // 调用 execute table.executeInto("selectTable"); // 调用 env.execute tbEnv.execute("jobxxxx"); 原因 分析 用户混用了的 DataStream API 和 table API, 在调用 table....
2023-Q1-W2 oncall 记录
Issue #1 问题描述 使用 StreamingFileSink,报错 fileNotFoundException File does not exist: /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-32-474.inprogress.e9b18821-adf3-4e45-b5e9-dacd1abe32c3 原因分析 Flink 作业在写入过程中, 内部发生了重启, 但下次恢复时, 由于其他定时任务, 对应时刻的文件刚好被 mv 到了其他的目录,导致出现了问题 解决方式 mv 文件时,要避开正在写入的文件, 只处理已经 commit 的文件即可 Issue #2 问题描述 使用 StreamingFileSink 时, 报错 DFSClient_NONMAPREDUCE_-xxxx is already the current lease holder. Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/xxxx/xxx_realtime_sample/tfrecord/2023011202/.part-220-474.inprogress.6746e5ad-1f94-4345-82c8-8d7251394cc5 for DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 on [xxxx](http://xxxxx) because DFSClient_NONMAPREDUCE_-640776598_106_application_1667456877234_4942861 is already the current lease holder. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2783) at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:125) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2867) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:914) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:562) at org....