Issue #1
Problem Description
When a user submits a FlinkSQL job, it stays stuck in the codegen phase; the total time can even exceed 3-4 hours.
Problem Analysis
Inspecting the thread dump shows the main thread spending its time in codesplit, which means the generated code is so long that an enormous amount of time goes into code splitting:
"main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable [0x00007f4813b2e000]
java.lang.Thread.State: RUNNABLE
at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:414)
at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:360)
at org.apache.flink.table.codesplit.ReturnValueRewriter.rewrite(ReturnValueRewriter.java:86)
at org.apache.flink.table.codesplit.JavaCodeSplitter.splitImpl(JavaCodeSplitter.java:48)
at org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:35)
at org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
at org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
at org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:124)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:71)
at org.apache.flink.table.planner.delegation.StreamPlanner$$Lambda$1420/309438139.apply(Unknown Source)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.TraversableLike$$Lambda$338/537524656.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
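With several jstack samples in hand, a quick way to confirm a bottleneck like the one above is to count which frames keep recurring under the main thread. A minimal sketch (the abbreviated sample dumps and the frame-counting helper are illustrative, not part of the original investigation):

```python
from collections import Counter

# Two illustrative, abbreviated jstack samples of the "main" thread.
samples = [
    '''"main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:414)
        at org.apache.flink.table.codesplit.ReturnValueRewriter.rewrite(ReturnValueRewriter.java:86)''',
    '''"main" #1 prio=5 os_prio=0 tid=0x00007f480c028800 nid=0x11316 runnable
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.table.shaded.org.antlr.v4.runtime.TokenStreamRewriter.getText(TokenStreamRewriter.java:360)
        at org.apache.flink.table.codesplit.JavaCodeSplitter.splitImpl(JavaCodeSplitter.java:48)''',
]

def hot_frames(samples):
    """Count stack frames (method name only, line numbers stripped) across samples."""
    counts = Counter()
    for dump in samples:
        for line in dump.splitlines():
            line = line.strip()
            if line.startswith("at "):
                counts[line[3:].split("(")[0]] += 1
    return counts

# Frames that recur across samples mark where the thread actually spends its time.
for frame, n in hot_frames(samples).most_common(3):
    print(n, frame)
```

Frames from the codesplit package dominating every sample is what points the investigation at code splitting rather than at codegen itself.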
Root Cause Analysis:
The original SQL is under 400 lines and its overall logic is not particularly complex, yet the generated code is extremely long, reaching roughly 1,000,000 lines. Reasoning backwards: the code is too long → the SQL execution tree is too complex → filter or ETL logic has been pushed into the plan's branches and duplicated, bloating the execution tree. By disabling the candidate optimization rules one by one, we can check whether SQL execution speeds up.
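The inflation mechanism can be illustrated with a toy plan model: when a filter is pushed below a union, its predicate is copied into every branch, so a chain of such rewrites multiplies the number of expressions the code generator must emit. A hypothetical sketch (the tuple-based plan encoding and the counts are illustrative, not Flink's or Spark's actual planner structures):

```python
# Toy relational plan: a node is ("filter", pred, child), ("union", [children]),
# or ("scan", name).
def push_down(node):
    """Push filters below unions, duplicating the predicate into each branch."""
    kind = node[0]
    if kind == "filter":
        pred, child = node[1], push_down(node[2])
        if child[0] == "union":
            # One filter becomes N filters, one per branch.
            return ("union", [push_down(("filter", pred, c)) for c in child[1]])
        return ("filter", pred, child)
    if kind == "union":
        return ("union", [push_down(c) for c in node[1]])
    return node

def count_filters(node):
    """Total filter expressions the code generator would have to emit."""
    if node[0] == "filter":
        return 1 + count_filters(node[2])
    if node[0] == "union":
        return sum(count_filters(c) for c in node[1])
    return 0

# Three stacked filters over a 4-way union: 3 filter expressions before pushdown,
# 12 after, since each predicate is cloned into every branch.
plan = ("scan", "t")
plan = ("union", [plan] * 4)
for p in ["p1", "p2", "p3"]:
    plan = ("filter", p, plan)

pushed = push_down(plan)
print(count_filters(plan), "->", count_filters(pushed))  # 3 -> 12
```

With wide unions and deeply nested views, this multiplication compounds, which is consistent with a sub-400-line SQL producing about a million lines of generated code.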
Investigation:
Testing shows that SparkSQL and FlinkSQL exhibit the same problem. Since FlinkSQL currently offers no way to exclude an individual rule, we used SparkSQL for testing and investigation. We eventually found that after disabling org.apache.spark.sql.catalyst.optimizer.PushDownPredicates, the SQL executes normally:
SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownPredicates;
Solution
Spark SQL: disable the PushDownPredicates rule before executing the SQL:
SET spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownPredicates;
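The same exclusion can also be applied when building the session programmatically rather than via SET; a hedged sketch assuming PySpark is available (the application name is illustrative):

```python
from pyspark.sql import SparkSession

# Exclude the predicate-pushdown rule before any SQL is planned in this session.
spark = (
    SparkSession.builder
    .appName("exclude-pushdown")  # illustrative name
    .config("spark.sql.optimizer.excludedRules",
            "org.apache.spark.sql.catalyst.optimizer.PushDownPredicates")
    .getOrCreate()
)
```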
Flink SQL: modify the SQL planner module source, comment out the use of the rules below, and rebuild the package. Continued investigation suggests the FlinkSQL problem may be related to several rules:
// CoreRules.FILTER_REDUCE_EXPRESSIONS,
// CoreRules.PROJECT_REDUCE_EXPRESSIONS,
// CoreRules.CALC_REDUCE_EXPRESSIONS,
// CoreRules.JOIN_REDUCE_EXPRESSIONS
// SimplifyFilterConditionRule.INSTANCE,
// SimplifyJoinConditionRule.INSTANCE,
// JoinConditionTypeCoerceRule.INSTANCE,
// CoreRules.JOIN_PUSH_EXPRESSIONS
// PREDICATE_SIMPLIFY_EXPRESSION_RULES.asScala ++
// REWRITE_COALESCE_RULES.asScala ++
// REDUCE_EXPRESSION_RULES.asScala ++
// CoreRules.FILTER_SET_OP_TRANSPOSE
// CoreRules.FILTER_MERGE