软件开发定制定制大数据系列文章:
文章目录
在 SQL 软件开发定制定制中有两种方式可以在 DataFrame 和 RDD 软件开发定制定制中进行转换:
- ① 软件开发定制定制利用反射机制,推导包含某种类型的 RDD,通过反射将其转换为指定类型的 DataFrame,适用于提前知道 RDD 的 Schema。
- ② 通过编程借口与 RDD 进行交互获取 Schema,并动态创建 DataFrame,在运行时决定列及其类型。
中的数据结构信息,即为 Scheme
① 通过获取 RDD 内的 Scheme
(使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。在 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。
其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._
- 这里的 sqlContext 不是包名,而是创建的 SparkSession 对象(这里为 SQLContext 对象)的变量名称,所以必须先创建 SparkSession 对象再导入。
- 这里 sqlContext 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。
SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。
package sparksql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object DataFrametoRDDofReflection { def main(args: Array[String]): Unit = { } def method1():Unit = { val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) // 引入 sqlContext.implicits._ import sqlContext.implicits._ // 将 RDD 转成 DataFrame /*val people = sc.textFile("people.txt").toDF()*/ val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF() people.show() people.registerTempTable("people") val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19") teenagers.show() // DataFrame 转成 RDD 进行操作:根据索引号取值 teenagers.map(t=>"Name:" + t(0)).collect().foreach(println) // DataFrame 转成 RDD 进行操作:根据字段名称取值 teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println) // DataFrame 转成 RDD 进行操作:一次返回多列的值 teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println) sc.stop() } /** * 定义 Person 类 * @param name 姓名 * @param age 年龄 */ case class Person(name:String,age:Int) }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
② 通过编程接口执行 Scheme
通过 Spark SQL 的接口创建 RDD 的 ,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:
- 第一步将 RDD 转为包含 row 对象的 RDD
- 第二步基于 structType 类型创建 Schema,与第一步创建的 RDD 想匹配
- 第三步通过 SQLContext 的 createDataFrame 方法对第一步的 RDD 应用 Schema
package sparksql import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext} object DataFrametoRDDofInterface { def main(args: Array[String]): Unit = { method2() } def method2(): Unit = { val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val people = sc.textFile("people.txt") // 以字符串的方式定义 DataFrame 的 Schema 信息 val schemaString = "name age" // 导入所需要的类 import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType,StructField,StringType} // 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema val schema = StructType( schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true))) // 将 RDD 转换成 Row val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim)) // 将 Schema 作用到 RDD 上 val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema) // 将 DataFrame 注册成临时表 peopleDataFrame.registerTempTable("people") // 获取 name 字段的值 val results = sqlContext.sql("SELECT name FROM people") results.map(t => "Name" + t(0)).collect().foreach(println) sc.stop() } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48