收款定制开发PySpark | Spark框架简述 | Spark环境搭建

文章目录


传送门:

  • 视频地址:

一.框架简述

1.Spark是什么

  定义:Apache Spark收款定制开发是用于大规模数据(large-scala data)收款定制开发处理的统一(unified)分析引擎(收款定制开发数据处理分析引擎)。

RDD 收款定制开发是一种分布式内存抽象,收款定制开发其使得程序员能够在大收款定制开发规模集群中做内存运算,收款定制开发并且有一定的容错方式。而这也
是整个 Spark 收款定制开发的核心数据结构,Spark 收款定制开发整个平台都围绕着RDD进行。

"分而治之"的思想,收款定制开发对数据的处理与计算,收款定制开发都是进行分区,收款定制开发对数据进行分布式任务收款定制开发分配来完成大规模数据集的计算。
Spark 借鉴了 MapReduce 收款定制开发思想发展而来,收款定制开发保留了其分布式并行计收款定制开发算的优点并改进了其明显的缺陷,收款定制开发让中间数据存储在内存收款定制开发中提高了运行速度、收款定制开发并提供丰富的操作数据的API收款定制开发提高了开发速度。

  Spark收款定制开发是一款分布式内存计算收款定制开发的统一分析引擎。其特点就是对收款定制开发任意类型的数据收款定制开发进行自定义计算。Spark可以计算:结构化、半结构化、收款定制开发非结构化等各种类型的数据结构,收款定制开发同时也支持使用Python、Java、Scala、R以及SQL收款定制开发语言去开发应用程序计算数据。Spark收款定制开发的适用面非常广泛,所以,被称之为 统一的(适用面广)收款定制开发的分析引擎(数据处理)。

2.Spark与Hadoop的对比

  Spark与Hadoop收款定制开发技术栈的对比:

Spark收款定制开发解决什么问题? 收款定制开发海量数据的计算,收款定制开发可以进行离线批处理和收款定制开发实时流计算
注:
线程是CPU收款定制开发的基本调度单位
收款定制开发一个进程一般包含多个线程, 收款定制开发一个进程下的多个线程收款定制开发共享进程的资源
收款定制开发不同进程之间的线程相互不可见
收款定制开发线程不能独立执行
收款定制开发一个线程可以创建和撤收款定制开发销另外一个线程

尽管Spark相对于Hadoop收款定制开发而言具有较大优势,但Spark收款定制开发并不能完全替代Hadoop

  • 计算层面,Spark相比较MR(MapReduce)收款定制开发有巨大的性能优势,收款定制开发但至今仍有许多计算工具基于MR构架,收款定制开发比如非常成熟的Hive
  • Spark仅做计算,而Hadoop收款定制开发生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据体系的核心架构。

Spark与Hadoop融合在一起,形成HDFS+YARN+Spark。仍然需要使用Hadoop中的存储(HDFS)和资源管理调度()。

3.Spark的四大特点

  1. 运行速度快
    由于Apache Spark支持内存计算,并且通过DAG(有向无环图)执行引擎支持无环数据流,所以官方宣称其在内存中的运算速度要比Hadoop的MapReduce快100倍,在硬盘中要快10倍。

    Spark处理数据与MapReduce处理数据相比,有如下两个不同点:

    • 其一、Spark处理数据时,可以将中间处理结果数据存储到内存中;
    • 其二、Spark 提供了非常丰富的算子(API), 可以做到复杂任务在一个Spark 程序中完成。
  2. 易于使用
    Spark支持了包括 Java、Scala、Python 、R和SQL语言在内的多种语言

  3. 通用性强
    在 Spark 的基础上,Spark 还提供了包括Spark SQL、Spark Streaming、MLib 及GraphX在内的多个工具库,我们可以在一个应用中无缝地使用这些工具库。

    • Spark SQL模块:通过SQL语言来完成结构化数据处理
    • Spark Streaming模块:完成流式数据处理
    • MLib模块:完后机器学习的数据计算
    • GraphX模块:完成图计算
  4. 多种模式运行
    Spark 支持多种运行方式,包括在 Hadoop 和 Mesos 上,也支持 Standalone的独立运行模式,同时也可以运行在云Kubernetes——容器(Spark 2.3开始支持)上。对于数据源而言,Spark 支持从HDFS、HBase、Cassandra 及 Kafka 等多种途径获取数据。

  • 文件系统: LocalFS、HDFS、Hive、text、parquet、orc、json、csv
  • 数据库RDBMs: mysql、Orade、mssql
  • NOSQL数据库:HBase、ES、Redisiv
  • 消息对象:Kafka

4.Spark模块

  整个Spark 框架模块包含: SparkCore、Spark SQL、SparkStreaming、Spark GraphX、Spark MLlib,而后四项的能力都是建立在核心引擎之上。

  • Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core以RDD为数据抽象,提供Python、Java、Scala、R语言的API,可以编程进行海量离线数据批处理计算。
  • SparkSQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparhSQL本身针对离线计算场景。同时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。
  • SparkStreaming:以SparkCore为基础,提供数据的流式计算功能。
  • MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。
  • GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。

5.Spark运行模式

  Spark提供多种运行模式,包括:本地模式、集群模式和云模式

  • 本地模式(单机)=> Local:用于开发和测试,下面的几个模式用于生产环境
    本地模式就是以一个独立的进程,通过其内部的多个线程来模拟整个Spark运行时环境
  • Standalone模式(集群)
    Spark中的各个角色以独立进程的形式存在,并组成Spark集群环境
  • Hadoop YARN模式(集群)
    Spark中的各个角色运行在YARN的容器内部,并组成Spark集群环境
  • Kubernetes模式(容器集群)
    Spark中的各个角色运行在Kubernetes的容器内部,并组成Spark集群环境
  • 云服务模式(运行在云平台上)
    提供EMR框架(弹性MapReduce)

6.Spark的架构角色


注:正常情况下Executor是干活的角色,不过在特殊场景下(Local模式)Driver可以即管理又干活

二、Spark环境搭建

1.服务器环境

  使用三台Linux虚拟机服务器来学习,三台虚拟机的功能分别是:

  • node1: Master(HDFS\YARN\Spark) 和 Worker(HDFS\ YARN\ Spark)
  • node2: Worker(HDFS\ YARN\ Spark)
  • node3: Worker(HDFS\ YARN\ Spark) 和 Hive

我使用的是课程中提供的虚拟机,没有自己搭建
课程资料中提供了3台虚拟机的压缩包, 同学们解压后导入VMWare即可

软件存放在/export/software
组件安装存放/export/server/

2.Local模式基本原理

  本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task。简单来讲,Local模式就是以一个独立进程配合其内部线程(每个线程模拟一个Spark服务器)来提供完成Spark运行时环境.。Local模式可以通过spark-shell/pyspark/spark-submit等来开启。

  • Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或 Local[*]
  • 其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N,则默认是1个线程(该线程有1个core)。 通常Cpu有几个Core,就指定几个线程,最大化利用计算能力。
  • 如果是local[*],则代表按照Cpu最多的Cores设置线程数。

Local下的角色分布

  • 资源管理:
    Master:Local进程本身
    Worker:Local进程本身
  • 任务执行:
    Driver:Local进程本身
    Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力

Driver也算一种特殊的Executor, 只不过多数时候, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)
注意: Local模式只能运行一个Spark程序, 如果执行多个Spark程序, 那就是由多个相互独立的Local进程在执行

3.安装包下载

  • anaconda3

    //安装文件夹/root/anaconda3
    • 1
    • 2


    每个虚拟机中有两个虚拟环境,pyspark_env可供课程使用

4.基础操作

  • 解压spark文件并建立软链接
  • 配置spark环境变量,

    配置Spark由如下5个环境变量需要设置,这5个环境变量 都需要配置在: /etc/profile

    • SPARK_HOME: 表示Spark安装路径在哪里
    • PYSPARK_PYTHON: 表示Spark想运行Python程序, 那么去哪里找python执行器
    • JAVA_HOME: 告知Spark Java在哪里
    • HADOOP_CONF_DIR: 告知Spark Hadoop的配置文件在哪里
    • HADOOP_HOME: 告知Spark Hadoop安装在哪里

    PYSPARK_PYTHON和 JAVA_HOME 需要同样配置在: /root/.bashrc

5.测试

  spark中代码展示如下:

bin/pyspark:利用bin目录下的pyspark来进行测试

bin/pyspark 程序, 可以提供一个 交互式的 Python解释器环境, 在这里面可以写普通python代码让spark执行


ctrl+d退出

这一步碰到一个:开启hadoop集群后,可以正常使用了

WEB UI (4040):
  Spark的任务在运行后,会在Driver所在机器绑定到4040端口,提供当前任务的监控页面供查看。如果4040端口被占用, 会顺延到4041 … 4042…
输入:服务器ip:4040 即可打开:

打开监控页面后, 可以发现 在程序内仅有一个Driver。因为我们是Local模式, Driver即管理 又干活。

spark-submit
  利用bin目录下的spark-submit来提交写好的代码到spark集群中运行


总结:pyspark/spark-shell/spark-submit 对比

三、Standalone环境搭建

1.Standalone 架构

  Standalone模式是Spark自带的一种集群模式,Standalone模式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。StandAlone 是完整的Spark运行环境,其中:

  • Master角色以Master进程存在, Worker角色以Worker进程存在
  • Driver和Executor运行于Worker进程内, 由Worker提供资源供给它们运行


StandAlone集群在进程上主要有3类进程:

  • 主节点Master进程:
    Master角色, 管理整个集群资源,并托管运行各个任务的Driver
  • 从节点Workers:
    Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task); 每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
  • 历史服务器HistoryServer(可选):
    Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。

如图所示,Spark集群是一个Master进程和3个Worker进程,可以开启非常多的任务。诸如:一个任务Driver以及负责该任务的Executor;二个任务Driver以及负责这两个任务的Executor

2.standalone环境安装操作

  集群规划如下:

node1运行: Spark的Master进程 和 1个Worker进程
node2运行: spark的1个worker进程
node3运行: spark的1个worker进程
整个集群提供: 1个master进程 和 3个worker进程

安装步骤:

  1. 在所有机器安装Python(Anaconda),都创建pyspark虚拟环境 以及安装虚拟环境所需要的包pyspark jieba pyhive

  2. 在所有机器配置环境变量
    /etc/profile/root/.bashrc文件中都如node1配置

  3. 配置配置文件
    在/export/server/spark/conf中进行如下操作:

    (pyspark_env) [root@node1 conf]# mv workers.template workersnode1.itcast.cnnode2.itcast.cnnode3.itcast.cn
    • 1
    • 2
    • 3
    • 4
    (pyspark_env) [root@node1 conf]# mv spark-env.sh.template spark-env.sh(pyspark_env) [root@node1 conf]# vim spark-env.sh#!/usr/bin/env bashJAVA_HOME=/export/server/jdk1.8.0_241/HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop/YARN_CONF_DIR=/export/server/hadoop/etc/hadoop/export SPARK_MASTER_HOST=node1.itcast.cnexport SPARK_MASTER_PORT=7077SPARK_MASTER_WEBUI_PORT=8080SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=2gSPARK_WORKER_PORT=7078SPARK_WORKER_WEBUI_PORT=8081SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1.itcast.cn:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21


    在HDFS上创建程序运行历史记录存放的文件夹:sparklog

    (pyspark_env) [root@node1 conf]# hadoop fs -mkdir /sparklog(pyspark_env) [root@node1 conf]# hadoop fs -chmod 777 /sparklog(pyspark_env) [root@node1 conf]# hadoop fs -ls /Found 7 itemsdrwxr-xr-x   - root supergroup          0 2021-10-24 23:07 /flinkdrwxr-xr-x   - root supergroup          0 2021-10-24 17:38 /hbasedrwxr-xr-x   - root supergroup          0 2021-10-24 16:02 /sparkdrwxrwxrwx   - root supergroup          0 2022-06-17 20:45 /sparklogdrwxr-xr-x   - root supergroup          0 2021-10-24 22:19 /testdrwxrwx---   - root supergroup          0 2021-10-24 16:10 /tmpdrwxr-xr-x   - root supergroup          0 2021-10-24 16:09 /user
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    配置spark-defaults.conf文件:

    (pyspark_env) [root@node1 conf]# mv spark-defaults.conf.template spark-defaults.conf(pyspark_env) [root@node1 conf]# vim spark-defaults.confspark.eventLog.enabled  truespark.eventLog.dir      hdfs://node1.itcast.cn:8020/sparklog/spark.eventLog.compress truespark.yarn.historyServer.address        node1.itcast.cn:18080 
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6


    配置log4j.properties文件【可选配置】:

    (pyspark_env) [root@node1 conf]# mv log4j.properties.template log4j.properties(pyspark_env) [root@node1 conf]# vim log4j.properties# Set everything to be logged to the consolelog4j.rootCategory=WARN, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.errlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n# Set the default spark-shell log level to WARN. When running the spark-shell, the# log level for this class is used to overwrite the root logger's log level, so that# the user can have different defaults for the shell and regular Spark apps.log4j.logger.org.apache.spark.repl.Main=WARN# Settings to quiet third party logs that are too verboselog4j.logger.org.sparkproject.jetty=WARNlog4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这个文件的修改不是必须的,为什么修改为WARN。因为Spark是个话唠,会疯狂输出日志,设置级别为WARN只输出警告和错误日志,
    将Spark安装文件夹分发到其他服务器上

    (pyspark_env) [root@node1 server]#scp -r spark-3.1.2-bin-hadoop3.2 node2 : /export/server/(pyspark_env) [root@node1 server]#scp -r spark-3.1.2-bin-hadoop3.2 node3 :/export/server/
    • 1
    • 2

    分别在其他服务器上建立软链接

    (pyspark_env) [root@node2 server]#ln -s /export/server/spark-3.1.2-bin-hadoop3.2 /export/server/spark
    • 1
    (pyspark_env) [root@node3 server]#ln -s /export/server/spark-3.1.2-bin-hadoop3.2 /export/server/spark
    • 1
  4. 启动历史服务器
    启动spark之前需要先启动hadoop,因为spark使用hdfs文件系统作为写入日志的地方

    (pyspark_env) [root@node1 hadoop]# sbin/start-dfs.shStarting namenodes on [node1.itcast.cn]Starting datanodesStarting secondary namenodes [node2.itcast.cn](pyspark_env) [root@node1 hadoop]# sbin/start-yarn.shStarting resourcemanagerStarting nodemanagers[root@node1 hadoop]# sbin/mr-jobhistory-daemon.sh start historyserverWARNING: Use of this script to start the MR JobHistory daemon is deprecated.WARNING: Attempting to execute replacement "mapred --daemon start" instead.(pyspark_env) [root@node1 hadoop]# jps2960 NodeManager2771 ResourceManager2347 DataNode3388 JobHistoryServer2127 NameNode3455 Jps
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    (pyspark_env) [root@node1 spark]# sbin/start-history-server.shstarting org.apache.spark.deploy.history.HistoryServer, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.history.HistoryServer-1-node1.itcast.cn.out
    • 1
    • 2
  5. 启动Spark的Master和Worker进程

    # 启动全部master和workersbin/start-all.sh# 或者可以一个个启动:# 启动当前机器的mastersbin/start-master.sh# 启动当前机器的workersbin/start-worker.sh# 停止全部sbin/stop-all.sh# 停止当前机器的mastersbin/stop-master.sh# 停止当前机器的workersbin/stop-worker.sh
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    (pyspark_env) [root@node1 spark]# sbin/start-all.shstarting org.apache.spark.deploy.master.Master, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.master.Master-1-node1.itcast.cn.outnode3.itcast.cn: starting org.apache.spark.deploy.worker.Worker, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node3.itcast.cn.outnode2.itcast.cn: starting org.apache.spark.deploy.worker.Worker, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node2.itcast.cn.outnode1.itcast.cn: starting org.apache.spark.deploy.worker.Worker, logging to /export/server/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-node1.itcast.cn.out(pyspark_env) [root@node1 spark]# jps4049 DataNode7873 Master3832 NameNode4664 NodeManager7976 Worker7737 HistoryServer4475 ResourceManager8028 Jps## 可以发现spark的Master和Worker进程
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
  6. 打开WEB UI界面进行简单测试。WEB UI端口默认是8080。

3.测试

  使用客户端工具进行测试。

  • bin/pyspark

    [root@node1 bin]# ./pyspark --master spark://node1.itcast.cn:7077
    • 1


    可以发现:Running Applications中有一个正在运行的任务

    进入该任务,可以看到3个Worker:

    点击Application Detail UI,可以看到在spark任务中运行的多个子任务

    注意:

    1. 集群模式下程序是在集群上运行的,不要直接读取本地文件,应该读取hdfs上的
      因为程序运行在集群上,具体在哪个节点上我们运行并不知道,其他节点可能并没有那个数据文件
      2.Spark应用程序的层级关系如下:
  • bin/spark-submit (PI)

    [root@node1 bin]# ./spark-submit --master spark://node1.itcast.cn:7077 /export/server/spark/examples//src/main/python/pi.py 100
    • 1


    查看历史服务器,端口:18080。(运行的时候查看4040端口,运行完成后查看18080端口)

    就可以查看jobs

  • bin/spark-shell

    [root@node1 bin]# spark-shell --master spark://node1.itcast.cn:7077
    • 1


  • 查看历史服务器WEB UI
    端口是18080

4.Spark程序运行层次结构(应用架构)

  在Spark中运行单词计数,

  登录4040端口,查看任务运行状态,图中显示一个Application有一个Job(可以有多个)。

  可以发现一个Job分为多个阶段

  点击某一阶段,可以发现一个阶段又可以有多个任务并行运行。


应用架构可以用下图简单理解

  切换到【Executors】Tab页面

从图中可以看到Spark Application运行到集群上时,由两部分组成:Driver Program和Executors。

  • Driver Program

    • 相当于AppMaster,整个应用管理者,负责应用中所有Job的调度执行;
    • 运行JVM Process,运行程序的MAIN函数,必须创建SparkContext上下文对象;
    • 一个SparkApplication仅有一个;
  • Executors

    • 相当于一个线程池,运行JVM Process,其中有很多线程,每个线程运行一个Task任务,一个Task任务运行需要1 Core CPU,所有可以认为Executor中线程数就等于CPU Core核数;
    • 一个Spark Application可以有多个,可以设置个数和资源信息;


用户程序从最开始的提交到最终的计算执行,需要经历以下几个阶段:

  1. 用户程序创建 SparkContext 时,新创建的 SparkContext 实例会连接到 ClusterManager。 Cluster Manager 会根据用户提交时设置的 CPU 和内存等信息为本次提交分配计算资源,启动 Executor。

  2. Driver会将用户程序划分为不同的执行阶段Stage,每个执行阶段Stage由一组完全相同Task组成,这些Task分别作用于待处理数据的不同分区。在阶段划分完成和Task创建后, Driver会向Executor发送 Task;

  3. Executor在接收到Task后,会下载Task的运行时依赖,在准备好Task的执行环境后,会开始执行Task,并且将Task的运行状态汇报给Driver;

  4. Driver会根据收到的Task的运行状态来处理不同的状态更新。 Task分为两种:一种是Shuffle Map Task,它实现数据的重新洗牌,洗牌的结果保存到Executor 所在节点的文件系统中;另外一种是Result Task,它负责生成结果数据;

  5. Driver 会不断地调用Task,将Task发送到Executor执行,在所有的Task 都正确执行或者超过执行次数的限制仍然没有执行成功时停止;

Spark Application程序运行时三个核心概念:Job、Stage、Task,说明如下:

  • Job:由多个 Task 的并行计算部分,一般 Spark 中的action 操作,会生成一个 Job。
  • Stage:Job 的组成单位,一个 Job 会切分成多个 Stage,Stage 彼此之间相互依赖顺序执行,而每个 Stage 是多个 Task 的集合,类似 map 和 reduce stage。
  • Task:被分配到各个 Executor 的单位工作内容,它是Spark 中的最小执行单位,一般来说有多少个 Paritition,就会有多少个 Task,每个 Task 只会处理单一分支上的数据。

监控页面有4040,8080,18080,它们有何区别吗?

  • 4040: 是一个运行的Application在运行的过程中临时绑定的端口,用以查看当前任务的状态。4040被占用会顺延到4041、4042等。4040是一个临时端口,当前程序运行完成后, 4040就会被注销
  • 8080: 默认是StandAlone下,Master角色(进程)的WEB端口,用以查看当前Master(集群)的状态
  • 18080: 默认是历史服务器的端口,由于每个程序运行完成后,4040端口就被注销了。 在以后想回看某个程序的运行状态就可以通过历史服务器查看,历史服务器长期稳定运行,可供随时查看被记录的程序的运行过程。

4.总结

  • StandAlone的原理?
    Master和Worker角色以独立进程的形式存在,并组成Spark运行时环境(集群),而Local模式中Master、Worker和Driver是一起的,通通是Local进程本身。
  • Spark角色在StandAlone中的分布?
    Master角色:Master进程,Worker角色:Worker进程,Driver角色:以线程运行在Master中,Executor角色:以线程运行在Worker中
  • standalone如何提交Spark应用?
    bin/spark-submit --master spark:/lserver:7077
  • 4040\8080\18080分别是什么?
    4040是单个程序运行的时候绑定的端口可供查看本任务运行情况
    8080Master角色默认的WEB UI端口,用以查看当前Master(集群)的状态
    18080: 默认是历史服务器的端口,可供随时查看被记录的程序的运行过程
  • Job\State\Task的关系?
    一个Spark程序会被分成多个子任务(Job)运行,每一个Job会分成多个State(阶段)来运行,每一个State内会分出来多个Task(线程)来执行具体任务,每个Task以线程Thread方式执行,需要1Core CPU。每个Job执行按照DAG图进行的。

四、Standalone HA环境搭建

1. StandAlone HA 运行原理

  Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master单点故障(SPOF)的问题。如何解决这个单点故障的问题,Spark提供了两种方案

  1. 基于文件系统的单点恢复(Single-Node Recovery with Local File System)–只能用于开发或测试环境。
  2. 基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)–可以用于生产环境。
    ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

2. 高可用HA StandAlone集群搭建

前提: 确保Zookeeper 和 HDFS 均已经启动

先在spark-env.sh中, 删除: SPARK_MASTER_HOST=node1.itcast.cn
原因: 配置文件中固定master是谁, 那么就无法用到zk的动态切换master功能了.
spark-env.sh中, 增加:

SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"# spark.deploy.recoveryMode 指定HA模式 基于Zookeeper实现# 指定Zookeeper的连接地址# 指定在Zookeeper中注册临时节点的路径
  • 1
  • 2
  • 3
  • 4

将spark-env.sh 分发到每一台服务器上

scp spark-env.sh node2:/export/server/spark/conf/scp spark-env.sh node3:/export/server/spark/conf/
  • 1
  • 2

停止当前StandAlone集群

sbin/stop-all.sh
  • 1

启动集群:

# 在node1上 启动一个master 和全部workersbin/start-all.sh# 注意, 下面命令在node2上执行sbin/start-master.sh# 在node2上启动一个备用的master进程
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6



master主备切换:
提交一个spark任务到当前alivemaster上:

bin/spark-submit --master spark://node1:7077 /export/server/spark/examples/src/main/python/pi.py 1000
  • 1

在提交成功后, 将alivemaster直接kill掉,不会影响程序运行:

当新的master接收集群后, 程序继续运行, 正常得到结果.

结论:HA模式下, 主备切换 不会影响到正在运行的程序.

最大的影响是 会让它中断大约30秒左右.

3. 测试运行

4. 总结

  • StandAlone HA的原理
    基于Zookeeper做状态的维护, 开启多个Master进程, 一个作为活跃,其它的作为备份,当活跃进程宕机,备份Master进行接管
  • 为什么需要Zookeeper?
    分布式进程是分布在多个服务器上的, 状态之间的同步需要协调,比如谁是master,谁 是worker.谁成了master后要通知worker等, 这些需要中心化协调器Zookeeper来进行状态统一协调

五、Spark on YARN环境搭建(重点掌握)

  对于企业来说,在已有YARN集群的前提下,在单独准备Spark StandAlone集群,对资源的利用就不高。 所以,在企业中,多数场景下,会将Spark运行到YARN集群中,提高资源利用率。YARN本身是一个资源调度框架,负责对运行在内部的计算框架进行资源调度管理。作为典型的计算框架,Spark本身也是直接运行在YARN中,并接受YARN的调度的。所以,对于Spark On YARN,无需部署Spark集群,只要找一台服务器,充当Spark的客户端,即可提交任务到YARN集群中运行。

为什么需要Spark on YARN?
提高资源利用率,在已有YARN的场景下让Spark收到YARN的调度,可以更好的管控资源,提高利用率并方便管理。

1.Spark On Yarn本质

资源管理层面

  • Master角色由YARN的ResourceManager担任。
  • Worker角色由YARN的NodeManager担任。

任务运行层面

  • Driver可以运行在容器内(Cluster模式)或客户端进程中(Client模式)
  • Executor全部运行在YARN提供的容器内

简单来讲,Spark On Yarn就是将Driver和Executor运行在容器内部。我们只需要关心任务运行层面,资源管理层面Spark不在管理,有YARN统一协调。

Spark On Yarn需要

  1. 需要Yarn集群:已经安装了
  2. 需要Spark客户端工具, 比如spark-submit, 可以将Spark程序提交到YARN中
  3. 需要被提交的代码程序:,如spark/examples/src/main/python/pi.py此示例程序,或我们后续自己开发的Spark任务

2.spark on yarn环境配置

只需要确保在spark-env.sh 配置以下环境变量即可

  • HADOOP_CONF_DIR
  • YARN_CONF_DIR

3. 测试

注意: 交互式环境 pyspark 和 spark-shell 无法运行 cluster模式
–deploy-mode 选项是指定部署模式, 默认是 客户端模式

  • client就是客户端模式
  • cluster就是集群模式

–deploy-mode 仅可以用在YARN模式下

  • bin/pyspark

    [root@node1 spark]# bin/pyspark --master yarn
    • 1

    可以发现:pyspark交互式程序正常的运行在yarn客户端之中。

    可以通过端口:4040,查看当前任务的运行状态。

  • bin/spark-shell

    bin/spark-shell --master yarn --deploy-mode client|cluster
    • 1

    注意: 交互式环境 pyspark 和 spark-shell 无法运行 cluster模式

  • bin/spark-submit (PI)

    [root@node1 spark]# bin/spark-submit --master yarn /export/server/spark/examples/src/main/python/pi.py 100
    • 1


4. spark on yarn部署模式

  Spark On YARN是有两种运行模式的,一种是Cluster模式,一种是Client模式。这两种模式的区别就是Driver运行的位置

  • Cluster模式即:Driver运行在YARN容器内部,和ApplicationMaster在同一个容器内
  • Client模式即:Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中

  Cluster模式与Client模式的区别:

5.Spark On Yarn两种模式测试

  • client模式

    [root@node1 spark]# bin/spark-submit --master yarn --deploy-mode client --driver-memory 512m --executor-memoryy 512m --num-executors 3 --total-executor-cores 3 /export/server/spark/examples/src/main/python/pi.py 100
    • 1



    日志跟随客户端的标准输出流进行输出。

  • cluster模式

    [root@node1 spark]# bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/server/spark/examples/src/main/python/pi.py 100
    • 1



6.Spark On Yarn两种模式总结

  Client模式和Cluster模式最本质的区别是:Driver程序运行在哪里
Client模式:学习测试时使用,生产不推荐(要用也可以,性能略低,稳定性略低)。

  1. Driver运行在Client上,和集群的通信成本高
  2. Driver输出结果会在客户端显示

Cluster模式:生产环境中使用该模式

  1. Driver程序在YARN集群中,和集群的通信成本低
  2. Driver输出结果不能在客户端显示
  3. 该模式下Driver运行ApplicattionMaster这个节点上,由Yarn管理,如果出现问题,yarn会重启ApplicattionMaster(Driver)

5.扩展阅读:两种模式详细流程

  在YARN Client模式下,Driver在任务提交的本地机器上运行,示意图如下:

具体流程步骤如下:

  1. Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster
  2. 随后ResourceManager分配Container,在合适的NodeManager上启ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;
  3. ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程;
  4. Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;
  5. 之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后将Task分发到各个Executor上执行。

  在YARN Cluster模式下,Driver运行在NodeManager Contanier中,此时Driver与AppMaster合为一体,示意图如下:

具体流程步骤如下:

  1. 任务提交后会和ResourceManager通讯申请启动ApplicationMaster
  2. 随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver;
  3. Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后在合适的NodeManager上启动Executor进程;
  4. Executor进程启动后会向Driver反向注册;
  5. Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行;

六、spark-submit和pyspark相关参数

客户端工具我们可以用的有:

  • bin/pyspark: pyspark解释器spark环境
  • bin/spark-shell: scala解释器spark环境
  • bin/spark-submit: 提交jar包或Python文件执行的工具
  • bin/spark-sql: sparksql客户端工具

这4个客户端工具的参数基本通用.
以spark-submit 为例:
bin/spark-submit --master spark://node1:7077 xxx.py

Usage: spark-submit [options] <app jar | python file | R file> [app arguments]Usage: spark-submit --kill [submission ID] --master [spark://...]Usage: spark-submit --status [submission ID] --master [spark://...]Usage: spark-submit run-example [options] example-class [example args]Options:  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,                              k8s://https://host:port, or local (Default: local[*]).  --deploy-mode DEPLOY_MODE   部署模式 client 或者 cluster 默认是client  --class CLASS_NAME          运行java或者scala class(for Java / Scala apps).  --name NAME                 程序的名字  --jars JARS                 Comma-separated list of jars to include on the driver                              and executor classpaths.  --packages                  Comma-separated list of maven coordinates of jars to include                              on the driver and executor classpaths. Will search the local                              maven repo, then maven central and any additional remote                              repositories given by --repositories. The format for the                              coordinates should be groupId:artifactId:version.  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while                              resolving the dependencies provided in --packages to avoid                              dependency conflicts.  --repositories              Comma-separated list of additional remote repositories to                              search for the maven coordinates given with --packages.  --py-files PY_FILES         指定Python程序依赖的其它python文件  --files FILES               Comma-separated list of files to be placed in the working                              directory of each executor. File paths of these files                              in executors can be accessed via SparkFiles.get(fileName).  --archives ARCHIVES         Comma-separated list of archives to be extracted into the                              working directory of each executor.  --conf, -c PROP=VALUE       手动指定配置  --properties-file FILE      Path to a file from which to load extra properties. If not                              specified, this will look for conf/spark-defaults.conf.  --driver-memory MEM         Driver的可用内存(Default: 1024M).  --driver-java-options       Driver的一些Java选项  --driver-library-path       Extra library path entries to pass to the driver.  --driver-class-path         Extra class path entries to pass to the driver. Note that                              jars added with --jars are automatically included in the                              classpath.  --executor-memory MEM       Executor的内存 (Default: 1G).  --proxy-user NAME           User to impersonate when submitting the application.                              This argument does not work with --principal / --keytab.  --help, -h                  显示帮助文件  --verbose, -v               Print additional debug output.  --version,                  打印版本 Cluster deploy mode only(集群模式专属):  --driver-cores NUM          Driver可用的的CPU核数(Default: 1). Spark standalone or Mesos with cluster deploy mode only:  --supervise                 如果给定, 可以尝试重启Driver Spark standalone, Mesos or K8s with cluster deploy mode only:  --kill SUBMISSION_ID        指定程序ID kill  --status SUBMISSION_ID      指定程序ID 查看运行状态 Spark standalone, Mesos and Kubernetes only:  --total-executor-cores NUM  整个任务可以给Executor多少个CPU核心用 Spark standalone, YARN and Kubernetes only:  --executor-cores NUM        单个Executor能使用多少CPU核心 Spark on YARN and Kubernetes only(YARN模式下):  --num-executors NUM         Executor应该开启几个  --principal PRINCIPAL       Principal to be used to login to KDC.  --keytab KEYTAB             The full path to the file that contains the keytab for the                              principal specified above. Spark on YARN only:  --queue QUEUE_NAME          指定运行的YARN队列(Default: "default").
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发