应用系统定制开发20天学习Spark(0)之最简单版Spark入门

是一种由scala应用系统定制开发编写的快速、通用、应用系统定制开发可扩展的大数据分析引擎,应用系统定制开发所谓大数据分析主要是应用系统定制开发对大量数据进行分析处理,应用系统定制开发是目前大数据开发职业必备技能

一、简单介绍

下面是对spark应用系统定制开发的简单介绍,嗯,应用系统定制开发主要就是说下spark应用系统定制开发有多好多好的,应用系统定制开发不想看的可以直接去第二步

特点

1)快:与Hadoop的MapReduce相比,Spark应用系统定制开发基于内存的运算要快100倍以上,应用系统定制开发基于硬盘的运算也要快10倍以上。Spark应用系统定制开发实现了高效的DAG执行引擎,应用系统定制开发可以通过基于内存来高应用系统定制开发效处理数据流。应用系统定制开发计算的中间结果是存在应用系统定制开发于内存中的。

2)易用:Spark支持Java、Python和Scala的API,应用系统定制开发还支持超过80应用系统定制开发种高级算法,应用系统定制开发使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

3)通用:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

4)兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

二、20行代码进行spark入门

备注:当前环境是idea + jdk8,本文所有代码,内容基于之前有java编程基础。

1、jar包

首先在idea中创建一个普通maven项目,进入到项目pom文件下引入maven依赖,如果引入过了就不需要引入了。

<dependencies>        <!-- scala依赖 开始 -->        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-reflect</artifactId>            <version>2.12.8</version>            <scope>compile</scope>        </dependency>        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>2.12.8</version>            <scope>compile</scope>        </dependency>        <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-compiler</artifactId>            <version>2.12.8</version>            <scope>compile</scope>        </dependency>        <!-- scala依赖 结束 -->        <!-- spark依赖 开始 -->        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-sql_2.12</artifactId>            <version>2.4.3</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-mllib_2.12</artifactId>            <version>2.4.3</version>        </dependency>        <!-- spark依赖 结束 -->    </dependencies>    <build>        <plugins>            <!-- 该插件用于将 Scala 代码编译成 class 文件 -->            <plugin>                <groupId>net.alchim31.maven</groupId>                <artifactId>scala-maven-plugin</artifactId>                <version>3.4.6</version>                <executions>                    <execution>                        <!-- 声明绑定到 maven 的 compile 阶段 -->                        <goals>                            <goal>testCompile</goal>                        </goals>                    </execution>                </executions>            </plugin>            <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-assembly-plugin</artifactId>                <version>3.0.0</version>                <configuration>                    <descriptorRefs>                        <descriptorRef>jar-with-dependencies</descriptorRef>                    </descriptorRefs>                </configuration>                <executions>                    <execution>                        <id>make-assembly</id>                        <phase>package</phase>                        <goals>                            <goal>single</goal>                        </goals>                    </execution>                </executions>            </plugin>        </plugins>    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

2、创建数据集

在项目目录下创建data文件夹存放数据,创建word文件并输入下面的内容,

接下来的任务就是使用spark统计分析每个单词出现的数量

java~hadoop~javahtml~jsjavajs~jquery
  • 1
  • 2
  • 3
  • 4

3、创建WordCount.scala文件用于计算,注意文件类型为object,使用Spark的顺序大致为

​ 1、创建Spark上下文

​ 2、读取数据文件

​ 3、处理数据为合适的格式

​ 4、统计计算

​ 5、获得结果并保存

具体处理代码如下

object WordCount {  def main(args: Array[String]): Unit = {    // 第一个参数启动的方式,第二个参数启动任务的名称    val spark = new SparkContext("local",      "WordCount")    // 读取数据,word就是刚创建的文件    val data = spark.textFile("data/word")    // 方便计算的格式    val result = data      .flatMap(_.split("~")) // 使用~切分每条记录      .map((_,1)) // java,1 html,1     .countByKey() // 统计相同key值的数量    println(result)  }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

最后会打印每个单词出现的数量。

这个就是一个最为简单的Spark使用案例

三、代码运行流程介绍

上面代码是最基础的一个spark的案例代码,下面的内容是关于上述代码执行流程的介绍

3.1 常用关键词

在学习spark前,我们先整理一系列常用关键词,避免同学们看文章有不理解的地方

RDD

RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合,spark中的基本对象

缺陷:

​ 不支持细粒度的写和更新操作(如网络爬虫)
​ spark写数据是粗粒度的,所谓粗粒度,就是批量写入数据 (批量写)
​ 但是读数据是细粒度的也就是说可以一条条的读 (一条条读)
​ 不支持增量迭代计算,Flink支持

DAG有向无环图

无回路的有向图,任何顶点都无法经过若干条边回到该点,则这个图就没有环路
个人感觉:在spark中指的是各种RDD相互转换进行运算处理,最后得到结果

Transformation算子

transformation 转换,转型,指的是spark中的RDD转换的一系列方法,属于懒加载,不会立即执行,只有遇到action算子的时候才会执行

action算子

处理或者进行计算的方法

job

任务,当在程序中遇到一个action算子的时候,就会提交一个job,执行前面的一系列操作,一个任务有多个job,job按照串行执行

宽窄依赖

当算子在运行时会转换RDD,之前的RDD为父RDD,后面的子RDD,当一个父RDD进入到一个子RDD时,比如说map算子,成为窄依赖,如果RDD可通过一个或多个RDD进行转换生成,称为宽依赖,比如说groupByKey算子

stage(阶段)

一个job包含一个或多个stage。各个stage之间按照顺序执行。stage 的划分就是根据shuffle依赖进行的

stage 的划分:Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以taskSet的形式提交给TaskScheduler运行。

Task

Stage继续往下分解,就是Task,Task的数量其实就是stage的并行度,任务的最小运行单位,一个任务最终是task的形式运行在executor

Job,Stage,Task关系总结

Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;

Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;

Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。

worker

管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,一个worker进程会启动一个或多个executor线程来执行一个topology的component(Spout或Bolt)

Executor

spark任务(task)的执行单元,运行在worker上,实际上它是一个JVM进程,一组计算资源(cpu核心、memory)的集合。一个worker上的内存、cpu由多个executor共同分摊,spark应用启动的时候Executor同时启动,并伴随整个生命周期。主要核心功能有两个
1、负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程;
2、它们通过自身的块管理器( Block Manager)为用户程序中要求缓存的 RDD提供内存式存储。RDD 是直接缓存在 Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算

partition

spark数据的分区,大量数据分摊到各个节点的分区上进行处理

shuffle

一个job分为多个stage,当触发宽依赖后,进入到shuffle阶段,shuffle分为 map(Red ) 阶段和 reduce (Write )阶段。

map端任务数和分区个数一致,reduce端采用spark.default.parallelism 作为默认配置,如果没有则使用最后一个RDD分区数作为任务数

shuffle分为HashShuffle和SortShuffle

HashShuffle,对相同的 key 执行 hash 算法,从而将相同 key 都
写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task ,下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件 。

设置spark.shuffle.consolidateFiles 为true可以开启优化,会出现shuffleFileGroup ,执行时每一批task会复用shuffleFileGroup ,不创建新的磁盘文件,减少磁盘文件数量。

SortShuffe相对于HashShuffle增加了排序处理,spark1.2后默认使用SortShuffe,不同点在于每个task在进行shuffle操作的时候,虽然也会产生较多的临时磁盘文件,但是最后将所有临时文件合并(merge)成一个磁盘文件,并且有一个与之对应的索引文件

有两种运行方式,第一种普通模式,第二种bypass 模式(当read task的数量小于bypassMergeThreshold 参数的值的时候进入,或者不是聚合类shuffle算子的时候 )

bypass模式不同的地方:1、磁盘写机制不同;2、不会排序。由于不进行排序,这样可以节省这部分性能开销

master

管理集群和节点,不参与计算

worker

计算节点,进程本身不参与计算,和master汇报

管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,类似于包工头,管理分配新进程,做计算的服务

Driver

运行程序的main方法,属于程序入口,主要功能有 1、把用户程序转化为任务(job);2、向集群申请资源;3、负责作业的调度和解析;4、生成Stage并调度Task到Executor上

3.2 20行代码的处理流程

处理流程大致可以分为三部分,第一提交任务,第二查找对应资源,第二任务进行调度

程序启动有多种方式,运行方式取决于创建sparkContext对象时传入的master的值。可以使用本地运行的方式(一般测试代码时可以使用这个),

local:本地运行,一个进程,无并行 ; local[k]:k个进程运行; local[*]:进程数等于CPU核数

除了本地启动之外还可以使用集群方式启动,常规有三种,最常用的是YARN

启动方式特点方法
Standalone独立模式原生简单集群,自带完整服务spark://
Apache Mesos分布式资源管理框架mesos://
Hadoop YARN运行在Yarn【备注1】上,统一资源管理机制,根据driver位置不同分为yarn client和yarn clusteryarn client:Drive 运行在本地,Work运行在YRAN集群上,部署时使用–deploy-mode client; yarn cluster: Driver和Work都在集群上,部署时使用–deploy-mode cluster

备注1:YARN,Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

在我们上面的代码中使用的是local,也就是本地

val spark = new SparkContext("local","WordCount")
  • 1

​ 1、当运行到这里的时候,扫描依赖的文件和jar包,开始生成SparkContext,并初始化Driver端,以及准备好Executor,首先Driver 启动后向 Master 注册应用程序,Master 根据提交脚本的资源需求找到内部资源至少可以启动一个 Executor 的所有 Worker,然后在这些 Worker 之间分配 Executor,Worker上的 Executor 启动后会向 Driver 反向注册,所有的 Executor 注册完成后,环境初始化完成,开始执行任务

​ 2、Driver 开始执行 main 函数,构建DAG图,构建完毕后,将DAG图提交给DAGScheduler,DAGScheduler( DAG调度程序 )开始划分stage,按照 Action 算子开始划分 stage,上叙代码中

.countByKey()
  • 1

方法就属于Action算子,当执行到这一行时就会把上面的任务进行划分为一个stage,stage具有先后依赖,然后把stage中的TaskSet发送给TaskScheduler

​ 3、TaskScheduler(任务调度器)通过TaskSet(任务集)中的获得所有task,task的划分是按照宽窄依赖进行处理,就是下面这两行代码

.flatMap(_.split("~")) // 使用~切分每条记录.map((_,1)) // java,1 html,1
  • 1
  • 2

上面的算子都是和父级RDD属于1-1的关系,所以都是窄依赖。

宽依赖属于多个RDD进行整合拆分,触发shuffle涉及到网络传输,容易消耗资源。

​ 4、接下来把所有的task,数据,运行代码发给exector,exector将task放入线程池进行运行,将执行结果反馈给TaskScheduler,TaskScheduler再将结果反馈给DAGScheduler,直到全部任务运行结束后,释放所有的资源

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发