Background
I recently needed to implement SparkSQL's Cache Table logic in a self-developed engine, so I looked into how the related syntax works under the hood.
Syntax
CACHE TABLE
Details can be found in the official documentation:
CACHE [ LAZY ] TABLE table_name [ OPTIONS ( 'storageLevel' [ = ] value ) ] [ [ AS ] query ]
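Before diving into the parser, here is a quick usage sketch of the grammar variants. This is my own illustration, not from the Spark docs; it assumes a running SparkSession named spark (e.g. in spark-shell), and the view names src and evens are hypothetical:

// Register a hypothetical source view to cache.
spark.range(100).createOrReplaceTempView("src")

// Eagerly cache an existing table/view.
spark.sql("CACHE TABLE src")
spark.sql("UNCACHE TABLE src")

// Lazily cache with an explicit storage level; materialized on first access.
spark.sql("CACHE LAZY TABLE src OPTIONS ('storageLevel' = 'DISK_ONLY')")

// Cache the result of a query under a new name.
spark.sql("CACHE TABLE evens AS SELECT id FROM src WHERE id % 2 = 0")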
These clauses can be combined into the following cases.
The following is excerpted from Spark's parser unit tests:
// This test checks three-part table names: BQ parsing does not support them, Hive parsing does
comparePlans(
  parsePlan("CACHE TABLE a.b.c"),
  CacheTableStatement(Seq("a", "b", "c"), None, false, Map.empty))

// This test mainly checks LAZY
comparePlans(
  parsePlan("CACHE LAZY TABLE a.b.c"),
  CacheTableStatement(Seq("a", "b", "c"), None, true, Map.empty))

// This test mainly checks OPTIONS
comparePlans(
  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel' 'DISK_ONLY')"),
  CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "DISK_ONLY")))

// Three-part names are not allowed when caching a query
intercept("CACHE TABLE a.b.c AS SELECT * FROM testData",
  "It is not allowed to add catalog/namespace prefix a.b")
Probably because a LogicalPlan is hard to construct directly, the community has no corresponding unit test for the AS query case. Below is pseudocode I wrote myself:
comparePlans(
  parsePlan("CACHE TABLE t AS SELECT * FROM testData"),
  CacheTableStatement(
    Seq("t"),
    Some(Project(...)),  // pseudocode; change this to None and run it to see that the actual plan is a Project
    false,               // no LAZY keyword, so isLazy is false
    Map.empty))          // OPTIONS omitted; the parsed options are empty, and the storage level defaults to MEMORY_AND_DISK at execution
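Rather than hand-building the expected LogicalPlan, you can also just inspect what the parser produces. A minimal sketch, assuming a live SparkSession named spark and a registered view named testData:

// Print the parsed plan for CACHE TABLE ... AS ... to see the nested query.
val parsed = spark.sessionState.sqlParser.parsePlan(
  "CACHE TABLE t AS SELECT * FROM testData")
println(parsed.treeString)  // the query shows up as a Project over testData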
Now let's look at cases that Spark's own tests don't cover:
// Spark supports two-part names
comparePlans(
  parsePlan("CACHE TABLE b.c"),
  CacheTableStatement(Seq("b", "c"), None, false, Map.empty))

// OPTIONS accepts multiple comma-separated pairs, but the docs only define storageLevel;
// the extra keys are currently meaningless (probably left by Spark as an extension point)
comparePlans(
  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel'='DISK_ONLY', 'k'='v')"),
  CacheTableStatement(Seq("a", "b", "c"), None, true,
    Map("storageLevel" -> "DISK_ONLY", "k" -> "v")))

// OPTIONS does not seem to be validated at parse time, but the docs say an invalid storageLevel raises an error
comparePlans(
  parsePlan("CACHE LAZY TABLE a.b.c OPTIONS('storageLevel'='TEST')"),
  CacheTableStatement(Seq("a", "b", "c"), None, true, Map("storageLevel" -> "TEST")))
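The error the docs describe presumably surfaces at execution time via org.apache.spark.storage.StorageLevel.fromString, which rejects unknown names. A small sketch to confirm:

import org.apache.spark.storage.StorageLevel

// A valid name maps to a predefined level...
println(StorageLevel.fromString("DISK_ONLY"))

// ...while an unknown name throws IllegalArgumentException, which is how an
// invalid 'storageLevel' option fails once the cache statement actually runs.
try StorageLevel.fromString("TEST")
catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }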
UNCACHE TABLE
After caching a table, the user can also release it explicitly:
UNCACHE TABLE [ IF EXISTS ] table_name
The corresponding unit tests are much simpler:
comparePlans(
  parsePlan("UNCACHE TABLE a.b.c"),
  UncacheTableStatement(Seq("a", "b", "c"), ifExists = false))

comparePlans(
  parsePlan("UNCACHE TABLE IF EXISTS a.b.c"),
  UncacheTableStatement(Seq("a", "b", "c"), ifExists = true))
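For reference, the Catalog API offers the same operation; a minimal sketch, assuming a temp view named t1 already exists:

// Counterpart of `UNCACHE TABLE t1` through the programmatic API.
spark.catalog.cacheTable("t1")
println(spark.catalog.isCached("t1"))  // true
spark.catalog.uncacheTable("t1")
println(spark.catalog.isCached("t1"))  // false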
CLEAR CACHE
A very simple statement with no variations to combine; it clears all caches directly.
CLEAR CACHE
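The SQL statement and the Catalog API are interchangeable here; a minimal sketch, assuming a SparkSession named spark:

// Both calls drop every cached table/query in the session's shared cache.
spark.sql("CLEAR CACHE")
spark.catalog.clearCache()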
REFRESH TABLE
REFRESH is presumably meant for when the source behind a cached table has been updated: it reloads the cached table.
REFRESH [TABLE] tableIdentifier
The examples from the official site are also simple:
-- The cached entries of the table will be refreshed.
-- The table is resolved from the current database as the table name is unqualified.
REFRESH TABLE tbl1;

-- The cached entries of the view will be refreshed or invalidated.
-- The view is resolved from tempDB database, as the view name is qualified.
REFRESH TABLE tempDB.view1;
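The same refresh is available programmatically; a minimal sketch, assuming a table named tbl1 exists:

// Equivalent to `REFRESH TABLE tbl1`: invalidates the cached data/metadata for
// the table, to be lazily re-populated on the next access.
spark.catalog.refreshTable("tbl1")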
Implementation
SparkSQL DDL statements are in fact also implemented as subclasses of LogicalPlan; statements.scala holds many of the DDL-related classes.
CacheTableStatement
There you can find the three statements CacheTableStatement, UncacheTableStatement, and RefreshTableStatement, but none for CLEAR CACHE.
case class CacheTableStatement(
    tableName: Seq[String],
    plan: Option[LogicalPlan],
    isLazy: Boolean,
    options: Map[String, String]) extends ParsedStatement
// ParsedStatement is an abstract class that extends LogicalPlan
This works much like ordinary SQL compilation: CACHE TABLE is itself a LogicalPlan, composed together with the query's plan through the optional plan field above.
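A small sketch of that composition, pattern-matching the parsed statement; note the import path matches Spark 3.0's statements.scala and may differ across versions:

import org.apache.spark.sql.catalyst.plans.logical.CacheTableStatement

// Pull the nested query plan back out of the parsed CACHE TABLE statement.
spark.sessionState.sqlParser.parsePlan("CACHE TABLE t AS SELECT * FROM testData") match {
  case CacheTableStatement(name, Some(query), isLazy, options) =>
    println(s"caching ${name.mkString(".")}, lazy=$isLazy, options=$options")
    println(query.treeString)  // the SELECT arrives as a child LogicalPlan (a Project)
  case other =>
    println(s"unexpected plan: $other")
}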
UncacheTableStatement and RefreshTableStatement are similar, so they won't be repeated here.
ClearCacheCommand
Debugging the corresponding unit test shows where CLEAR CACHE lands. The test also suggests one way to implement cache table: first create a temporary view via the SparkSQL API, then cache that view (a standalone sketch follows after the test).
test("Clear all cache") { withTempView("t1", "t2") { sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1") sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2") spark.catalog.cacheTable("t1") spark.catalog.cacheTable("t2") spark.catalog.clearCache() assert(spark.sharedState.cacheManager.isEmpty) sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1") sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2") spark.catalog.cacheTable("t1") spark.catalog.cacheTable("t2") sql("Clear CACHE") assert(spark.sharedState.cacheManager.isEmpty) }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
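Following that idea, a minimal sketch of emulating CACHE TABLE ... AS query from outside the parser; the view names and query are hypothetical, and a SparkSession named spark is assumed:

import org.apache.spark.storage.StorageLevel

// Emulates `CACHE TABLE evens OPTIONS('storageLevel'='DISK_ONLY') AS SELECT ...`:
// register the query result as a temp view, then cache that view.
spark.sql("SELECT id FROM src WHERE id % 2 = 0").createOrReplaceTempView("evens")
spark.catalog.cacheTable("evens", StorageLevel.DISK_ONLY)

// This API caches lazily; trigger an action to materialize, like a non-LAZY CACHE TABLE.
spark.table("evens").count()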
Stepping through that test, we find that CLEAR CACHE reaches the ClearCacheCommand object, which is a runnable LogicalPlan:
// IgnoreCachedData is defined as: trait IgnoreCachedData extends LogicalPlan {}
case object ClearCacheCommand extends RunnableCommand with IgnoreCachedData {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.catalog.clearCache()
    Seq.empty[Row]
  }
}
It is also what gets recorded as the analyzedPlan.
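A quick way to confirm this, assuming a SparkSession named spark:

// The analyzed plan of a CLEAR CACHE statement is the ClearCacheCommand object itself.
val qe = spark.sql("CLEAR CACHE").queryExecution
println(qe.analyzed)  // prints ClearCacheCommand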