线上 Flink keytab 丢失异常修复

问题描述 当 Flink 作业运行在 Yarn 集群时,往往机器上的物理盘, 一般会保存作业以来的一些配置, keytab, log 当磁盘故障时, 对于 Flink 作业的影响某些场景并不是致命的,不会导致作业立马失败。 对应的日志如下: org.apache.hadoop.security.KerberosAuthException: Login failure for user: xxxx 失败原因: Flink 作业在 relogin 时,依赖的 keytab 找不到,导致 relogin 失败, 无法近一步触发 checkpoint. 线上问题修复 修复思路: 只要把对应的 keytab 目录重建即可, 再把相应的 keytab 拷贝过去即可。 使用 arthas 找到作业依赖的 keytab 的路径. [arthas@80073]$ watch org.apache.hadoop.security.UserGroupInformation newLoginContext params[3].getParameters() Press Q or Ctrl+C to abort. Affect(class count: 1 , method count: 1) cost in 160 ms, listenerId: 9 method=org.apache.hadoop.security.UserGroupInformation.newLoginContext location=AtExit ts=2023-01-10 15:37:09; [cost=0....

January 10, 2023

Flink SQL 的 catalog 局限性

Flink SQL Catalog 局限性 现阶段的 Flink SQL 元数据引用方式 Flink SQL 使用 calcite 框架作为 Flink SQL 的核心组建, calcite 虽然支持无限层的元数据结构,但 Flink SQL 中限制了元数据的组织结构为 3 级别 结构, 分别是 catalog, database, table 。 在 Flink SQL 中使用一张外部数据表, 大概有两种形式: 创建一个 catalog, 通过 catalog.database.table 来引用一张数据表. -- 使用 CREATE CATALOG DDL CREATE CATALOG my_catalog WITH( 'type' = 'jdbc', 'default-database' = '...', 'username' = '...', 'password' = '...', 'base-url' = '...' ); -- 使用 TableEnv 进行注册 HiveCatalog hiveCatalog = new HiveCatalog(); tbEnv....

January 7, 2023

漫长的 DNS 问题排查和修复

问题发现 2022.05.20 用户反馈 Flink 作业莫名卡住 暂时修复: 通知用户及时重启恢复作业 关注作业积压的报警, 及时发现问题 排查机器和DNS服务: DNS 服务无明显的问题异常, 凌晨机器负载较高,但进一步的原因无法确认 由于作业已经重启, 无有效手段进行下一步分析, 因此计划开发 Flink Canray 程序,定期扫描全集群的作业,结合 metric 和 jstack 能过及时发现问题 Flink Canary 规划和开发 问题排查 2022.06.28 上线 Flink canary 之后, 能过及时发现现场并进行分析 查看 jstack 查看对应的执行栈, 发现最终是在一个 native 方法上 pool-21-thread-1": running at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) at java.net.InetAddress.getAllByName0(InetAddress.java:1277) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at org.apache.http.impl.conn.SystemDefaultDnsResolver.resolve(SystemDefaultDnsResolver.java:45) at org.apache.http.impl.conn.DefaultClientConnectionOperator.resolveHostname(DefaultClientConnectionOperator.java:263) at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:162) at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326) at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610) at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445) at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835) at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:118) at org....

January 5, 2023

2023-Q1-W1 oncall 记录

问题1: 使用 over window 聚合,发现同一个时间窗口会产生多条数据 ? ANS: Over Window 的开窗模式是每条数据过来会产生一个新的窗口,并不会删除原来的窗口产生的数据, 如果要保证每个时间窗口的数据只有一条, 应该使用 group window 问题2: 在 FlinkSQL 中使用 Hive 作为维表时,报错: The only supported ‘streaming-source.partition.include’ is ‘all’ in hive table scan, but is ’latest’) ANS: SQL 写法的问题, 当使用 latest 时,必须要使用 temporal join 的写法, 如果使用了普通的 join,或者使用了子查询, Hive 表实际会当作一个 source 表,因此无法支持 latest 的选项 问题3: Hive 函数中的 collect_list ,Flink SQL 中应该用什么函数? ANS: Flink SQL 中提供了 collect 函数用于聚合 可以使用 hive module, 使用 hive 函数直接引用即可 问题4: 使用 Hive 维度表,非常容易 OOM,实际的维度表可能就几十 M ANS:...

January 4, 2023

Flink mini-batch "引发" 的乱序问题

问题描述 近期业务反馈, 开启了 mini-batch 之后, 出现了数据不准的情况, 关掉了 mini-batch 之后, 就正常了, 因此业务方怀疑,是不是 Flink 的 mini-batch 存在 bug ? 问题排查 初步分析 mini-batch 已经在内部大规模使用, 目前没有发现一例和开启 mini-batch 有关, 同时 mini-batch 本质只是将数据进行攒批然后计算, 并没有修改核心的运算逻辑. 开关 mini-batch 的关键时数据的批量计算, 是否在批量计算使得原本存在 bug 的代码暴露问题 业务在 Flink SQL 使用了多个双流 join 和 group window,如果不注意使用,很可能导致乱序,最终的错误结果是某条数据没有被正常更新, 和乱序的情况比较类似. 综上考虑, 整体排查的方向还是排查 SQL 的业务逻辑是否存在乱序的 case, 开启了 mini-batch 后是否加剧了这种乱序的产生 代码逻辑梳理 flowchart LR join1(join1 \n item_day, item_key) --> join2 join2(join2 \n item_day, item_key) --> join3 join3(join3 \n item_day, item_key) --> group1 group1(group1 \n item_day, item_key) --> group2 group2(group2 \n item_day, item_key, key1, key2, key3) --> sink sink(sink \n pk: item_day, item_key) 抽象之后的 DAG 如图所示:...

January 3, 2023

Flink Checkpoint 失败策略未生效排查

问题描述 2022.06.19 突然有用户反馈, 作业的积压时间突然变为 24 个小时 排查过程 初步怀疑是配置state.savepoints.dir了导致从一个非常旧的offset恢复,导致作业消费越界,直接从 topic 的最旧的位置消费, 一般 topic 的保留时间为 24 小时 通过查看 topic 的消息过期时间, 可以发现是 7 天的超时时间,旧的 savepoint 路径的 checkpoint 时间是 5.24, 所以基本可以排除是 state.savepoints.dir造成的影响. 查看作业的监控,确认 checkpoint 是否正常 查看 checkpoint 失败情况 (并没有 checkpoint 失败的情况, 对应的 metric 是 NumofFailedCheckpoint 无增长) 查看 作业重启情况 (06.18 20:00 作业发生了重启) 通过查看日志, 确认 checkpoint 是否正常, 最后一次成功 checkpoint 时间为 21:40 2022-06-17 21:40:56.770 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 121061 (type=CHECKPOINT) @ 1655473256761 for job 2e2dbb876d892e170cb7418a5812cbad. 2022-06-17 21:40:57....

June 22, 2022

FlinkSQL 踩坑经历之乱序问题

FlinkSQL 平台上线至今,满足了大大小小的业务, 也踩了不少坑, 希望能回忆并记录下来。 乱序问题 在业务编写 FlinkSQL 时, 非常常见的就是乱序相关问题, 在出现问题时,非常难以排查,且无法稳定复现,这样无论是业务方,还是平台方,都处于一种非常尴尬的地步。 在实时 join 中, 如果是 Regular Join, 则使用的是 Hash Join 方式, 左表和右表根据 Join Key 进行hash,保证具有相同 Join Key 的数据能够 Hash 到同一个并发,进行 join 的计算 。 以下面的例子进行说明, 以下有三张表, 分别是订单表, 订单明细表, 和商品类目 。 这三张表的实时数据都从 MySQL 采集得到并实时写入 Kafka, 均会实时发生变化, 无法使用窗口计算 除了订单表有订单时间, 其他两张表都没有时间属性, 因此无法使用watermark CREATE TABLE orders ( order_id VARCHAR, order_time TIMESTAMP ) WITH ( 'connector' = 'kafka', 'format' = 'changelog-json' ... ); CREATE TABLE order_item ( order_id VARCHAR, item_id VARCHAR ) WITH ( 'connector' = 'kafka',) 'format' = 'changelog-json' ....

June 3, 2022