问题描述

近期业务反馈, 开启了 mini-batch 之后, 出现了数据不准的情况, 关掉了 mini-batch 之后, 就正常了, 因此业务方怀疑,是不是 Flink 的 mini-batch 存在 bug ?

问题排查

初步分析

  • mini-batch 已经在内部大规模使用, 目前没有发现一例和开启 mini-batch 有关, 同时 mini-batch 本质只是将数据进行攒批然后计算, 并没有修改核心的运算逻辑.
  • 开关 mini-batch 的关键时数据的批量计算, 是否在批量计算使得原本存在 bug 的代码暴露问题
  • 业务在 Flink SQL 使用了多个双流 join 和 group window,如果不注意使用,很可能导致乱序,最终的错误结果是某条数据没有被正常更新, 和乱序的情况比较类似.

综上考虑, 整体排查的方向还是排查 SQL 的业务逻辑是否存在乱序的 case, 开启了 mini-batch 后是否加剧了这种乱序的产生

代码逻辑梳理

flowchart LR
join1(join1 \n item_day, item_key) --> join2
join2(join2 \n item_day, item_key) --> join3
join3(join3 \n item_day, item_key) --> group1
group1(group1 \n item_day, item_key) --> group2
group2(group2 \n item_day, item_key, key1, key2, key3) --> sink
sink(sink \n pk: item_day, item_key)

抽象之后的 DAG 如图所示:

  1. join1, join2, join3, group1 都是基于 item_day 和 item_key 进行 hash 数据经过这些算子均按照 [item_day, item_key] 进行 hash
  2. group2 算子的 group key 为 [item_day, item_key, key1, key2, key3],Flink 会基于这些字段整体进行 hash
  3. Sink 算子的主键为 [item_day, item_key] ,数据流向 Sink 算子时会按照 [item_day, item_key] 进行 hash.

分析: key1, key2, key3 时由前面的 join1 算子补充的维度字段, 前面的 join 采用的是 left join, 因此可能会存在 item_day 和 item_key 相同的数据, 对应的 key1, key2, key3 并不相同, 经过 group2 会触发具有相同 [item_day, item_key] 的数据,被 hash 到不同的并发,这种就出现了乱序问题

修复手段

最后的 group by [item_day, item_key, key1, key2, key3], 核心还是为了聚合相同的 item_day和 item_key, key1, key2, key3 不属于 value 类型数据, 也不参与聚合, 因此将修改 SQL 避免基于 key1, key2, key3 进行聚合即可, 这里采用 last_value 聚合函数取最后一条数据

-- 原始 SQL
SELECT item_day, item_key, key1, key2, key3, sum(value)
FROM XXX
GROUP BY item_day, item_key, key1, key2, key3

-- 修改为
SELECT item_day, item_key, last_value(key1), last_value(key2), last_value(key3), sum(value)
FROM XXX
GROUP BY item_day, item_key

经过修改之后,保证整个 Flink 处理链路中, 相同的主键对应的数据,无论经过多少次 hash, 都是在同一个并行处理,这种才能保证最终结果的正确性

结论

修改后, 业务的结果恢复正常, 因此 Mini-batch 并不是导致作业出现问题的核心原因, 核心原因还是乱序, 而开启 mini-batch 会加剧这种乱序问题的触发。

开启 mini-batch 之后, 具有相同 key 的数据, 如果落到了同一个 batch, 这样物理上的时间差就更短,因而更容易暴露问题。