查询并行执行(1)-datafusion
datafusion是pull base、采用exchange算子的单机查询引擎,本文探讨其在并行计划生成以及并行计划执行过程。
并行计划生成
在查询的并行执行过程中,会引入算子间的数据分布对齐问题:每个算子的输入与输出数据都具有特定的分布要求,而要实现真正的并行执行,就需要确保上游算子的输出分布能够满足下游算子的输入分布需求。以如下图为例:
- 下游的 Agg Final 算子要求其输入数据按照某个 Partition 规则分布,而上游的 Agg Partial 输出则是无规则的并行分布,二者不匹配;
- 下游的 Agg Partial 期望输入数据具有并行度为 3 的分布,而上游的 Scan 算子输出仅为并行度 1,无法满足要求。
Datafusion中,物理优化器会检查上下游分布式是否对齐,若上游算子无法满足下游算子需求,则需要引入Repartition算子对上游输出数据进行重新分发,以满足下游算子的输入分布要求。
在执行时,Repartition的位置即为计划的拆分边界,若算子之间不存在Repartition,则会被归类到同一个Task,由同一个计算资源(协程)执行,具体拆分机制实现见下文执行机制。
执行机制
DataFusion的执行任务启动遵循Pull模式,每个算子提供一个方法execute返回Stream,对该Stream进行Poll可以返回相应的计算结果。查询任务启动由root节点即最下游开始执行,之后递归不断调用上游的execute,直至将所有Stream建立完成。
Repartition,Coalesce这类算子为Task的边界点,在这类算子的execute函数中,其会将上游算子启动为独立task以及创建相应的channel进行数据传输。
为了让上游算子负责不同的分区任务,execute函数包含一个partition参数用于指定该execute后续负责的分区任务。
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
}
对于这种简单、无状态的算子,如filter,其并行度通常会为下游保持一致,处于同一任务内。如下图,Filter算子的execute很简单,直接将两个流嵌套起来。由于这里没有spawn任务,因此上下游算子处于同一个任务。
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
trace!("Start FilterExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
Ok(Box::pin(FilterExecStream {
schema: self.input.schema(),
predicate: self.predicate.clone(),
input: self.input.execute(partition, context)?,
baseline_metrics,
}))
}