查询并行执行(0)-一些思考


查询并行执行的目的是利用更多计算资源进行查询的加速,其抽象流程主要分为两个阶段:

  1. 并行计划生成:在这个阶段,优化器或者相关组件负责将执行计划(PlanTree)拆分为多个子计划(PartialPlan,Pipeline)。
  2. 并行计划执行:并行执行分为两个维度,首先子计划之间并行,其次子计划本身也可以有多个并行实例。

针对以上两个阶段,各家数据库、计算引擎产品都有不同的实现,该系列文章旨在对各类方案进行讨论和整理。本文作为该系列的第一篇,主要讨论一下并行执行下的几个设计点。

Pipeline

这里Pipeline概念来自论文Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age。这篇文章解决的问题是:过去的并行执行是通过Exchange算子将Query Plan分割为多个Query Plan,之后设置好并行度将Query Plan分配给多个线程进行执行,这种固定调度的方法不灵活,容易有以下:

  1. 各个并行度之间出现DataSkew,处理不均匀
  2. 不容易做NUMA Aware

此文章核心提出把Query Plan拆分为Pipeline(本质还是子Query Plan?),把Pipeline包装成任务,由一个中心调度器进行调度执行。通过中心调度器执行就灵活多了,比如调度器中Worker可以:

  • 任务先执行完了就取下一个,解决处理不均衡
  • 优先拿自己本地NUMA的任务执行(NUMA-Aware

💡 个人认为Morsel-Driven Parallelism声称优化与Exchange算子关系不大?待验证

  • Exchange算子同样可以打包为一个Task,比如DataFusion中Query Plan的执行实例是一个协程Task,协程Task由线程Worker执行,理论上对协程调度器进行干涉也可以达到动态负载的效果?

  • Exchange算子通过设置Local Buffer,干涉调度器也可以做到NUMA-Aware?

相关:https://github.com/apache/datafusion/issues/7001

Exchange Or No Exchange

通过Exchange算子进行并行是经典做法,好处是为其余算子屏蔽并行逻辑,并行逻辑(数据分发)都由Exchange算子进行处理。Exchange算子的缺点是可能造成数据物化开销。

DuckDB直接将Exchange算子去掉,设置了如下感知并行的Sink接口:每个并行实例运行时调用Sink,运行结束后调用Combine,所有实例运行结束后执行Finalize。算子可以将并行处理的逻辑实现在Sink中以避免额外的物化开销,比如对于Hash Join算子Build侧,可以直接通过Sink构建HashTable。

virtual SinkResultType Sink(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate,
                            DataChunk &input) const;
virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const;
//! The finalize is called when ALL threads are finished..
virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,
                                  GlobalSinkState &gstate) const;

然而,这种方法对于跨机器通过网络传输的并行计划是难以实现的(比如无法在多机场景下构建统一的数据结构),因此跨机器并行执行场景下还是会采用Exchange算子的方案(比如StarRock,ClickHouse)。

Push Base

Push Or Pull的区别目前理解还不深。从几家的实现上看,Pull和Push都有一些优点也有一些比较蛋疼的点。

  • 比如上面的Sink接口本质上是一种Push实现,用纯Pull很难实现类似效果。
  • Pull在某些实现上确实会更方便但是需要将控制流包含在内部,有些调度逻辑做起来比较麻烦。比如提前暂停执行。
  • Push会导致调度方面的逻辑比较麻烦,要返回多种结果决定后续如何调度。

从性能上看,DataFusion证明了Pull模型同样可以实现很好的性能。

Ref

#Query #Compute