Issue #1

问题描述

使用 Flink SQL 写 hive 表报错

org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive_zjyprc_hadoop`.`dw`.`dwd_mif_edge_prod_smt_print_di` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind
        at org.apache.flink.connectors.hive.HiveTableSink.createStreamSink(HiveTableSink.java:257)
        at org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:196)
        at org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:147)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:317)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:158)
        at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1675)
        at com.xiaomi.infra.galaxy.flink.env.FlinkTableEnvironment.translateAndClearBuffer(FlinkTableEnvironment.java:424)
        at com.xiaomi.infra.galaxy.flink.env.FlinkTableEnvironment.execute(FlinkTableEnvironment.java:407)

报错原因:

未配置分区 commit 的方式, 这样数据写入之后未添加分区,会导致数据不可见。

解决方案:

在写入 hive 表时,增加 commit 策略, 可以在 SQL hint 中动态加进去

INSERT INTO 
    hive_table /*+ options('sink.partition-commit.policy.kind'='metastore') */
SELECT * 
FROM XXX

Issue #2

问题描述

Flink SQL 实时和 Spark SQL 离线产生结果不一致, 结果产生 NAN

核心 SQL 如下:


SELECT 
    key1,
    key2,
    key3,
    case when min(value1) = 0 then 0
         when min(value1) > 0 then power(10, sum(log(10, case(value1 as double))))
FROM table1
GROUP BY key1, key2, key3

-- 上面的SQL 逻辑是,如果存在最小值为0, 则直接结果为0, 如果所有的值都大于 0, 则计算 log 值进行撤回操作。

原因分析

离线场景的计算逻辑: 先算 min(value1) 如果直接 = 0 , 则直接不计算后面的逻辑了。

实时场景的计算逻辑: 先计算 log, 在计算聚合 min 和 sum,之后不断修正判断结果

解决方案

在实时计算时,算 log 前就预处理好 value1,避免出现非法的结果,导致后面的 retract 流程全部失效。

小结

在使用 Flink SQL 实时处理数据时,当数据可能产生 infinity value 时,要尤为注意, Flink 的 retract 函数是不支持处理这种类型的数据的。