Issue #1

Problem Description

When submitting a Flink SQL job, the user found it stuck in the codegen phase; the total time even exceeded 3-4 hours.

Problem Analysis

Inspecting the thread dump shows that the main thread spends its time in codesplit, which means the generated code is extremely long and an enormous amount of time is being spent splitting it:

"main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable [0x00007f4813b2e000]
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:414)
        at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:360)
        at org.apache.flink.table.codesplit.ReturnValueRewriter.rewrite(ReturnValueRewriter.java:86)
        at org.apache.flink.table.codesplit.JavaCodeSplitter.splitImpl(JavaCodeSplitter.java:48)
        at org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:35)
        at org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
        at org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
        at org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:124)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
        at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$Lambda$1420/309438139.apply(Unknown Source)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.TraversableLike$$Lambda$338/537524656.apply(Unknown Source)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)

Root Cause Analysis:

The original SQL is fewer than 400 lines and its logic is not especially complex, yet the generated code is enormous, reaching roughly one million lines. Working backwards: the code is too long -> the SQL execution tree is too complex -> filter/ETL logic has likely been pushed into multiple upstream branches of the plan, inflating the execution tree. The plan was therefore to go through the candidate optimizer rules one by one, disabling each, and check whether planning then speeds up.
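The blow-up mechanism can be illustrated with a toy model (plain Python, not Spark or Flink source): when a filter is pushed below a UNION, its predicate is copied into every branch, so nested unions multiply one predicate in the SQL text into exponentially many copies in the plan, and the code generator then emits code for every copy.

```python
# Toy model of predicate pushdown through nested UNIONs.
# Nodes are simple tagged tuples; this is NOT the real planner code.

def scan(name):
    return ("scan", name)

def union(*children):
    return ("union", list(children))

def push_filter(node, pred):
    """Push a filter below unions: the predicate is duplicated per branch."""
    kind, payload = node
    if kind == "union":
        return ("union", [push_filter(c, pred) for c in payload])
    return ("filter", (pred, node))

def count_filters(node):
    """Count how many copies of filter predicates the plan now contains."""
    kind, payload = node
    if kind == "filter":
        return 1 + count_filters(payload[1])
    if kind == "union":
        return sum(count_filters(c) for c in payload)
    return 0

def nested_union(depth):
    """Build `depth` levels of 2-way unions over a single scan."""
    if depth == 0:
        return scan("t")
    child = nested_union(depth - 1)
    return union(child, child)

plan = push_filter(nested_union(10), "amount > 0")
print(count_filters(plan))  # one predicate in the SQL -> 1024 copies in the plan
```

With 10 levels of 2-way unions, a single predicate appears 1024 times after pushdown; this is the same kind of multiplication that can turn a sub-400-line SQL into on the order of a million lines of generated code.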

Investigation:

Testing showed that Spark SQL exhibits the same problem. Since Flink SQL does not currently support excluding a specific rule, we used Spark SQL for the investigation. We eventually found that the SQL executes normally once org.apache.spark.sql.catalyst.optimizer.PushDownPredicates is disabled:

SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownPredicates;
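The rule-by-rule search above can be scripted. Below is a sketch of that procedure; `plan_time` is a hypothetical helper (not a Spark API) that in practice would set `spark.sql.optimizer.excludedRules` and time `spark.sql(query).explain()`, and the 60-second budget is an assumption:

```python
# Sketch: find which Catalyst optimizer rule makes planning pathologically
# slow by excluding candidates one at a time and timing the planning phase.

CANDIDATE_RULES = [
    "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates",
    "org.apache.spark.sql.catalyst.optimizer.CollapseProject",
    "org.apache.spark.sql.catalyst.optimizer.ConstantFolding",
]

def find_culprits(candidates, plan_time, budget_s=60.0):
    """Return the rules whose individual exclusion brings planning time
    under budget_s. `plan_time(excluded=[...])` is supplied by the caller."""
    return [rule for rule in candidates
            if plan_time(excluded=[rule]) <= budget_s]
```

In our case the search terminates immediately: excluding PushDownPredicates alone brings planning back to normal.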

Solution

Spark SQL: disable the PushDownPredicates rule before executing the SQL:

SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownPredicates;

Flink SQL: modify the SQL planner module's code to comment out the relevant rules, then rebuild the package. Through repeated elimination, the problem in Flink SQL appears to involve a combination of several rules:

//    CoreRules.FILTER_REDUCE_EXPRESSIONS,
//    CoreRules.PROJECT_REDUCE_EXPRESSIONS,
//    CoreRules.CALC_REDUCE_EXPRESSIONS,
//    CoreRules.JOIN_REDUCE_EXPRESSIONS

//    SimplifyFilterConditionRule.INSTANCE,
//    SimplifyJoinConditionRule.INSTANCE,
//    JoinConditionTypeCoerceRule.INSTANCE,
//    CoreRules.JOIN_PUSH_EXPRESSIONS

//    PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
//      REWRITE_COALESCE_RULES.asScala ++
//      REDUCE_EXPRESSION_RULES.asScala ++

//    CoreRules.FILTER_SET_OP_TRANSPOSE
//    CoreRules.FILTER_MERGE