网站建设定制开发SparkSql Cache Table类语法编译原理

文章目录

背景

网站建设定制开发最近需要在自研的引擎实现SparkSQL的Cache Table逻辑,网站建设定制开发于是调研了一下相关语法的原理。

语法

CACHE TABLE

网站建设定制开发具体可以见官网

CACHE [ LAZY ] TABLE table_name    [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
  • 1
  • 2

可以组合出下面一下情况

节选自Spark源码的

// 这个单测检验三段式的表名,BQ解析不支持、Hive解析支持comparePlans(  parsePlan("CACHE TABLE a.b.c"),  CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))// 这个单测主要检验LAZY comparePlans(  parsePlan("CACHE LAZY TABLE a.b.c"),  CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty)) // 这个单测主要检验OPTIONS  comparePlans(  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),  CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))// cache query的时候不允许三段式了  intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",  "It is not allowed to add catalog/namespace prefix a.b")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

可能是因为LogicalPlan不好直接构造,社区没有相应的单测。下面提供自己写的伪代码

comparePlans(parsePlan("CACHE TABLE t AS SELECT * FROM testData"),CacheTableStatement(Seq("t"), Project[*],  // 这里改成None运行即可看到,实际的plan是Projecttrue,Map("storageLevel" -> "DISK_ONLY"))) // 省略OPTIONS,默认是DISK_ONLY
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

再来看一下Spark没有列举出的情况

// Spark支持两段式comparePlans(  parsePlan("CACHE TABLE b.c"),  CacheTableStatement(Seq("b", "c"), None, false, Map.empty))// OPTIONS支持逗号隔开多组,但是官网只有storageLevel一个选项,额外的参数目前无意义(可能是spark为了扩展)comparePlans(  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel'='DISK_ONLY', 'k'='v')"),  CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY", "k" -> "v")))// OPTIONS似乎没有做检验,但官网说非法的storageLevel会报错comparePlans(  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel'='TEST')"),  CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "TEST")))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

UNCACHE TABLE

用户缓存了一张表后,也能主动释放掉它
UNCACHE TABLE [ IF EXISTS ] table_name
相应的单测也简单很多

comparePlans(  parsePlan("UNCACHE TABLE a.b.c"),  UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))comparePlans(  parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),  UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

CLEAR CACHE

非常简单的语句,没有什么组合情况,直接清除所有的cache
CLEAR CACHE

REFRESH TABLE

REFRESH应该是用在cache table的source更新了只时,重新加载一次cache table

REFRESH [TABLE] tableIdentifier
官网给的案例也很简单

-- The cached entries of the table will be refreshed  -- The table is resolved from the current database as the table name is unqualified.REFRESH TABLE tbl1;-- The cached entries of the view will be refreshed or invalidated-- The view is resolved from tempDB database, as the view name is qualified.REFRESH TABLE tempDB.view1;   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

实现

SparkSQL DDL的实现其实也是继承了LogicalPlan,在statements.scala中许多DDL相关的class

CacheTableStatement

能找到CacheTableStatement、UncacheTableStatement、RefreshTableStatement三个statement,但是没有CLEAR CACHE的

CacheTableStatementcase class CacheTableStatement(    tableName: Seq[String],    plan: Option[LogicalPlan],    isLazy: Boolean,    options: Map[String, String]) extends ParsedStatement     // ParsedStatement是继承了LogicalPlan的一个抽象类
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

其实也普通的sql编译差不多,cache table也作为LogicalPlan和查询的QueryPlan组合在一起

UncacheTableStatement,RefreshTableStatemen也类似,不再赘述

ClearCacheCommand

debug相应的

这个单测还提供了一个实现cache table的思路:先用sparksql的api建立临时view,再cache临时view

test("Clear all cache") {  withTempView("t1", "t2") {    sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")    sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")    spark.catalog.cacheTable("t1")    spark.catalog.cacheTable("t2")    spark.catalog.clearCache()    assert(spark.sharedState.cacheManager.isEmpty)    sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")    sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")    spark.catalog.cacheTable("t1")    spark.catalog.cacheTable("t2")    sql("Clear CACHE")    assert(spark.sharedState.cacheManager.isEmpty)  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

发现会走到ClearCacheCommand这个object,它是一个可执行的LogicalPlan

// IgnoreCachedData定义为trait IgnoreCachedData extends LogicalPlan {}case object ClearCacheCommand extends RunnableCommand with IgnoreCachedData {  override def run(sparkSession: SparkSession): Seq[Row] = {    sparkSession.catalog.clearCache()    Seq.empty[Row]  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在analyzedPlan中记录的也是它

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发