app开发定制sparkSql数据离线处理--整理记录

sparkSqlapp开发定制数据离线处理

前言:app开发定制本文作为本人学习sparkSqlapp开发定制离线数据抽取,app开发定制离线数据处理的学习整理记录,app开发定制文中参考博客均附上原文链接。

一、Hive环境准备

1、app开发定制配置文件准备:

/opt/hive/conf/hive-site.xml:(2021/12/31修改,添加了&useSSL=false&useUnicode=true&characterEncoding=utf8app开发定制支持中文编码)

<?xml version="1.0" encoding="UTF-8" standalone="no"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration>  <property>    <name>javax.jdo.option.ConnectionURL</name>    <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8</value>    <description>hiveapp开发定制的元数据库 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionDriverName</name>    <value>com.mysql.jdbc.Driver</value>    <description>mysql的驱动jar包 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionUserName</name>    <value>root</value>    <description>app开发定制设定数据库的用户名 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionPassword</name>    <value>xxx</value>    <description>app开发定制设定数据库的密码</description>   </property><!--zbt添加-->   <property>      <name>hive.exec.max.dynamic.partitions</name>      <value>100000</value>      <description>app开发定制app开发定制在所有执行MR的节点上,app开发定制最大一共可以创建多少app开发定制个动态分区</description>   </property>   <property>      <name>hive.exec.max.dynamic.partitions.pernode</name>      <value>100000</value>      <description>在所有执行MR的节点上,app开发定制最大可以创建多少个动态分区</description>  </property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

若要在ideaapp开发定制环境下运行要把

hdfs-site.xml

core-site.xml

hive-site.xml

放到resources文件夹中

否则hive.exec.max.dynamic.partitions.pernode,hive.exec.max.dynamic.partitions

app开发定制配置不生效

2、hosts设置

app开发定制若在不同网络环境下

app开发定制需设置本地hosts

app开发定制设置的内容为集群主机名

Ubuntu的hosts文件在 /etc

参考资料:

3、app开发定制远程连接服务开启

hive --service metastore

参考资料:

4、其他

mysql服务启动

service mysqld start

app开发定制防火墙关闭

systemctl stop firewalld

二、IDEA环境准备

1、pom.xml文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>sparkDome1</artifactId>        <groupId>org.example</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>HiveAndMysql</artifactId>    <properties>        <maven.compiler.source>8</maven.compiler.source>        <maven.compiler.target>8</maven.compiler.target>        <hadoop.version>2.7.7</hadoop.version>        <spark.version>2.1.1</spark.version>        <scala.version>2.11</scala.version>    </properties>    <dependencies>        <!--hadoop依赖-->        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-client</artifactId>            <version>${hadoop.version}</version>        </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-common</artifactId>            <version>${hadoop.version}</version>        </dependency>        <dependency>            <groupId>org.apache.hadoop</groupId>            <artifactId>hadoop-hdfs</artifactId>            <version>${hadoop.version}</version>        </dependency>        <!--scala依赖-->        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>2.11.0</version>        </dependency>        <!--spark依赖-->        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-sql_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-mllib_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <!--hive依赖-->        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-hive_${scala.version}</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.48</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.scala-tools</groupId>                <artifactId>maven-scala-plugin</artifactId>                <version>2.15.2</version>                <executions>                    <execution>                        <id>scala-compile</id>                        <goals>                            <goal>compile</goal>                        </goals>                        <configuration>                            <!--includesapp开发定制是一个数组,app开发定制包含要编译的code-->                            <includes>                                <include>**/*.scala</include>                            </includes>                        </configuration>                    </execution>                    <execution>                        <id>scala-test-compile</id>                        <goals>                            <goal>testCompile</goal>                        </goals>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110

2、Hadoop环境

windowapp开发定制下运行需要准备Hadoop环境

在代码编写中指定hadoop.home.dir

System.setProperty("hadoop.home.dir","........")

3、其他

Scala插件依赖需先下载好

注意环境与集群对应,本文档的环境为Scala-11

三、代码编写

1、全量抽取

import org.apache.spark.sql.SparkSession/**   * mysql->hive 全量抽取   */   object ShopTest {  def main(args: Array[String]): Unit = {	//设置用户名,防止因为权限不足无法创建文件    System.setProperty("HADOOP_USER_NAME", "root")    //获取实例对象    val spark = SparkSession.builder()      .appName("ShopTest")      .master("local[*]")      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")      .enableHiveSupport()      .getOrCreate()    //jdbc连接配置    val mysqlMap = Map(      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false",      "user" -> "root",      "password" -> "xxx",      "driver" -> "com.mysql.jdbc.Driver"    )    //使用jdbc抽取mysql表数据    val inputTable = spark.read.format("jdbc")      .options(mysqlMap)      .option("dbtable", "EcData_tb_1")      .load()        //    inputTable.show()          //将mysql表数据创建为临时表    inputTable.createOrReplaceTempView("inputTable")    //hive动态分区开启    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")    //hive分区模式设置,默认为strict严格模式,若设置分区必须要有一个静态分区    //需要设置为nonstrict模式,可以都是动态分区    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")    //hive分区数设置,目前版本已无法在程序中设置,参考上文Hive环境准备-配置文件准备    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions.pernode = 10000")    spark.sqlContext.sql("set hive.exec.max.dynamic.partitions = 10000")    // mysql表结构,通过desc table tb_name;命令可获取    /*    +-------------+---------+------+-----+---------+-------+        | Field       | Type    | Null | Key | Default | Extra |        +-------------+---------+------+-----+---------+-------+        | InvoiceNo   | text    | YES  |     | NULL    |       |        | StockCode   | text    | YES  |     | NULL    |       |        | Description | text    | YES  |     | NULL    |       |        | Quantity    | int(11) | YES  |     | NULL    |       |        | InvoiceDate | text    | YES  |     | NULL    |       |        | UnitPrice   | double  | YES  |     | NULL    |       |        | CustomerID  | int(11) | YES  |     | NULL    |       |        | Country     | text    | YES  |     | NULL    |       |        +-------------+---------+------+-----+---------+-------+*/        //于hive数据库,ods层中创建表    spark.sqlContext.sql(      """        |create table if not exists clown_test_db.ShopTest_ods_tb_1        |(        | InvoiceNo string ,        | StockCode string ,        | Description string ,        | Quantity int ,        | InvoiceDate string ,        | UnitPrice double ,        | CustomerID int ,        | Country string        |)        |partitioned by (country_pid string,customer_pid int)        |row format delimited        |fields terminated by '\t'  //本数据中字段值存在','不能用','作为分隔符        |lines terminated by ''        |stored as textfile        |""".stripMargin)          //使用sql-insert into 语句将mysql数据全部导入hive表中    spark.sqlContext.sql(      """        |insert into table clown_test_db.ShopTest_ods_tb_1 partition(country_pid,customer_pid)        |select *,Country,CustomerID from inputTable        |""".stripMargin)  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

2、增量抽取

import java.text.SimpleDateFormatimport org.apache.spark.sql.{SaveMode, SparkSession}/** * hive_ods -> hive_dwd 增量抽取   */   object ShopTest2 {  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME","root")    val spark = SparkSession.builder()      .appName("ShopTest2")      .master("local[*]")      .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")      .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")      .enableHiveSupport()      .getOrCreate()        /*    +-------------+---------+------+-----+---------+-------+    | Field       | Type    | Null | Key | Default | Extra |    +-------------+---------+------+-----+---------+-------+    | InvoiceNo   | text    | YES  |     | NULL    |       |    | StockCode   | text    | YES  |     | NULL    |       |    | Description | text    | YES  |     | NULL    |       |    | Quantity    | int(11) | YES  |     | NULL    |       |    | InvoiceDate | text    | YES  |     | NULL    |       |    | UnitPrice   | double  | YES  |     | NULL    |       |    | CustomerID  | int(11) | YES  |     | NULL    |       |    | Country     | text    | YES  |     | NULL    |       |    +-------------+---------+------+-----+---------+-------+*/	//隐式转换,sql方法导入    import spark.implicits._    import org.apache.spark.sql.functions._        spark.sqlContext.sql("set hive.exec.dynamic.partition = true")    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")        //直接通过sql语句获取到hive ods层中的表数据    val inputData = spark.sqlContext.sql("select * from clown_test_db.ShopTest_ods_tb_1")        //设置时间条件    val timeStr = "2011/01/01 00:00"    val timeTemp = new SimpleDateFormat("yyyy/MM/dd HH:mm").parse(timeStr).getTime//单位为ms    println(timeTemp)        //未转换前的数据格式为:12/8/2010 9:53    val timeFormat = inputData      .withColumn("InvoiceDate",unix_timestamp($"InvoiceDate","MM/dd/yyyy HH:mm"))//时间戳获取,单位为s      .where(s"InvoiceDate>$timeTemp/1000")//增量条件判断      .withColumn("InvoiceDate",from_unixtime($"InvoiceDate","yyyy/MM/dd HH:mm"))//时间格式转换      .where("Country='United Kingdom' or Country = 'Finland'")//筛选出国家名为United Kingdom 或 Finland的数据        //由于该ods层表与目标dwd层表结构相同,直接用like语句创建结构相同的dwd表    spark.sqlContext.sql(      """        |create table if not exists clown_dwd_db.shoptest_dwd_tb_1        |like clown_test_db.ShopTest_ods_tb_1        |""".stripMargin)    //使用sparkSql算子将数据由ods表数据增量抽取到dwd表中    timeFormat.write.format("hive")      .mode(SaveMode.Append)      .insertInto("clown_dwd_db.shoptest_dwd_tb_1")  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

3、数据清洗

import org.apache.spark.sql.{SaveMode, SparkSession}/** * hive_dwd->hive_dwd 缺失值剔除与填充 */object ShopTest3 { /*+-------------+---------+------+-----+---------+-------+   | Field       | Type    | Null | Key | Default | Extra |   +-------------+---------+------+-----+---------+-------+   | InvoiceNo   | text    | YES  |     | NULL    |       |   | StockCode   | text    | YES  |     | NULL    |       |   | Description | text    | YES  |     | NULL    |       |   | Quantity    | int(11) | YES  |     | NULL    |       |   | InvoiceDate | text    | YES  |     | NULL    |       |   | UnitPrice   | double  | YES  |     | NULL    |       |   | CustomerID  | int(11) | YES  |     | NULL    |       |   | Country     | text    | YES  |     | NULL    |       |   +-------------+---------+------+-----+---------+-------+*/def main(args: Array[String]): Unit = {   System.setProperty("HADOOP_USER_NAME","root")   val spark = SparkSession.builder()     .appName("ShopTest3")     .master("local[*]")     .config("spark.sql.warehouse.dir","hdfs://xx.xxx.x.x:8020/user/hive/warehouse")     .config("hive.metastore.uris","thrift://xx.xxx.x.x:9083")     .enableHiveSupport()     .getOrCreate()   import spark.implicits._   import org.apache.spark.sql.functions._   spark.sqlContext.sql("set hive.exec.dynamic.partition = true")   spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")   val data = spark.sqlContext.sql("select * from clown_dwd_db.shoptest_dwd_tb_1")   spark.sqlContext.sql(     """       |create table if not exists clown_dwd_db.shopTest_dwd_tb_3       |(       | InvoiceNo string ,       | StockCode string ,       | Description string ,       | Quantity int ,       | InvoiceDate string ,       | UnitPrice double ,       | CustomerID int ,       | Country string       |)       |partitioned by (country_pid string)       |row format delimited       |fields terminated by '\t'       |lines terminated by ''       |stored as textfile       |""".stripMargin)  //使用na.fill对缺失值进行填充  //使用na.drop对缺失值进行剔除   data.na.fill(     Map(       "Country"->"Country_Null",       "CustomerID"->0     )   )     .na.drop(     Seq("UnitPrice","Quantity")   )      .selectExpr("InvoiceNo","StockCode","Description","Quantity","InvoiceDate","UnitPrice","CustomerID","Country","Country")//由于数据中存在分区表字段,且该字段关联数据已改变,需要重新进行赋值     .limit(10000)     .write     .format("hive")     .mode(SaveMode.Append)     .insertInto("clown_dwd_db.shopTest_dwd_tb_3")  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

4、指标计算

import org.apache.spark.sql.SparkSession/**  * sparkSql算子实现指标计算  */object ShopTest4 {  /*    +-------------+---------+------+-----+---------+-------+| Field       | Type    | Null | Key | Default | Extra |+-------------+---------+------+-----+---------+-------+| InvoiceNo   | text    | YES  |     | NULL    |       || StockCode   | text    | YES  |     | NULL    |       || Description | text    | YES  |     | NULL    |       || Quantity    | int(11) | YES  |     | NULL    |       || InvoiceDate | text    | YES  |     | NULL    |       || UnitPrice   | double  | YES  |     | NULL    |       || CustomerID  | int(11) | YES  |     | NULL    |       || Country     | text    | YES  |     | NULL    |       |+-------------+---------+------+-----+---------+-------+*/  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME", "root")        val spark = SparkSession.builder()      .appName("ShopTest4")      .master("local[*]")      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")      .enableHiveSupport()      .getOrCreate()        import spark.implicits._    import org.apache.spark.sql.functions._        spark.sqlContext.sql("set hive.exec.dynamic.partition = true")    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")        val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")        /**     * 统计每个国家的客户数,输出结果。     * 排序后输出客户最多的10个国家     */            data.dropDuplicates("CustomerID","Country")//去重          .withColumn("x",lit(1))//添加一列数据都为1          .groupBy("Country")//聚合国家字段          .sum("x")//对1数据字段进行累加          .show(20)        /**     * 统计各个国家的总销售额分布情况     */        data.withColumn("x", $"Quantity" * $"UnitPrice")//添加销售额字段,值为数量*单价      .groupBy("Country")//聚合国家字段      .sum("x")//计算总销售额      .withColumn("sum(x)", round($"sum(x)", 2))//对结果字段进行四舍五入到两位,但round会对最后一位0省略,最好使用其他函数    /*若题目要求输出格式可进行rdd转换      .rdd      .map(x=>x.mkString(","))      .foreach(println(_))    */    /**     * 统计每种商品的销量,输出结果     * 排序后输出销量最高的10种商品     */    data.groupBy("StockCode")//聚合商品编码字段      .sum("Quantity")//计算销量      .coalesce(1)//将spark分区设置为1,防止后面排序混乱      .orderBy(desc("sum(Quantity)"))//由大到小排序      .show(10)        /**     * 统计月销售额随时间的变化趋势     * [月份,销售额]     */    data.withColumn("InvoiceDate",substring_index($"InvoiceDate","/",2))//由于数据在增量抽取阶段已进行时间格式转换,可直接进行切割得出 年份/月份 的格式,substring_index与split不同      .withColumn("x",$"Quantity"*$"UnitPrice")//计算销售额      .groupBy("InvoiceDate")//对月份进行聚合      .sum("x")//计算总销售额      .coalesce(1)//设置spark分区为1      .orderBy(desc("InvoiceDate"))//由大到小排序      .withColumn("sum(x)",round($"sum(x)",2))//四舍五入到2位      .show(100)        /**     * 统计商品描述中,排名前300(Top300)的热门关键词     */    data.select(col("Description"))//商品将描述字段单独查询      .flatMap(x=>x.toString().split("\\W"))//进行flatMap 切割后展平,切割\\W为正则匹配模式,匹配所有符号      .withColumn("x",lit(1))//增加1的数据列      .groupBy("value")//展平后字段名为value,进行聚合      .sum("x")//累加1数据      .where("value != '' ")//筛除空白数据      .coalesce(1)//设置spark分区为1      .orderBy(desc("sum(x)"))//由大到小排序      .show(300)//展示300条  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
import org.apache.spark.sql.SparkSession/**  *  sql语句实现指标计算  */object ShopTest5 {  /*    +-------------+---------+------+-----+---------+-------+| Field       | Type    | Null | Key | Default | Extra |+-------------+---------+------+-----+---------+-------+| InvoiceNo   | text    | YES  |     | NULL    |       || StockCode   | text    | YES  |     | NULL    |       || Description | text    | YES  |     | NULL    |       || Quantity    | int(11) | YES  |     | NULL    |       || InvoiceDate | text    | YES  |     | NULL    |       || UnitPrice   | double  | YES  |     | NULL    |       || CustomerID  | int(11) | YES  |     | NULL    |       || Country     | text    | YES  |     | NULL    |       |+-------------+---------+------+-----+---------+-------+*/  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME", "root")        val spark = SparkSession.builder()      .appName("ShopTest5")      .master("local[*]")      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")      .enableHiveSupport()      .getOrCreate()        import spark.implicits._    import org.apache.spark.sql.functions._        spark.sqlContext.sql("set hive.exec.dynamic.partition = true")    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")        val data = spark.sqlContext.sql("select * from clown_dwd_db.shopTest_dwd_tb_3")        data.createOrReplaceTempView("dataTable")        /**     * 统计每个国家的客户数,输出结果。     * 排序后输出客户最多的10个国家     */   //对去重后的Country,CustomerID进行聚合统计即可得出各个国家的客户数   spark.sqlContext.sql(     """       |select Country,count(distinct Country,CustomerID) from dataTable group by Country       |""".stripMargin)     .show()    /**     * 统计各个国家的总销售额分布情况     */        spark.sqlContext.sql(      """        |select Country ,round(sum(Quantity*UnitPrice),2)        |from dataTable        |group by Country        |""".stripMargin)      .show()        /**     * 统计每种商品的销量,输出结果     * 排序后输出销量最高的10种商品     */        spark.sqlContext.sql(      """        |select StockCode,round(sum(Quantity*UnitPrice),2) as xl        |from dataTable        |group by StockCode        |order by xl desc        |""".stripMargin)      .show(10)        /**     * 统计月销售额随时间的变化趋势     * [月份,销售额]     */        //group by执行优先度可能高于 as 重命名,因此as后的名字无法用于group by 聚合    spark.sqlContext.sql(      """        |select substring_index(InvoiceDate,"/",2) as time,round(sum(Quantity*UnitPrice),2) as sum        |from dataTable        |group by substring_index(InvoiceDate,"/",2)        |order by substring_index(InvoiceDate,"/",2)        |""".stripMargin)      .show()        /**     * 统计商品描述中,排名前300(Top300)的热门关键词     */        //目前认为该题用sql解法没有必要      //- -   }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103

四、其他

1、hive分区的增删改查

参考资料:

hive表新增分区:[]内的不必要

alter table tb_name add partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 
  • 1

多个分区

alter table tb_name add partition (pid1 = ‘’,pid2 = ) partition (pid1 = ‘’,pid2 = ) [location ‘xxx’] 
  • 1

hive表修改分区:

alter table tb_name partition(pid1='') rename to partition(pid1='');/*修改分区名*/alter table tb_name partition(pid1='') set location 'hdfs://master:8020/....';/*修改分区路径,注意使用绝对路径*/  alter table tb_name partition column (pid1 string);/*修改分区字段数据类型*/
  • 1
  • 2
  • 3

hive表删除分区:

alter table tb_name drop partition (pid1 = ‘’,pid2 = )[ partition (pid1 = ‘’,pid2 = )] 
  • 1

hive分区值查询:

show partitions tb_name;
  • 1

2、打包运行

命令:

spark-submit --class ShopTest4 --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

若使用了jdbc连接,需要指明驱动jar包 mysql-connector-java-5.1.48.jar

spark-submit --jars mysql-connector-java-5.1.48.jar --class ShopTest --master spark://xx.xxx.x.x:7077 --executor-memory 512m --total-executor-cores 1 untitled-1.0-SNAPSHOT.jar

或者将mysql驱动放至 $‘spark_home’/jars 目录下

3、时间格式

时间模式字符串用来指定时间格式。在此模式中,所有的 ASCII 字母被保留为模式字母,定义如下:

字母描述示例
G纪元标记AD
y四位年份2001
M月份July or 07
d一个月的日期10
hA.M./P.M. (1~12)格式小时12
H一天中的小时 (0~23)22
m分钟数30
s秒数55
S毫秒数234
E星期几Tuesday
D一年中的日子360
F一个月中第几周的周几2 (second Wed. in July)
w一年中第几周40
W一个月中第几周1
aA.M./P.M. 标记PM
k一天中的小时(1~24)24
KA.M./P.M. (0~11)格式小时10
z时区Eastern Standard Time
文字定界符Delimiter
"单引号`

4、Scala正则表达式

Scala 的正则表达式继承了 Java 的语法规则,Java 则大部分使用了 Perl 语言的规则。

下表我们给出了常用的一些正则表达式规则:(注意:\需要转义,算子中写为\,sql语句中写为\\\)

表达式匹配规则
^匹配输入字符串开始的位置。
$匹配输入字符串结尾的位置。
.匹配除"\r\"之外的任何单个字符。
[…]字符集。匹配包含的任一字符。例如,"[abc]“匹配"plain"中的"a”。
[^…]反向字符集。匹配未包含的任何字符。例如,"[^abc]“匹配"plain"中"p”,“l”,“i”,“n”。
\A匹配输入字符串开始的位置(无多行支持)
\z字符串结尾(类似$,但不受处理多行选项的影响)
\Z字符串结尾或行尾(不受处理多行选项的影响)
re*重复零次或更多次
re+重复一次或更多次
re?重复零次或一次
re{ n}重复n次
re{ n,}
re{ n, m}重复n到m次
a|b匹配 a 或者 b
(re)匹配 re,并捕获文本到自动命名的组里
(?: re)匹配 re,不捕获匹配的文本,也不给此分组分配组号
(?> re)贪婪子表达式
\w匹配字母或数字或下划线或汉字
\W匹配任意不是字母,数字,下划线,汉字的字符
\s匹配任意的空白符,相等于 [\t\ \f]
\S匹配任意不是空白符的字符
\d匹配数字,类似 [0-9]
\D匹配任意非数字的字符
\G当前搜索的开头
换行符
\b通常是单词分界位置,但如果在字符类里使用代表退格
\B匹配不是单词开头或结束的位置
\t制表符
\Q开始引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。
\E结束引号:\Q(a+b)*3\E 可匹配文本 “(a+b)*3”。

正则表达式实例

实例描述
.匹配除"\r\"之外的任何单个字符。
[Rr]uby匹配 “Ruby” 或 “ruby”
rub[ye]匹配 “ruby” 或 “rube”
[aeiou]匹配小写字母 :aeiou
[0-9]匹配任何数字,类似 [0123456789]
[a-z]匹配任何 ASCII 小写字母
[A-Z]匹配任何 ASCII 大写字母
[a-zA-Z0-9]匹配数字,大小写字母
[^aeiou]匹配除了 aeiou 其他字符
[^0-9]匹配除了数字的其他字符
\d匹配数字,类似: [0-9]
\D匹配非数字,类似: [^0-9]
\s匹配空格,类似: [ \t\r\ ]
\S匹配非空格,类似: [^ \t\r\ ]
\w匹配字母,数字,下划线,类似: [A-Za-z0-9_]
\W匹配非字母,数字,下划线,类似: [^A-Za-z0-9_]
ruby?匹配 “rub” 或 “ruby”: y 是可选的
ruby*匹配 “rub” 加上 0 个或多个的 y。
ruby+匹配 “rub” 加上 1 个或多个的 y。
\d{3}刚好匹配 3 个数字。
\d{3,}匹配 3 个或多个数字。
\d{3,5}匹配 3 个、4 个或 5 个数字。
\D\d+无分组: + 重复 \d
(\D\d)+/分组: + 重复 \D\d 对
([Rr]uby(, )?)+匹配 “Ruby”、“Ruby, ruby, ruby”,等等

常用可以应用正则的函数:

.split(“”)切割字符串

.regexp_extract(string subject, string pattern, int index) 将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符

.regexp_replace(string A, string B, string C) 将字符串A中的符合正则表达式B的部分替换为C

.equals(“”)匹配

5、SQL like与rlike

like为通配符匹配,不是正则

%:匹配零个及多个任意字符

_:与任意单字符匹配

[]:匹配一个范围

[^]:排除一个范围

rlike为正则匹配

regexp与rlike功能相似

参考资料:

6、中文数据

关于csv文件若包含中文,可在读取时设置option参数

/** * 注意option的设置 * 读取本地文件需要加上file:///否则默认读hdfs文件 */val inputData = spark.sqlContext.read.format("csv")  .option("sep","\t")  .option("encoding","GBK")  .option("header","true") .load("file:///C:\\Users\\61907\\Desktop\\BigData\\Spark\\sparkDome1\\HiveAndMysql\\src\\main\\resources\\cov19.csv")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

jdbc读取数据库数据时,若有中文需设置jdbc连接参数

&useUnicode=true&characterEncoding=utf8

//    jdbc中文编码设置    val mysqlMap = Map(      "url"->"jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",      "user"->"root",      "password"->"xxx",      "driver"->"com.mysql.jdbc.Driver"    )
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

关于hive中存储中文数据,中文注释,中文分区(索引)

Ⅰ~Ⅲ参考资料:

Ⅰ.元数据库设置

元数据库需设置为utf-8编码

##创建hive元数据库hive,并指定utf-8编码格式mysql>create database hive DEFAULT CHARSET utf8 COLLATE utf8_general_ci;##修改已存在的hive元数据库,字符编码格式为utf-8mysql>alter database hive character set utf8;     ##进入hive元数据库mysql>use hive;##查看元数据库字符编码格式mysql>show variables like 'character_set_database';  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

Ⅱ.相关表设置

1).修改字段注释字符集

mysql>alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
  • 1

2).修改表注释字符集

mysql>alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
  • 1

类似的,PARAM_KEY若需要中文也可设置为utf8

3).修改分区表参数,以支持分区能够用中文表示

mysql>alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;mysql>alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
  • 1
  • 2
  • 3
  • 4

另外,PARTITIONS表中存放分区名的字段也需要修改为utf8

mysql>alter table PARTITIONS modify column PART_name varchar(4000) character set utf8;
  • 1

4).修改索引注解

mysql>alter table INDEX_PARAMS modify column PARAM_VALUE varchar(250) character set utf8;
  • 1

Ⅲ.hive-site.xml配置文件设置

需要在jdbc连接中设置支持中文编码

&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8

其中&需要使用&amp;转义

参考资料:

/opt/hive/conf/hive-site.xml:

<?xml version="1.0" encoding="UTF-8" standalone="no"?><?xml-stylesheet type="text/xsl" href="configuration.xsl"?><configuration>  <property>    <name>javax.jdo.option.ConnectionURL</name>    <value>jdbc:mysql://localhost:3306/hive_demo?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=utf8</value>    <description>hive的元数据库 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionDriverName</name>    <value>com.mysql.jdbc.Driver</value>    <description>mysql的驱动jar包 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionUserName</name>    <value>root</value>    <description>设定数据库的用户名 </description>  </property>  <property>    <name>javax.jdo.option.ConnectionPassword</name>    <value>xxx</value>    <description>设定数据库的密码</description>   </property><!--zbt添加-->   <property>      <name>hive.exec.max.dynamic.partitions</name>      <value>100000</value>      <description>在所有执行MR的节点上,最大一共可以创建多少个动态分区</description>   </property>   <property>      <name>hive.exec.max.dynamic.partitions.pernode</name>      <value>100000</value>      <description>在所有执行MR的节点上,最大可以创建多少个动态分区</description>  </property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

Ⅳ.未解决问题

hdfs文件系统中显示

虽然正常显示中文但在文件夹中会出现

Path does not exist on HDFS or WebHDFS is disabled. Please check your path or enable WebHDFS

可能是中文路径导致的错误,但该错误目前未影响到分区表的正常操作,具体影响仍需实验。

Ⅴ.暴力脚本- -

参考资料:

alter database hive_meta default character set utf8;alter table BUCKETING_COLS default character set utf8;alter table CDS default character set utf8;alter table COLUMNS_V2 default character set utf8;alter table DATABASE_PARAMS default character set utf8;alter table DBS default character set utf8;alter table FUNCS default character set utf8;alter table FUNC_RU default character set utf8;alter table GLOBAL_PRIVS default character set utf8;alter table PARTITIONS default character set utf8;alter table PARTITION_KEYS default character set utf8;alter table PARTITION_KEY_VALS default character set utf8;alter table PARTITION_PARAMS default character set utf8;-- alter table PART_COL_STATS default character set utf8;alter table ROLES default character set utf8;alter table SDS default character set utf8;alter table SD_PARAMS default character set utf8;alter table SEQUENCE_TABLE default character set utf8;alter table SERDES default character set utf8;alter table SERDE_PARAMS default character set utf8;alter table SKEWED_COL_NAMES default character set utf8;alter table SKEWED_COL_VALUE_LOC_MAP default character set utf8;alter table SKEWED_STRING_LIST default character set utf8;alter table SKEWED_STRING_LIST_VALUES default character set utf8;alter table SKEWED_VALUES default character set utf8;alter table SORT_COLS default character set utf8;alter table TABLE_PARAMS default character set utf8;alter table TAB_COL_STATS default character set utf8;alter table TBLS default character set utf8;alter table VERSION default character set utf8;alter table BUCKETING_COLS convert to character set utf8;alter table CDS convert to character set utf8;alter table COLUMNS_V2 convert to character set utf8;alter table DATABASE_PARAMS convert to character set utf8;alter table DBS convert to character set utf8;alter table FUNCS convert to character set utf8;alter table FUNC_RU convert to character set utf8;alter table GLOBAL_PRIVS convert to character set utf8;alter table PARTITIONS convert to character set utf8;alter table PARTITION_KEYS convert to character set utf8;alter table PARTITION_KEY_VALS convert to character set utf8;alter table PARTITION_PARAMS convert to character set utf8;-- alter table PART_COL_STATS convert to character set utf8;alter table ROLES convert to character set utf8;alter table SDS convert to character set utf8;alter table SD_PARAMS convert to character set utf8;alter table SEQUENCE_TABLE convert to character set utf8;alter table SERDES convert to character set utf8;alter table SERDE_PARAMS convert to character set utf8;alter table SKEWED_COL_NAMES convert to character set utf8;alter table SKEWED_COL_VALUE_LOC_MAP convert to character set utf8;alter table SKEWED_STRING_LIST convert to character set utf8;alter table SKEWED_STRING_LIST_VALUES convert to character set utf8;alter table SKEWED_VALUES convert to character set utf8;alter table SORT_COLS convert to character set utf8;alter table TABLE_PARAMS convert to character set utf8;alter table TAB_COL_STATS convert to character set utf8;alter table TBLS convert to character set utf8;alter table VERSION convert to character set utf8;-- alter table PART_COL_STATS convert to character set utf8;SET character_set_client = utf8 ;-- SET character_set_connection = utf8 ;-- alter table PART_COL_STATS convert to character set utf8;SET character_set_database = utf8 ;SET character_set_results = utf8 ;SET character_set_server = utf8 ;-- SET collation_connection = utf8 ;-- SET collation_database = utf8 ;-- SET collation_server = utf8 ;SET NAMES 'utf8';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

只复制了博客中修改表字段的部分

看看就好,最好还是根据需求修改。

Ⅵ.实例

import org.apache.spark.sql.{SaveMode, SparkSession}object CNHivePartitionTest {  def main(args: Array[String]): Unit = {    System.setProperty("hadoop.home.dir", "D:\\BaiduNetdiskDownload\\hadoop-2.7.3")    System.setProperty("HADOOP_USER_NAME", "root")    val spark = SparkSession.builder()      .appName("Cov19DataDome4")      .master("local[*]")      .config("spark.sql.warehouse.dir", "hdfs://xx.xxx.x.x:8020/user/hive/warehouse")      .config("hive.metastore.uris", "thrift://xx.xxx.x.x:9083")      .enableHiveSupport()      .getOrCreate()    import spark.implicits._    import org.apache.spark.sql.functions._    spark.sqlContext.sql("set hive.exec.dynamic.partition = true")    spark.sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")    val mysqlMap = Map(      "url" -> "jdbc:mysql://xx.xxx.x.x:3306/clown_db?useSSL=false&useUnicode=true&characterEncoding=utf8",      "user" -> "root",      "password" -> "xxx",      "driver" -> "com.mysql.jdbc.Driver"    )    val mysqlData = spark.read.format("jdbc")      .options(mysqlMap)      .option("dbtable","tc_hotel2")      .load()    spark.sqlContext.sql(      """        |create table if not exists clown_test_db.CNTest        |(        |  `hname` string,        |  `hbrand` string,        |  `province` string,        |  `city` string,        |  `starlevel` string,        |  `rating` string,        |  `comment_count` string,        |  `price` string        |)        |partitioned by (pid string)        |row format delimited        |fields terminated by '\t'        |lines terminated by ''        |stored as textfile        |""".stripMargin)    mysqlData      .select(col("*"),col("province"))      .write      .format("hive")      .mode(SaveMode.Append)      .insertInto("clown_test_db.CNTest")  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

7、表连接join/union

参考资料:https://blog.csdn.net/m0_37809146/article/details/91282446

val tb1 = spark.read.format("jdbc")  .options(mysqlMap)  .option("dbtable", "cov19_test_tb")  .load()val tb2 = spark.read.format("jdbc")  .options(mysqlMap)  .option("dbtable", "cov19_test_tb_2")  .load()  .withColumnRenamed("", "")/** * inner 交集,只会联合给出字段都存在的数据 */tb1.join(tb2, Seq("provinceName", "cityName"), "inner")//      .show(100)/** * right 右链接,展示右边表所有数据 */tb1.join(tb2, Seq("provinceName", "cityName"), "right")//      .show(100)/** * left 左链接,展示左边表所有数据 */tb1.join(tb2, Seq("provinceName", "cityName"), "left")//      .show(100)val testTb1 = tb1.withColumnRenamed("cityName", "tb1CN")val testTb2 = tb2.withColumnRenamed("cityName", "tb1CN")//默认 inner连接,进行连接的条件字段必须两边表都存在testTb1.join(testTb2, "tb1CN")//      .show()/** * right_outer 右外连接,相当于左连接 */tb1.join(tb2, Seq("provinceName", "cityName"), "right_outer")//      .show(100)/** * left_outer 左外连接,相当于右连接 */tb1.join(tb2, Seq("provinceName", "cityName"), "left_outer")//      .show(100)/** * 外连接 类似把左右连接出的集合加起来- - */tb1.join(tb2, Seq("provinceName", "cityName"), "outer")//      .show(100)/** * 全连接 */tb1.join(tb2, Seq("provinceName", "cityName"), "full")//      .show(100)/** * 全外连接 */tb1.join(tb2, Seq("provinceName", "cityName"), "full_outer")//      .show(100)/** * 交集 */tb1.join(tb2, Seq("provinceName", "cityName"), "left_semi")  .show(100)/** * 差集 */tb1.join(tb2, Seq("provinceName", "cityName"), "left_anti")  .show(100)/** * https://blog.csdn.net/wcc27857285/article/details/86439313 * 其他知识点: * HAVING 子句 * 在 SQL 中增加 HAVING 子句原因是,WHERE 关键字无法与聚合函数一起使用。 * * SQL HAVING 语法 * SELECT column_name, aggregate_function(column_name) * FROM table_name * WHERE column_name operator value * GROUP BY column_name * HAVING aggregate_function(column_name) operator value * * * --- JOIN ON * JOIN写连接字段 * ON写匹配条件 * */
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

8、自定义UDF,UDAF函数

[(17条消息)

9、数据集获取

UCI机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类、回归、聚类和推荐系统任务。数据集列表位于:http://archive.ics.uci.edu/ml/

Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问。这些数据集包括人类基因组项目、Common Craw网页语料库、维基百科数据和Google Books Ngrams。相关信息可参见:http://aws.amazon.com/publicdatasets/

Kaggle:这里集合了Kaggle举行的各种机器学习竞赛所用的数据集。它们覆盖分类、回归、排名、推荐系统以及图像分析领域,可从

Competitions区域下载:http://www.kaggle.com/competitions

KDnuggets:这里包含一个详细的公开数据集列表,其中一些上面提到过的。该列表位于:http://www.kdnuggets.com/datasets/index.html

10、数仓分层概念

参考资料:

五、实战复盘

1、2022/1/3

题目:

数据源:

csv文件(未修改)

mysql表格(增加脏数据)

环境准备:

1.mysql数据表格 2.hive目标表 3.pom文件

完成速度:

3h+

遇到问题:

1.data->mysql,数据保存

SaveMode.Overwrite 保存至mysql数据库,不仅会覆盖数据格式,字段名也会被覆盖

在做题途中遇到了保存SaveMode.Append失败的错误,修改为Overwrite 不报错,原因不明

是否解决:

出现错误

Unknown column 'sum' in 'field list'
  • 1

原因是字段名与mysql数据库目标表中的字段名不同

修改字段名相同即可

.withColumnRenamed("sum","total_price")
  • 1

在hive中是否有相同特性?

2.Join等表连接的使用

Join,union仍不熟悉 select子查询也比较生疏

是否解决: ✔?

join理解下图足够

union联合要求字段相同 否则报错

3.Date计算

参考资料:https://blog.csdn.net/wybshyy/article/details/52064337

使用datediff不需要转换时间格式

是否解决:

参考资料:

datediff 计算两个时间差天数 结果返回一个整数

对时间格式可能有要求例如‘2021/1/4‘这样的时间格式无法被计算(sql中,算子貌似没有这个问题)

sql写法:

spark.sql(  """    |select datediff('2021-1-4','2020-12-30')    |""".stripMargin).show()
  • 1
  • 2
  • 3
  • 4

算子写法:

.withColumn("o",datediff(col("delivery_date"),col("order_date")))
  • 1

months_between计算两个时间差月数 结果返回一个浮点数

sql写法:

spark.sql(  """    |select months_between('2021-1-4','2020-12-30')    |""".stripMargin).show()
  • 1
  • 2
  • 3
  • 4

返回:0.16129032

若想返回整数月份可以将天数删除:

spark.sql(  """    |select months_between('2021-1','2020-12')    |""".stripMargin).show()
  • 1
  • 2
  • 3
  • 4

返回:1.0

算子写法:

.withColumn("o",months_between(col("delivery_date"),col("order_date")))
  • 1

直接用时间戳相减通过计算也可以

spark.sql(  """    |select (unix_timestamp('2022/1/1','yyyy/MM/dd') - unix_timestamp('2021/12/31','yyyy/MM/dd'))/60/60/24    |""".stripMargin).show()
  • 1
  • 2
  • 3
  • 4

2022-4-20补充:
第三部分数据源:
链接:https://pan.baidu.com/s/1U7BF0eDC56ea3XfcqejftA 提取码:zzzz
–来自百度网盘超级会员V5的分享
拿完点赞支持一下~

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发