datafusion-并行执行

date
slug
tags
summary
type
status

Intro

datafusion是一个基于arrow的查询引擎,其执行过程如上:
notion image
用户首先可以创建一个Session来执行查询请求。datafusion支持外部数据源,用户可以通过在Session注册相应catalog,schema,table来接入外部数据源。之后通过sql或者DataFrame API来查询数据源。
SQL和DataFrame的查询请求会经过计划和优化,转化为相应Logical Plan以及Execution Plan。之后Execution Plan可以在TaskContext中进行执行。后文将探讨data fusion具体的执行机制。

执行机制

notion image
首先,对于一个查询请求,通常会被转化为以上的查询计划。Datafusion支持并行执行:数据并行以及计算并行。
  • 数据并行即将数据进行partition,执行时创建多个算子实例来处理不同的分区。
  • 计算并行即查询计划会被分成多个部分,每个部分可以作为一个独立的执行单元(比如协程,线程),执行单元之间可以并行执行,通过channel进行数据交互。
针对不同算子的性质,前后算子需要如何进行并行需要在优化阶段进行检查以及相应的修正,比如下游算子进行数据并行,要求每个算子实例接收到数据必须属于某种分布,这时候上游算子则需要按照该规定进行调整:比如将和下游保持一样的分布,或者通过接入一个Repartition算子来满足下游的要求。
这里不过多探讨以上的问题,这里假设查询计划是已经调整过的,且每个算子都有一个设定好的out partition,表示其并行度,DataFusion是如何将计划调度起来的。
notion image
我们假设算子的调度情况如上图所示:
  • 数据并行:task1与task2,task3与task3与task5
  • 计算并行:task1与task3
计算任务是如何被启动的?计算任务之间的通信是如何建立的?计算并行是按照什么分块的?
DataFusion采用了pull-base的执行模式,每个算子提供一个方法execute,返回一个Stream,对该Stream进行Poll可以返回相应的计算结果。

Coalesce

Coalesce算子用于将上游并行数据收集在一起。从源码注释中可以看到,对于并行度大于1的input算子,其执行函数会spawn多个input task,以此实现上图task3,task4,task5的计算并行。

Filter

以Filter例子,对于这种简单,无状态的算子,其并行度通常会为上游保持一致。Filter算子的execute很简单,通过接收到的partition参数继续执行上游算子的execute,将两个流嵌套起来。由于这里没有spawn任务,因此上下游算子处于同一个任务。

Repartition

从上图可以看到,Filter到Project之间实质是N:M(M>1)的重新分布,对于并行情况,DataFusion引用了Repartition算子作为一种通用解决手段。这种方式在很早就提出来称为Exchange算子。
notion image
如上图所示,Repartition算子在第一次执行时会创建好上下游通信的channel。之后对于上游算子的启动,会按照input partition的数量spawn专门的input task,input task负责执行后续的算子任务,并将其通过channel将数据分布到下游任务中。

Summary

从三个例子可以看到,DataFusion的执行任务启动是自下而上,由下游决定是否创建独立任务并行执行上游以及建立相应的通信方式:
  • 比如对于Coalesce和Repartition,其会将上游算子启动为独立task以及创建相应的channel进行数据传输。
  • 而对于Filter,其会直接通过流的方式获取上游算子的数据,实现上下游算子合并在同一个task中。
Loading...

© ZENOTME 2021-2025