推进乡镇综合行政执法改革 大大提升了基层治理水平
今年以来,平顺县聚焦乡镇执法赋权不足、资源不够、力量分散等突出问题,积极探索、创新举措,联通一张网络,从传统型向智慧型优化升级,推
本篇主要介绍了一种使用Rust语言编写的查询引擎——DataFusion,其使用了基于Arrow格式的内存模型,结合Rust语言本身的优势,达成了非常优秀的性能指标
DataFusion是一个查询引擎而非数据库,因此其本身不具备存储数据的能力。但正因为不依赖底层存储的格式,使其成为了一个灵活可扩展的查询引擎。它原生支持了查询CSV,Parquet,Avro,Json等存储格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多种数据源。同时还提供了丰富的扩展接口,可以方便的让我们接入自定义的数据格式和数据源。
【资料图】
DataFusion具有以下特性:
高性能:基于Rust,不用进行垃圾回收;基于Arrow内存模型,列式存储,方便向量化计算连接简单:能够与Arrow的其他生态互通集成和定制简单:可以扩展数据源,方法和算子等完全基于Rust编写:高质量基于DataFusion我们可以轻松构建高性能、高质量、可扩展的数据处理系统。
DBMS 与 Query Engine 的区别DBMS: DataBase Management SystemDBMS是一个包含完整数据库管理特性的系统,主要包含以下几个模块:
存储系统元数据(Catalog)查询引擎(Query Engine)访问控制和权限资源管理管理工具客户端多节点管理Query EngineDataFusion是一种查询引擎,查询引擎属于数据库管理系统的一部分。查询引擎是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据。
DataFusion架构架构详情DataFusion查询引擎主要由以下几部分构成:
前端语法解析语义分析Planner:语法树转换成逻辑计划查询中间表示Expression(表达式)/ Type system(类型系统)Query Plan / Relational Operators(关系算子)Rewrites / Optimizations(逻辑计划优化)主要涉及
DFParser
和SqlToRel
这两个struct
查询底层表示Statistics(物理计划算子的统计信息,辅助物理计划优化)Partitions(分块,多线程执行物理计划算子)Sort orders(物理计划算子对数据是否排序)Algorithms(物理计划算子的执行算法,如Hash join和Merge join)Rewrites / Optimizations(物理计划优化)主要涉及
LogicalPlan
和Expr
这两个枚举类
执行运行时(算子)分配资源向量化计算主要涉及
PyhsicalPlanner
这个trait
实现的逻辑计划到物理计划的转换,其中主要的关键点是ExecutionPlan
和PhysicalExpr
扩展点主要涉及所有执行算子,如
GroupedHashAggregateStream
DataFusion查询引擎的架构还是比较简单的,其中的扩展点也非常清晰,我们可以从以下几个方面对DataFusion进行扩展:
用户自定义函数UDF无状态方法
/// 逻辑表达式枚举类pub enum Expr { ... ScalarUDF { /// The function fun: Arc, /// List of expressions to feed to the functions as arguments args: Vec, }, ...}/// UDF的逻辑表达式pub struct ScalarUDF { /// 方法名 pub name: String, /// 方法签名 pub signature: Signature, /// 返回值类型 pub return_type: ReturnTypeFunction, /// 方法实现 pub fun: ScalarFunctionImplementation,}/// UDF的物理表达式pub struct ScalarFunctionExpr { fun: ScalarFunctionImplementation, name: String, /// 参数表达式列表 args: Vec>, return_type: DataType,}
用户自定义聚合函数UADF有状态方法
/// 逻辑表达式枚举类pub enum Expr { ... AggregateUDF { /// The function fun: Arc, /// List of expressions to feed to the functions as arguments args: Vec, /// Optional filter applied prior to aggregating filter: Option>, }, ...}/// UADF的逻辑表达式pub struct AggregateUDF { /// 方法名 pub name: String, /// 方法签名 pub signature: Signature, /// 返回值类型 pub return_type: ReturnTypeFunction, /// 方法实现 pub accumulator: AccumulatorFunctionImplementation, /// 需要保存的状态的类型 pub state_type: StateTypeFunction,}/// UADF的物理表达式pub struct AggregateFunctionExpr { fun: AggregateUDF, args: Vec>, data_type: DataType, name: String,}
用户自定义优化规则Optimizer
定义了承载优化规则的结构体,其中optimize
方法实现了逻辑计划优化的过程。优化规则列表中的每个优化规则会被以TOP-DOWN
或BOTTOM-UP
方式作用于逻辑计划树,优化规则列表会被实施多个轮次。我们可以通过实现OptimizerRule
这个trait
来实现自己的优化逻辑。
pub struct Optimizer { /// All rules to apply pub rules: Vec>,}pub trait OptimizerRule { /// Try and rewrite `plan` to an optimized form, returning None if the plan cannot be /// optimized by this rule. fn try_optimize( &self, plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result
用户自定义逻辑计划算子/// 逻辑计划算子枚举类pub enum LogicalPlan { ... Extension(Extension), ...}/// 自定义逻辑计划算子pub struct Extension { /// The runtime extension operator pub node: Arc,}/// 自定义逻辑计划算子需要实现的traitpub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { ... }
用户自定义物理计划算子/// 为自定义的逻辑计划算子`UserDefinedLogcialNode`生成对应的物理计划算子pub trait ExtensionPlanner { async fn plan_extension( &self, planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, logical_inputs: &[&LogicalPlan], physical_inputs: &[Arc], session_state: &SessionState, ) -> Result
用户自定义数据源可以看出,自定义数据源其实就是生成一个对应的ExecutionPlan执行计划,这个执行计划实施的是扫表的任务。如果数据源支持下推的能力,我们在这里可以将projection
filters
limit
等操作下推到扫表时。
/// 自定义数据源需要实现的traitpub trait TableProvider: Sync + Send { ... async fn scan( &self, state: &SessionState, projection: Option<&Vec>, filters: &[Expr], limit: Option, ) -> Result>; ...}
用户自定义元数据pub trait CatalogProvider: Sync + Send { ... /// 根据名称获取Schema fn schema(&self, name: &str) -> Option>; /// 注册Schema fn register_schema( &self, name: &str, schema: Arc, ) -> Result
逻辑计划(LogicalPlan)逻辑计划其实就是数据流图,数据从叶子节点流向根节点
let df: DataFrame = ctx.read_table("http_api_requests_total")? .filter(col("path").eq(lit("/api/v2/write")))? .aggregate([col("status")]), [count(lit(1))])?;
这里我们就使用DataFusion的API接口构造了一个数据流,首先read_table
节点会从数据源中扫描数据到内存中,然后经过filter
节点按照条件进行过滤,最后经过aggregate
节点进行聚合。数据流过最后的节点时,就生成了我们需要的数据。
上述链式调用的API接口实际上并没有真正执行对数据的操作,这里实际上是使用了建造者模式构造了逻辑计划树。最终生成的DataFrame
实际上只是包含了一下信息:
pub struct DataFrame { /// 查询上下文信息,包含了元数据,用户注册的UDF和UADF,使用的优化器,使用的planner等信息 session_state: SessionState, /// 逻辑计划树的根节点 plan: LogicalPlan,}
支持的逻辑计划算子
ProjectionFilterWindowAggregateSortJoinTableScanRepartitionUnionSubqueryLimitExtensionDistinctValuesExplainAnalyzeSetVariablePrepareDml(...)CreateExternalTableCreateViewCreateCatalogSchemaCreateCatalogDropTableDropView
目标:确保结果相同的情况下,执行更快
初始的逻辑计划,需要经过多个轮次的优化,才能生成执行效率更高的逻辑计划。DataFusion本身的优化器内置了很多优化规则,用户也可以扩展自己的优化规则。
内置优化轮次下推(Pushdown):减少从一个节点到另一个节点的数据的行列数
PushDownProjection
PushDownFilter
PushDownLimit
简化(Simplify):简化表达式,减少运行时的运算。例如使用布尔代数的法则,将b > 2 AND b > 2
简化成b > 2
。
SimplifyExpressions
UnwrapCastInComparison
简化(Simplify):删除无用的节点
平铺子查询(Flatten Subqueries):将子查询用join重写
DecorrelateWhereExists
DecorrelatedWhereIn
ScalarSubqueryToJoin
优化join:识别join谓词
ExtractEqualJoinPredicate
RewriteDisjunctivePredicate
FilterNullJoinKeys
优化distinct
SingleDistinctToGroupBy
ReplaceDistinctWithAggregate
表达式运算(Expression Evaluation)假设现在有这样一个谓词表达式
path = "/api/v2/write" or path is null
经过语法解析和转换后,可以用如下表达式树表示:
DataFusion在实施表达式运算时,使用了Arrow提供的向量化计算方法来加速运算
物理计划(ExecutionPlan)调用DataFusion提供的DefaultPhysicalPlanner
中的create_physical_plan
方法,可以将逻辑计划树转换成物理计划树。其中物理计划树中的每个节点都是一个ExecutionPlan
。执行物理计划树时,会从根节点开始调用execute
方法,调用该方法还没有执行对数据的操作,仅仅是将每个物理计划算子转换成一个RecordBatchStream
算子,形成数据流算子树。这些RecordBatchStream
算子都实现了future
包提供的Stream
特性,当我们最终调用RecordBatchStream
的collect
方法时,才会从根节点开始poll
一次来获取一下轮要处理的数据,根节点的poll
方法内会调用子节点的poll
方法,最终每poll
一次,整棵树都会进行一次数据从叶子节点到根节点的流动,生成一个RecordBatch
。
DataFusion实现的物理计划算子具有以下特性:
异步:避免了阻塞I/O流式:数据是流式处理的向量化:每次可以向量化地处理一个RecordBatch
分片:每个算子都可以并行,可以产生多个分片多核结语DataFusion本身只是一个简单,高效,可扩展的查询引擎框架,用户可以将DataFusion作为开发大型数据中台的基础组件,也可以轻易地将DataFusion嵌入服务中作为查询引擎,也可以使用DataFusion构建自己的数据库系统。如果期望使用分布式的查询引擎,可以关注基于Arrow
和DataFusion
搭建的分布式查询引擎Ballista。
标签:
今年以来,平顺县聚焦乡镇执法赋权不足、资源不够、力量分散等突出问题,积极探索、创新举措,联通一张网络,从传统型向智慧型优化升级,推
2022年6月15日,由中国建筑材料流通协会编制并发布的全国建材家居景气指数(简称BHI)显示,5月BHI为123 07,环比上涨8 13点,同比下跌6 96点
图①:山西临汾经济技术开发区兴荣供应链有限公司的货车整装待发。资料图片 图②:司机王勇平驾驶货车行驶在
2022年北京冬奥会的筹办过程,为中国冰雪运动发展提供了巨大动力。科技创新,成为中国冰雪运动前进道路上嘹亮的号角。在科学技术部社会发展
新华社香港2月6日电题:狮子山下的舞狮人新华社记者韦骅“左眼精,右眼灵,红光万象,富贵繁荣!”“口食八方财,
正在进行围封或强制检测的葵涌邨居民在登记(资料照片)。新华社发新华社香港2月6日电 题:凝聚香港社会共克时艰
2月6日,航拍青海省西宁市雪后美景。受较强冷空气影响,2月5日至6日,青海迎来大范围降雪天气过程,古城西宁银装
[ 相关新闻 ]