查询并行执行(1)-datafusion


datafusion是pull base、采用exchange算子的单机查询引擎,本文探讨其在并行计划生成以及并行计划执行过程。

并行计划生成

在查询的并行执行过程中,会引入算子间的数据分布对齐问题:每个算子的输入与输出数据都具有特定的分布要求,而要实现真正的并行执行,就需要确保上游算子的输出分布能够满足下游算子的输入分布需求。以如下图为例:

  • 下游的 Agg Final 算子要求其输入数据按照某个 Partition 规则分布,而上游的 Agg Partial 输出则是无规则的并行分布,二者不匹配;
  • 下游的 Agg Partial 期望输入数据具有并行度为 3 的分布,而上游的 Scan 算子输出仅为并行度 1,无法满足要求。

image.png

Datafusion中,物理优化器会检查上下游分布式是否对齐,若上游算子无法满足下游算子需求,则需要引入Repartition算子对上游输出数据进行重新分发,以满足下游算子的输入分布要求。

在执行时,Repartition的位置即为计划的拆分边界,若算子之间不存在Repartition,则会被归类到同一个Task,由同一个计算资源(协程)执行,具体拆分机制实现见下文执行机制。

image.png

执行机制

DataFusion的执行任务启动遵循Pull模式,每个算子提供一个方法execute返回Stream,对该Stream进行Poll可以返回相应的计算结果。查询任务启动由root节点即最下游开始执行,之后递归不断调用上游的execute,直至将所有Stream建立完成。

Repartition,Coalesce这类算子为Task的边界点,在这类算子的execute函数中,其会将上游算子启动为独立task以及创建相应的channel进行数据传输。

为了让上游算子负责不同的分区任务,execute函数包含一个partition参数用于指定该execute后续负责的分区任务。

pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
}

对于这种简单、无状态的算子,如filter,其并行度通常会为下游保持一致,处于同一任务内。如下图,Filter算子的execute很简单,直接将两个流嵌套起来。由于这里没有spawn任务,因此上下游算子处于同一个任务。

fn execute(
   &self,
   partition: usize,
   context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
   trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
   let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
   Ok(Box::pin(FilterExecStream {
       schema: self.input.schema(),
       predicate: self.predicate.clone(),
       input: self.input.execute(partition, context)?,
       baseline_metrics,
    }))
}

Ref

  1. https://io-meter.com/2020/01/04/olap-distributed/
#OLAP #Compute