[BusTub] Execution-Layer Optimization: Filter Pushdown in Join


In 《【数据库综述系列】CMU15-445课程数据库BusTub综述》 we gave a general overview of BusTub. This article digs into one query optimization in particular: Filter Pushdown in Join.

Suppose we implement this optimization for the basic join types (INNER JOIN, plus LEFT and RIGHT JOIN). The following points need to be worked through:

1. Implementation Considerations

2. Implementation

2.1 Tree Traversal

Logical-plan optimization is essentially the transformation and relocation of TreeNodes. Spark SQL, for example, combines Scala pattern matching and partial functions with tree traversal (pre-order, post-order) to accomplish this elegantly.

BusTub takes a similar approach, except that it traverses bottom-up (post-order: left, right, then root): if the current node has children, the children are optimized first, and the node itself last. The basic code looks like this:

/**
   * Returns a copy of this node where `rule` has been recursively applied first to all of its
   * children and then itself (post-order).  When `rule` does not apply to a given node, it is left unchanged.   
   */
def transformUpWithPruning(cond: TreePatternBits => Boolean, ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[BaseType, BaseType])
: BaseType = {
      
}
auto Optimizer::OptimizeNLJAsHashJoinWithFilterPushDown(const AbstractPlanNodeRef &plan) -> AbstractPlanNodeRef {
    std::vector<AbstractPlanNodeRef> children;

    // Optimize the children first
    for (const auto &child : plan->GetChildren()) {
      children.emplace_back(OptimizeNLJAsHashJoinWithFilterPushDown(child));
    }

    auto optimized_plan = plan->CloneWithChildren(std::move(children));

    // Optimize the current node last, so a fully new node can be returned
    // (rewriting logic elided here)
    return optimized_plan;
}

2.2 Node Matching and Transformation

Returning to the overview article, the simplified plan tree is:

            filter (FilterPlanNode)
                     |
            join (NestedLoopJoinPlanNode)
            /                  \
t1 (SeqScanPlanNode)      t2 (SeqScanPlanNode)

After optimization it becomes:

            join (HashJoinPlanNode)
            /                  \
t1 (SeqScanPlanNode)      filter (FilterPlanNode)
                                 |
                          t2 (SeqScanPlanNode)

So once we detect that the current node is a Filter and its only child is a NestedLoopJoin, we can start working on the optimization in both directions. The two node types involved:

HashJoinPlanNode(
    // output columns after the join
    SchemaRef output_schema,
    // left table
    AbstractPlanNodeRef left,
    // right table
    AbstractPlanNodeRef right,
    // left join key
    AbstractExpressionRef left_key_expression,
    // right join key
    AbstractExpressionRef right_key_expression,
    // join type: LEFT, RIGHT, INNER
    JoinType join_type
)
FilterPlanNode(SchemaRef output, AbstractExpressionRef predicate, AbstractPlanNodeRef child)

2.3 Pushdown Implementation

First, recall the optimized logical plan.

 === PLANNER ===                                                                                                                             
 Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)                                
   Filter { predicate=(#0.0=100) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)                                            
     NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)                    
       SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)                                                                                
       SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)                                                                                           
 === OPTIMIZER ===                                                                                                                           
 HashJoin { type=Inner, left_key=#0.0, right_key=#1.0 } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)                      
   SeqScan { table=t1, filter=(#0.0=100) } | (t1.v1:INTEGER, t1.v2:INTEGER)                                                                   
   SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER) 

We first need to pin down which conditions can be pushed down.

2.3.1 Identifying Pushable Conditions

# each comparison sub-expression references exactly one table
t1.v1 = 3 and t2.v3 > 4

# only one sub-expression references a single table; just that part can be
# pushed down, while the rest stays in the original Filter
t1.v1 = 3 and t1.v2 > t2.v4

There is a dedicated term for such sub-expressions: conjunction.

Each unit obtained by splitting the filter condition on its outermost ANDs is called a conjunction. For example, ((A & B) | C) & D & E consists of three conjunctions: ((A & B) | C), D, and E. We define it this way because a conjunction is the smallest unit of pushdown to storage: the conditions inside one conjunction are either all pushed down or all kept.

# all sub-expressions reference the left table
t1.v1 = 3 or t1.v2 > 4

# all sub-expressions reference the right table
t2.v3 = 3 or t2.v4 > 4

2.3.2 Splitting the Pushdown Conditions

If you are familiar with Spark SQL, org.apache.spark.sql.catalyst.expressions.PredicateHelper#splitConjunctivePredicates is a good reference here; most databases implement something similar.

auto SplitConjunctivePredicates(const AbstractExpressionRef &expr, std::vector<AbstractExpressionRef> &collector) -> void {
    if (
      const auto *logic_expr = dynamic_cast<const LogicExpression *>(expr.get());
      (logic_expr != nullptr && logic_expr->logic_type_ == LogicType::And)
    ) {
      SplitConjunctivePredicates(logic_expr->GetChildAt(0), collector);
      SplitConjunctivePredicates(logic_expr->GetChildAt(1), collector);
    } else {
      collector.emplace_back(expr);
    }
}

A rough implementation of the above logic:

struct LeftRightCommonConditions {
  std::vector<AbstractExpressionRef> left;
  std::vector<AbstractExpressionRef> right;
  std::vector<AbstractExpressionRef> common;
};

auto Split(const AbstractExpressionRef &expr, size_t left_column_cnt, size_t right_column_cnt) -> LeftRightCommonConditions {
    std::vector<AbstractExpressionRef> collector{};

    SplitConjunctivePredicates(expr, collector);

    std::vector<AbstractExpressionRef> left_conditions{};
    std::vector<AbstractExpressionRef> right_conditions{};
    // e.g. left.A1 > right.A2
    std::vector<AbstractExpressionRef> common_conditions{};

    for (const auto &expression : collector) {
        //! check whether this is a comparison expression; pattern matching would make this much cleaner
        if (
            const auto *comp_expression = dynamic_cast<const ComparisonExpression *>(expression.get());
            comp_expression != nullptr
        ) {
            if (
                const auto *column_value_expression = dynamic_cast<const ColumnValueExpression *>(comp_expression->GetChildAt(0).get());
                column_value_expression != nullptr
            ) {
                bool right_expression_is_column =
                    dynamic_cast<const ColumnValueExpression *>(comp_expression->GetChildAt(1).get()) != nullptr;

                auto col_idx = column_value_expression->GetColIdx();
                if (col_idx < left_column_cnt && !right_expression_is_column) {
                    //! condition on the left table
                    left_conditions.emplace_back(expression);
                } else if (
                    (col_idx >= left_column_cnt && col_idx < left_column_cnt + right_column_cnt) && !right_expression_is_column
                ) {
                    //! note: rewrite the column value so that its index is relative to the table it belongs to
                    right_conditions.emplace_back(
                        std::make_shared<ComparisonExpression>(
                            std::make_shared<ColumnValueExpression>(
                                1, col_idx - left_column_cnt, column_value_expression->GetReturnType()
                            ),
                            comp_expression->GetChildAt(1),
                            comp_expression->comp_type_
                        )
                    );
                } else {
                    common_conditions.emplace_back(expression);
                }
            }
        }
    }

    // if all three buckets are empty at this point, the first part is an AND
    // logic expression and needs to be split further
    ...
}

2.3.3 Pushdown Effects in the Optimized Logical Plan

explain select * from t1 inner join t2 on v1 = v3 where v1 > 5 and v2 > v3;
=== PLANNER ===
Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  Filter { predicate=((#0.0>5)and(#0.1>#0.2)) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
    NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
      SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
      SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
=== OPTIMIZER ===
Filter { predicate=(#0.1>#0.2) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  HashJoin { type=Inner, left_key=#0.0, right_key=#1.0 } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
    SeqScan { table=t1, filter=(#0.0>5) } | (t1.v1:INTEGER, t1.v2:INTEGER)
    SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)

As the optimized plan shows, part of the predicate has been pushed down into t1's SeqScan, while the rest remains as a filter above the join.

explain select * from t1 inner join t2 on v1 = v3 where v2 = 10 or v1 > 5;
=== PLANNER ===
Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  Filter { predicate=((#0.1=10)or(#0.0>5)) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
    NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
      SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
      SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
=== OPTIMIZER ===
HashJoin { type=Inner, left_key=#0.0, right_key=#1.0 } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  SeqScan { table=t1, filter=((#0.1=10)or(#0.0>5)) } | (t1.v1:INTEGER, t1.v2:INTEGER)
  SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)

Even though the predicate is an OR, both sides reference the same table, so it can be pushed down as a whole.

explain select * from (select * from t1 where v1 = 1000) t1 inner join t2 on v1 = v3 where v2 = 10 and v1 > 5;
=== PLANNER ===
Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  Filter { predicate=((#0.1=10)and(#0.0>5)) } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
    NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
      Projection { exprs=[#0.0, #0.1] } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER)
        Projection { exprs=[#0.0, #0.1] } | (t1.v1:INTEGER, t1.v2:INTEGER)
          Filter { predicate=(#0.0=1000) } | (t1.v1:INTEGER, t1.v2:INTEGER)
            SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
      SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
=== OPTIMIZER ===
HashJoin { type=Inner, left_key=#0.0, right_key=#1.0 } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
  SeqScan { table=t1, filter=((#0.0=1000)and((#0.1=10)and(#0.0>5))) } | (t1.t1.v1:INTEGER, t1.t1.v2:INTEGER)
  SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)

Notice that besides the pushdown itself, the predicates were also merged: filter=((#0.0=1000)and((#0.1=10)and(#0.0>5)))

3. Summary

That covers extending BusTub's optimizer with a filter-pushdown rule for joins. This is still a learning-oriented implementation; a production database is far more sophisticated here, but it is enough to understand the basic principle.

References

> Spark SQL

> Alibaba Cloud: 一文读懂AnalyticDB MySQL过滤条件智能下推原理 (on intelligent filter pushdown in AnalyticDB MySQL)

Notice: this article is licensed under BY-NC-SA. Please credit the original source when reposting: Allen写字的地方