Issue #1
问题描述
使用 Flink SQL 写 hive 表报错
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive_zjyprc_hadoop`.`dw`.`dwd_mif_edge_prod_smt_print_di` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind
at org.apache.flink.connectors.hive.HiveTableSink.createStreamSink(HiveTableSink.java:257)
at org.apache.flink.connectors.hive.HiveTableSink.consume(HiveTableSink.java:196)
at org.apache.flink.connectors.hive.HiveTableSink.lambda$getSinkRuntimeProvider$0(HiveTableSink.java:147)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:317)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:158)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:140)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1675)
at com.xiaomi.infra.galaxy.flink.env.FlinkTableEnvironment.translateAndClearBuffer(FlinkTableEnvironment.java:424)
at com.xiaomi.infra.galaxy.flink.env.FlinkTableEnvironment.execute(FlinkTableEnvironment.java:407)
报错原因:
未配置分区 commit 的方式, 这样数据写入之后未添加分区,会导致数据不可见。
解决方案:
在写入 hive 表时,增加 commit 策略, 可以在 SQL hint 中动态加进去
INSERT INTO
hive_table /*+ options('sink.partition-commit.policy.kind'='metastore') */
SELECT *
FROM XXX
Issue #2
问题描述
Flink SQL 实时和 Spark SQL 离线产生结果不一致, 结果产生 NAN
核心 SQL 如下:
SELECT
key1,
key2,
key3,
case when min(value1) = 0 then 0
when min(value1) > 0 then power(10, sum(log(10, case(value1 as double))))
FROM table1
GROUP BY key1, key2, key3
-- 上面的SQL 逻辑是,如果存在最小值为0, 则直接结果为0, 如果所有的值都大于 0, 则计算 log 值进行撤回操作。
原因分析
离线场景的计算逻辑: 先算 min(value1) 如果直接 = 0 , 则直接不计算后面的逻辑了。
实时场景的计算逻辑: 先计算 log, 在计算聚合 min 和 sum,之后不断修正判断结果
解决方案
在实时计算时,算 log 前就预处理好 value1,避免出现非法的结果,导致后面的 retract 流程全部失效。
小结
在使用 Flink SQL 实时处理数据时,当数据可能产生 infinity value 时,要尤为注意, Flink 的 retract 函数是不支持处理这种类型的数据的。