Spark

概述

概述

概念

  • 一种基于内存的快速、通用、可扩展的大数据分析计算引擎
  • 使用Scala开发,适合迭代计算和数据挖掘计算
  • 基于MR框架,优化了计算过程,使用内存替换计算结果的传输
  • 计算模型非常丰富
  • Spark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,Hadoop基于磁盘
  • Spark运行环境环境 = Java环境(JVM) + 集群环境(YARN) + Spark环境(lib)

端口号

  • Spark查看当前Spark-shell运行任务情况端口号:4040(计算)
  • Spark Master内部通信服务端口号:7077
  • Standalone模式下,Spark Master Web端口号:8080(资源)
  • Spark历史服务器端口号:18080
  • Hadoop YARN任务运行情况查看端口号:8088

入门案例

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.3.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 获取Spark的连接(环境)
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
// 读取文件
val lines = sc.textFile("data/word.txt")
// 分词
val words = lines.flatMap(_.split(" "))
// 分组
val wordGroup = words.groupBy(word => word)
// 统计分组
val wordCount = wordGroup.map((word, list) => (word, list.size))
// 打印
wordCount.collect().foreach(println)
// 停止
sc.stop()

运行环境

Local模式

  • 本机提供资源 + Spark提供计算

  • 启动

    1
    spark-shell
    • 启动成功可以输入网址进行Web UI监控页面访问:http://虚拟机地址:4040
  • 命令行工具

    • 在解压缩文件夹下的data目录中,添加word.txt文件。在命令行工具中执行如下代码指令(和IDEA中代码简化版一致)

      1
      sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  • 退出本地模式

    1
    :quit
  • 提交应用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    bin/spark-submit \
    # 执行程序的主类,此处可以更换为咱们自己写的应用程序
    --class org.apache.spark.examples.SparkPi \
    # 部署模式,默认为本地模式,数字表示分配的虚拟CPU核数量
    # local[*]代表使用最大的核数
    --master local[2] \
    # 运行的应用类所在的jar包,实际使用时,可以设定为咱们自己打的jar包
    ./examples/jars/spark-examples_2.12-3.0.0.jar \
    # 数字10表示程序的入口参数,用于设定当前应用的任务数量
    10
    1
    2
    3
    4
    spark-submit \
    --class cc.mousse.spark.wordcount.WordCountDemo1 \
    --master local[2] \
    ./jars/Spark-1.0.jar

Standalone

  • Spark提供资源 + 计算

  • 体现了经典的master-slave模式

  • 配置

    • 修改conf/works文件

      1
      2
      3
      bigdata1
      bigdata2
      bigdata3
    • 修改spark-env.sh

      1
      2
      3
      4
      export JAVA_HOME=/jdk
      SPARK_MASTER_HOST=bigdata1
      # 内部通信端口
      SPARK_MASTER_PORT=7077
    • 分发

  • 启动集群

    1
    /spark/sbin/start-all.sh 
  • 提交应用

    1
    2
    3
    4
    5
    bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://bigdata1:7077 \
    ./examples/jars/spark-examples_2.12-3.3.0.jar \
    10
    • 执行任务时,会产生多个Java进程
      • SparkSubmit:提交节点进程
      • CoarseGrainedExecutorBackend:执行节点进程
  • 配置历史服务器

    • spark-shell停止掉后,集群监控bigdata1:4040页面就看不到历史任务的运行情况,所以开发时都配置历史服务器记录任务运行情况

    • 修改spark-default.conf文件,配置日志存储路径

      1
      2
      spark.eventLog.enabled          true
      spark.eventLog.dir hdfs://bigdata2:8020/directory
    • 需要启动hadoop集群,HDFS上的directory目录需要提前存在

      1
      2
      start-dfs.sh
      hadoop fs -mkdir /directory
    • 修改spark-env.sh文件, 添加日志配置

      1
      2
      3
      4
      5
      6
      7
      export SPARK_HISTORY_OPTS="
      # WEB UI访问的端口号为18080
      -Dspark.history.ui.port=18080
      # 指定历史服务器日志存储路径
      -Dspark.history.fs.logDirectory=hdfs://bigdata2:8020/directory
      # 指定保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示的应用数
      -Dspark.history.retainedApplications=30"
    • 分发配置文件

    • 重新启动集群和历史服务

      1
      2
      sbin/start-all.sh
      sbin/start-history-server.sh
    • 重新执行任务

    • 看历史服务:http://bigdata1:18080

  • 配置高可用

    • 修改spark-env.sh

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      # 注释如下内容
      # SPARK_MASTER_HOST=linux1
      # SPARK_MASTER_PORT=7077

      # 添加如下内容
      # Master监控页面默认访问端口为8080,但是可能会和Zookeeper冲突,所以改成8989,也可以自定义,访问UI监控页面时请注意
      SPARK_MASTER_WEBUI_PORT=8989

      export SPARK_DAEMON_JAVA_OPTS="
      -Dspark.deploy.recoveryMode=ZOOKEEPER
      -Dspark.deploy.zookeeper.url=bigdata1,bigdata2,bigdata3
      -Dspark.deploy.zookeeper.dir=/spark"
    • 分发配置文件

    • 启动集群

      1
      sbin/start-all.sh
    • 在bigdata2上单独启用Master节点

      1
      sbin/start-master.sh
    • 提交应用到高可用集群

      1
      2
      3
      4
      5
      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://bigdata1:7077,bigdata2:7077 \
      ./examples/jars/spark-examples_2.12-3.3.0.jar \
      10

Yarn

  • 配置

    • 修改hadoop配置文件yarn-site.xml并分发

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
      <property>
      <name>yarn.nodemanager.pmem-check-enabled</name>
      <value>false</value>
      </property>

      <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
      <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
      </property>
    • 修改conf/spark-env.sh

      1
      2
      export JAVA_HOME=/jdk
      YARN_CONF_DIR=/hadoop/etc/hadoop
    • 启动

    • 提交任务

      1
      2
      3
      4
      5
      6
      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode cluster \
      ./examples/jars/spark-examples_2.12-3.3.0.jar \
      10
    • 查看http://bigdata2:8088页面,点击History,查看历史页面

  • 配置历史服务器

    • 前面步骤相同

    • 修改spark-defaults.conf

      1
      2
      spark.yarn.historyServer.address=bigdata1:18080
      spark.history.ui.port=18080
    • 启动

    • 提交任务

      1
      2
      3
      4
      5
      6
      bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode client \
      ./examples/jars/spark-examples_2.12-3.3.0.jar \
      10

部署模式对比

模式 Spark安装机器数 需启动的进程 所属者 应用场景
Local 1 Spark 测试
Standalone 3 Master及Worker Spark 单独部署
Yarn 1 Yarn及HDFS Hadoop 混合部署

K8S&Mesos

本机

  • 提交任务

    1
    spark-submit --class org.apache.spark.examples.SparkPi --master local ../examples/jars/spark-examples_2.12-3.3.0.jar 10

运行架构

核心组件

Driver

  • Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工作
  • 主要作用
    • 将用户程序转化为作业(job)
    • 在Executor之间调度任务(task)
    • 跟踪Executor的执行情况
    • 通过UI展示查询运行情况

Executor

  • 集群中工作节点(Worker)中的一个JVM进程,负责在Spark作业中运行具体任务(Task),任务彼此之间相互独立
  • Spark应用启动时,Executor节点被同时启动,并且始终伴随着整个Spark应用的生命周期而存在
  • 如果有Executor节点发生了故障或崩溃,Spark应用也可以继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行
  • 核心功能
    • 负责运行组成Spark应用的任务,并将结果返回给驱动器进程
    • 通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD提供内存式存储。RDD是直接缓存在Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算

Master & Worker

  • Standalone模式下存在
  • Spark集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调度的功能
  • Master:一个进程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于Yarn环境中的RM
  • Worker:一个进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于Yarn环境中NM

ApplicationMaster

  • Hadoop用户向YARN集群提交应用程序时,提交程序中应该包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行用户自己的程序任务Job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况
  • ResourceManager(资源)和Driver(计算)之间的解耦合靠的就是ApplicationMaster

核心概念

Executor与Core

  • Executor:用来做计算的容器

  • Core:容器运算时的CPU核数量

  • 应用程序相关启动参数

    名称 说明
    –num-executors 配置Executor的数量
    –executor-memory 配置每个Executor的内存大小
    –executor-cores 配置每个Executor的虚拟CPU core数量

并行度

  • 整个集群并行执行任务的数量称之为并行度
  • 是并行,不是并发

提交流程

graph TD
提交任务 --> Driver运行 --> 执行main函数创建环境后阻塞 --反向注册后--> 执行到action算子 --> stage划分 --> 创建taskSet -- 将task分发给指定的Excutor --> Executor启动
执行main函数创建环境后阻塞 -- 注册应用程序 --> 集群管理器 --> Executor启动 -- 反向注册 --> 执行main函数创建环境后阻塞
  • 两种部署执行的方式:Client和Cluster。两种模式主要区别在于:Driver程序的运行节点位置

数据结构

概述

  • Spark计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景
  • 数据结构:组织和管理数据的结构
  • 三大数据结构
    • RDD : 弹性分布式数据集
    • 累加器:分布式共享只写变量
    • 广播变量:分布式共享只读变量

RDD

概念

  • 弹性分布式数据集,是Spark中最基本的数据处理模型,代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
    • 弹性
      • 存储的弹性:内存与磁盘的自动切换
      • 容错的弹性:数据丢失可以自动恢复
      • 计算的弹性:计算出错重试机制
      • 分片的弹性:可根据需要重新分片
    • 分布式:数据存储在大数据集群不同节点上
    • 数据集:RDD封装了计算逻辑,并不保存数据
    • 数据抽象:RDD是一个抽象类,需要子类具体实现
    • 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
    • 分区、并行计算

数据结构

  • 类似于链表中Node的数据结构
  • RDD中有适合并行计算的分区操作
  • RDD分装了最小的计算单元,目的是更适合重复使用
  • Spark的计算主要通过组合RDD的操作完成业务需求
  • RDD的功能扩展和IO一样,体现了装饰者修饰模式
  • RDD中的collect方法类似于IO中的read方法,不调用不执行逻辑

核心属性

  • 分区列表:用于执行任务时并行计算,是实现分布式计算的重要属性
  • 分区计算函数:Spark在计算时使用分区函数对每一个分区进行计算
  • RDD间的依赖关系:RDD是计算模型的封装,当需求中需要将多个计算模型进行组合时需要将多个RDD建立依赖关系
  • 分区器(可选):当数据为KV类型数据时可通过设定分区器自定义数据的分区
  • 首选位置(可选):计算数据时可根据计算节点的状态选择不同的节点位置进行计算
    • 数据和计算的位置
      • 在同一个进程:进程本地化
      • 在同一个节点:节点本地化
      • 在同一个机架:机架本地化
      • 移动数据不如移动计算

执行流程

  • Spark框架在执行时先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算。最后得到计算结果
  • 在Yarn环境中RDD的工作原理
    1. 启动Yarn集群环境
    2. Spark通过申请资源创建调度节点和计算节点
    3. Spark框架根据需求将计算逻辑根据分区划分成不同的任务
    4. 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

创建

从内存中创建

1
2
3
4
5
6
7
8
9
10
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 从内存中创建RDD
// 构建方式1
// parallelize用于构建RDD,也可将该集合作为数据模型处理的数据源
// parallelize:并行
val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4))
// 构建方式2:底层就是调用parallelize
val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 2, 3, 4))
sc.stop()

从磁盘中创建

1
2
3
4
5
6
7
8
9
10
11
12
val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(conf)
// TODO 从文件中创建RDD
// textFile可以通过路径获取数据,可以将文件作为处理数据的数据源
// testFile的路径可以是相对路径/绝对路径/HDFS路径
val rdd: RDD[String] = sc.textFile("data/word.txt")
// testFile的路径可以是目录/通配符路径
val rdd1: RDD[String] = sc.textFile("data")
// wholeTextFiles可以获取文件数据的来源
val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data")
rdd2.collect().foreach(println)
sc.stop()

分区

从内存中创建

  • 若构建RDD时没有指定数据处理分区的数量,会使用默认分区数量

  • makeRDD方法第二个参数表示分区数量numSlices(存在默认值)

    1
    2
    3
    val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
    // saveAsTextFile方法可以生成分区文件
    rdd.saveAsTextFile("data/output")
    1
    2
    scheduler.conf.getInt("spark.default.parallelism", totalCores)
    // totalCores : 当前Master环境的总(虚拟)核数
  • 分区设置的优先级: 方法参数 > 配置参数 > 环境配置

  • Spark分区策略:范围

    1
    2
    [1,2][3,4,5]
    [1][2,3][4,5]

从磁盘中创建

  • Spark读取文件底层是Hadoop读取文件, Spark分区数量来自于Hadoop读取文件的切片

    1
    2
    3
    4
    5
    6
    // 预计每个分区的字节大小 = 总的字节数 / 想要切片数量
    goalSize = totalSize / numSplits = 7B / 3 = 2B
    // 实际的切片数量
    splitSize = Math.max(minSize(1), Math.min(goalSize(2B), blockSize(128M))) = 2B
    // 余下来的数大于每个分区的字节大小的10%,会新开一个区
    7 / 2 = 3...1 = 3 + 1 = 4
  • Spark读取文件数据底层是Hadoop读取,是Hadoop的读取规则

    • Hadoop在计算分区时会处理数据时的逻辑不一样
      • Hadoop按行读取,不是按字节读取
      • Hadoop按偏移量读取,且不会重复读取相同偏移量
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    输入文件:(@@代表换行符)
    文件 偏移量
    123@@ => 1,2,3,4,5
    456@@ => 6,7,8,9,10
    789 => 11,12,13
    计算分区:(预计3个分区)
    13B / 3 = 4B
    13B / 4B = 3 ... 1B(1B / 4B > 10%) => 3 + 1 = 4
    计算读取偏移量:
    分区 分区中的数据 偏移量
    [0, 4] => [123] 1,2,3,4,5(读取一整行)
    [4, 8] => [456] 6,7,8,9,10(不重复读)
    [8, 12] => [789] 11,12,13
    [12, 13] => []
  • textFile方法可在读取文件时设定分区,参数2若不设定则使用默认值

    1
    math.min(defaultParallelism, 2)
    • 参数2表示最小分区数,最终的分区数量可能大于该个值

      1
      2
      3
      val rdd = sc.textFile("data/num.txt", 3)
      // saveAsTextFile方法可以生成分区文件
      rdd.saveAsTextFile("data/output")

转换算子

概念

  • 为了和Scala集合方法进行区分,将RDD的放法称之为算子

    1
    2
    3
    4
    5
    // 该foreach方法为Scala集合(单点)的方法
    // val array: Array[(String, Int)] = wordCount.collect()
    wordCount.collect().foreach(println)
    // 该foreach方法为RDD(分布式)的算子(方法)
    wordCount.foreach(println)
  • 转换算子:逻辑封装,将旧的逻辑转为新的逻辑

  • 根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

Map

  • 将数据源中的每一条数据进行处理
  • 函数类型:Int => U(不确定)
  • 默认情况下,RDD进行转换时新的RDD和旧的RDD分区数量保持一致
  • 数据处理过程中,数据执行顺序分区内有序分区间无序
    • setMaster(“local”)会全部有序(1个线程执行多个分区)
  • RDD是逻辑的封装,多个RDD的情况下,一个分区的一条数据走完所有的逻辑后才会走下一条数据
  • 默认情况下,数据处理过程中默认的分区不变
1
2
3
4
5
6
7
8
9
10
11
12
// [1,2][3,4]
val rdd2 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val rdd21 = rdd2.map(num => {
println(s"first map: $num")
num
})
val rdd22 = rdd21.map(num => {
println(s"second map: $num")
num * 2
})
// [2,4][6,8]
rdd22.saveAsTextFile("data/output")

MapPartions

  • 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据(批量放入内存中再进行操作)
  • 和Map的区别
    • 数据处理角度
      • Map:分区内一个数据一个数据的执行,类似于串行操作
      • MapPartitions:以分区为单位进行批处理操作
    • 功能角度
      • Map:将数据源中的数据进行转换和改变,不会减少或增多数
      • MapPartitions:需要传递一个迭代器,返回一个迭代器(不一定与传递的相同),可增加/减少数据
    • 性能角度
      • Map:类似串行操作,所以性能比较低
      • MapPartitions:类似批处理,性能较高。但会长时间占用内存,可能导致内存溢出
      • 内存有限的情况下不推荐使用MapPartitions
  • 问题
    • 占用内存大
    • 使用完的数据不会释放(直到集合中的数据全处理完)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val rdd1 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
// 以一个分区为单位进行转换
rdd1.mapPartitions(list => {
// 在单点内存中做操作,而不是网络传输一个数据做一次操作
// 有几个分区走几遍
list.map(_ * 2)
})
rdd1.collect().foreach(println)

// TODO 练习:获取每个分区的最大值
rdd1.mapPartitions(list => {
val max = list.max
// 返回的数据需要可迭代
List(max).iterator
}).collect().foreach(println)

MapPartionsWithIndex

  • 将待处理的数据以分区为单位发送到计算节点进行处理,可以进行任意的处理,包括过滤数据,在处理时同时可以获取当前分区索引
1
2
3
4
5
6
7
8
// [1,2][3,4][5,6]
val rdd1 = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6), 3)
// 获取特定分区的数据
val partitionIdx = 1
rdd1.mapPartitionsWithIndex((idx, list) => {
if (idx == partitionIdx) list
else Nil.iterator
}).collect().foreach(println)

FlatMap

  • 将处理的数据进行扁平化后再进行映射处理,也称之为扁平映射
1
2
3
4
5
6
7
8
9
10
11
12
13
val rdd1 = sc.makeRDD(List("Hello Scala", "Hello Spark"))
// 1个整体 => 使用容器包含n个个体
rdd1.flatMap(_.split(" ")).collect().foreach(println)
val rdd2 = sc.makeRDD(List(List(1, 2), List(3, 4)))
// 1,2,3,4
rdd2.flatMap(list => list).collect().foreach(println)

// TODO 练习:进行扁平化操作
val rdd3 = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
rdd3.flatMap {
case list: List[_] => list
case other => List(other)
}.collect().foreach(println)

Glom

  • 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
1
2
3
4
5
6
7
8
9
10
// 个体 => 整体
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
// 将每个分区的数据封装成1个数组
val rdd11: RDD[Array[Int]] = rdd1.glom()
rdd11.collect().foreach(_.mkString(","))

// TODO 练习:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
println(
rdd1.glom().map(_.max).collect().sum
)

GroupBy

  • 根据函数计算结果进行分组,执行结果为KV数据类型:k是为分组的标识,v就是同一个组的数据集合
  • 默认情况处理数据后所在分区不会发生改变:Spark要求一个族的数据必须在一个分区中
  • 同组数据一定在同分区中,同分区中不一定只有一个组
  • Shuffle
    • 一个分区数据被打乱和其他分区数据混合在一起,该操作称为Shuffle
    • Shuffle操作不允许在内存中等待,必须落盘
    • Shuffle会将完整的计算过程一分为二,一个阶段写数据,一个阶段读数据
    • 写数据没完成不能读数据
    • Shuffle操作可以手动更改分区
1
2
3
4
5
6
7
8
9
10
11
12
13
// TODO 练习:将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组
sc.makeRDD(List("Hello", "hive", "hbase", "Hadoop")).groupBy(_.charAt(0), 2)
// TODO 练习:从服务器日志数据apache.log中获取每个时间段访问量
// 可以实现WordCount(1/10)
sc
.textFile("data/input/apache.log")
.map( line => {
val data = line.split(" ")
val time = data(3).split(":")(3)
(time, 1)
})
.groupBy(_._1)
.map{case (time, list) => (time, list.size)}

Filter

  • 将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃
  • 数据进行筛选过滤后分区不变,数据可能不均衡,生产环境下可能会出现数据倾斜
1
2
3
4
5
6
7
8
9
10
val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
// 可以按照指定规则对每一条数据进行筛选过滤
// true数据保留,false数据丢球
rdd1.filter(_ % 2 == 0)

// TODO 练习:从服务器日志数据apache.log中获取2015年5月17日的请求路径
sc
.textFile("data/input/apache.log")
.filter(_.split(" ")(3).startsWith("17/05/2015"))
.map(_.split(" ")(6))

Sample

  • 根据指定的规则从数据集中抽取数据
    • 参数1:抽取方式
      • 抽取放回:true
      • 抽取不放回:false
    • 参数2
      • 抽取放回:每条数据希望被重复抽取的次数
      • 抽取不放回:每条数据被抽取的几率
    • 参数3:随机数种子
      • 随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
1
2
val rdd1 = sc.makeRDD(1 to 10)
rdd1.sample(false, 0.5)

Coalesce

  • 根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率
  • 默认合并不会进行Shuffle,分区合并与首选位置有关(就近),在一些情况下无法解决数据倾斜问题,可以在缩减分区的同时进行Shuffle操作(参数2)
    • Shuffle表示分区数据被打乱后和其他分区数据重新组合在一起
    • 所有Shuffle都有改变分区的能力,不是所有改变分区的操作都有Shuffle
1
2
val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val rdd11 = rdd1.coalesce(2, true)

Distinct

  • 底层使用分布式的方式去重
1
2
3
4
5
6
7
8
9
val rdd1 = sc.makeRDD(List(1, 1, 1, 1, 1, 1))
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
// 【1,1,1】
// 【(1, null),(1, null),(1, null)】
// 【null, null, null】
// 【null, null】
// 【(1, null)】
// 【1】
val rdd11 = rdd1.distinct()

Repartition

  • 扩大分区
  • 底层为Shuffle为true的coalesce,在不Shuffle的情况下coalesce扩大分区没有意义
1
2
val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val rdd12 = rdd1.repartition(3)

SoryBy

  • 中间存在shuffle的过程
1
2
3
4
5
val rdd1 = sc.makeRDD(List(1, 4, 2, 5, 6, 3), 2)
// 默认升序
rdd1.sortBy(num => num)
// 降序
rdd1.sortBy(num => num, false)

双V类型

1
2
3
4
5
6
7
8
9
10
11
12
13
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)
val rdd3 = sc.makeRDD(List("3", "4", "5", "6"))
// 需要保证类型相等
// 交集
rdd1.intersection(rdd2)
// 并集
rdd1.union(rdd2)
// 差集
rdd1.subtract(rdd2)
// 类型可以不相同
// 拉链,每个分区数量或分区中元素数量不相同会报错
rdd2.zip(rdd3)

KV类型

PartitionBy
  • 根据指定的规则对每一条数据进行重分区
    • repartition:强调分区数量的变化,数据怎么变不关心
    • partitionBy:关心数据的分区规则
1
2
3
4
5
6
val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd11 = rdd1.map((_, 1))
// 二次编译(隐式转换)
// RDD => PairRDDFunctions
// HashPartitioner是Spark中默认shuffle分区器
rdd11.partitionBy(new HashPartitioner(2))
ReduceByKey
  • 将相同的key的value分在一个组中,然后进行reduce操作
  • 可以在Shuffle前对分区数据进行预聚合,称为Combine
1
2
3
val rdd1 = sc.makeRDD(List(("a", 1), ("a", 1), ("a", 1)))
// reduceByKey可以实现WordCount(2/10)
rdd1.reduceByKey(_ + _)
GroupByKey
  • 将相同key数据的value分在一个组中
  • groupBy
    • 不需要考虑数据类型
    • 按照指定的规则进行分组
    • groupBy => (String, Iterable[(String, Int)])
  • groupByKey
    • 必须保证数据kv类型
    • 必须根据key对value分组
    • groupByKey => (String, Iterable[Int])
1
2
3
val rdd1 = sc.makeRDD(List(("a", 1), ("a", 1), ("a", 1)))
// groupByKey也可以实现 WordCount(3/10)
rdd1.groupByKey()
AggregateByKey
  • reduceByKey在分区内与分区间逻辑相同
  • aggregateByKey分区间有Shuffle
  • aggregateByKey算子存在函数柯里化
    • 参数列表1:参数为零值,表示计算初始值zero/z, 用于数据进行分区内计算(否则一个数据无法参与两两计算)
    • 参数列表2
      • 参数1:分区内计算规则
      • 参数2:分区间计算规则
1
2
3
4
5
6
7
8
9
10
11
12
val rdd1 = sc.makeRDD(
List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),
2
)
// TODO 取出每个分区内相同Key的最大值后分区间相加
rdd1.aggregateByKey(0)(
(x, y) => Math.max(x, y),
(x, y) => x + y
)
FoldByKey
  • 当分区内计算规则和分区间计算规则相同时,aggregateByKey就可以简化为foldByKey
1
2
3
4
5
6
7
8
9
10
11
12
val rdd1 = sc.makeRDD(
List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),
2
)
// aggregateByKey也可以实现WordCount(4/10)
val rdd11 = rdd1.aggregateByKey(0)(_ + _, _ + _)
// foldByKey也可以实现WordCount(5/10)
// 如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化
val rdd12 = rdd1.foldByKey(0)(_ + _)
CombineByKey
  • 最通用的对KV型rdd进行聚集操作的聚集函数,允许用户返回值的类型与输入不一致
  • 参数1:当第1个数据不符合规则时,进行转换
  • 参数2:分区内计算规则
  • 参数3:分区间计算规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// TODO 求每个key的平均值
val rdd1 = sc.makeRDD(
List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
),
2
)
/*
List(
("a", (1, 1)), ("a", 2), ("b", (3, 1)),
("b", (4, 1)), ("b", 5), ("a", (6, 1))
)
*/
// combineByKey算子也可以实现wordCount(6/10)
rdd1.combineByKey(
num => (num, 1),
// 类型是动态执行的结果,不能省略
(x: (Int, Int), y) => (x._1 + y, x._2 + 1),
(x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
)
SortByKey
  • 只按Key排序,默认升序
  • Key须实现Ordered特质
1
2
3
4
5
6
7
8
9
10
11
12
def rddSortByKey(): Unit = {
val rdd1 = sc.makeRDD(List(("a", 2), ("a", 1), ("c", 3), ("b", 4)))
// ("a", 2), ("a", 1), ("b", 4), ("c", 3)
rdd1.sortByKey()

val rdd2 = sc.makeRDD(List((new User, 2), (new User, 1), (new User, 3), (new User, 4)))
rdd2.sortByKey()
}

class User extends Ordered[User] {
override def compare(that: User) = 1
}
Join
  • 主要针对于两个数据集中相同Key的数据连接
  • 可能会产生笛卡尔乘积,可能会出现Shuffle,性能较差
  • 能使用其他方式实现就不要用Join
1
2
3
4
5
6
val rdd1 = sc.makeRDD(List(("a", 1),  ("b", 2), ("c", 3)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 5), ("a", 6)))
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
val rdd4 = rdd1.leftOuterJoin(rdd2)
val rdd5 = rdd1.rightOuterJoin(rdd2)
val rdd6 = rdd1.fullOuterJoin(rdd2)
Cogroup
  • 先在数据集中相同Key分组,再与其他数据集连接
  • connect + group
1
2
3
val rdd1 = sc.makeRDD(List(("a", 1),  ("b", 2), ("c", 3)))
val rdd2 = sc.makeRDD(List(("a", 4), ("a", 5), ("a", 6)))
rdd1.cogroup(rdd2)

案例

  • 统计出每一个省份每个广告被点击数量排行的Top3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// TODO 分区内排序取前3,分区间排序取前3
def method3(): Unit = {
sc
.textFile("data/input/agent.log")
// 将原始数据进行结构转换((省份,广告),1)
.map(line => {
val data = line.split(" ")
((data(1), data(4)), 1)
})
// 统计
.reduceByKey(_ + _)
// 结构转换(省份,(广告,1))
.map { case ((prv, adv), sum) => (prv, (adv, sum)) }
// 按省份分组(省份,List[(广告,sum)])
.aggregateByKey(ArrayBuffer[(String, Int)]())(
(buff, t) => {
buff.append(t)
buff.sortBy(_._2)(Ordering.Int.reverse).take(3)
},
(buff1, buff2) => {
buff1.appendAll(buff2)
buff1.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
.collect()
.foreach(println)
}

// TODO 先统计分析再分组
def method1(): Unit = {
sc
.textFile("data/input/agent.log")
// 将原始数据进行结构转换((省份,广告),1)
.map(line => {
val data = line.split(" ")
((data(1), data(4)), 1)
})
// 统计
.reduceByKey(_ + _)
// 结构转换(省份,(广告,1))
.map { case ((prv, adv), sum) => (prv, (adv, sum)) }
// 按省份分组(省份,List[(广告,sum)])
.groupByKey()
// 根据点击量排序并取前三
.mapValues(_.toList.sortBy(_._2)(Ordering.Int.reverse).take(3))
.collect()
.foreach(println)
}

// TODO 不推荐
def method2(): Unit = {
sc
.textFile("data/input/agent.log")
// 将原始数据进行结构转换((省份,广告),1)
.map(line => {
val data = line.split(" ")
(data(1), (data(4), 1))
})
// 分组
.groupByKey()
// 统计
// 内存中操作
.mapValues(
_
.groupBy(_._1)
.mapValues(_.size)
.toList
.sortBy(_._2)(Ordering.Int.reverse)
.take(3)
)
.collect()
.foreach(println)
}

行动算子

概念

  • 行动算子:逻辑执行,执行封装好的逻辑,让作业运行起来
  • 行动算子执行时,会构建新的作业

Reduce

  • 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
1
2
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
val reduce: Int = rdd.reduce(_ + _)

Collect

  • 在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素
  • 会将数据全部拉取到Driver端的内存中形成数据集合,可能会导致内存溢出
1
val collectRes: Array[Int] = rdd.collect()

Count

  • 返回RDD中元素的个数
1
val count: Long = rdd.count()

First

  • 返回RDD中的第一个元素
1
val first: Int = rdd.first()

Take

  • 返回一个由RDD的前n个元素组成的数组
1
val take: Array[Int] = rdd.take(2)

TakeOrdered

  • 返回该RDD排序后的前n个元素组成的数组
1
val takeOrdered: Array[Int] = rdd.takeOrdered(2)

Aggregate

  • 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
  • AggregateByKey
    • 转换算子,执行后产生新的RDD
    • 初始值只会参与分区内计算
  • Aggregate
    • 行动算子,执行后得到结果
    • 初始值会参与分区内计算,也会参与分区间的计算
1
2
3
4
5
6
// [1,4] [3,2]
// [5,1,4] [5,3,2]
// [10] [10]
// [5,10,10]
val rdd = sc.makeRDD(List(1, 4, 3, 2), 2)
val aggregate: Int = rdd.aggregate(5)(_ + _, _ + _)

Fold

  • 折叠操作,aggregate的简化版操作
1
val fold: Int = rdd.fold(5)(_ + _)

CountByKey

  • 统计每种key的个数
1
2
// 可以实现WordCount(7/10)
val countByKey: collection.Map[String, Long] = rdd.map(("a", _)).countByKey()

CountByValue

  • 集合中相同的值的个数
  • CountByValue中Value不是KV键值对中的v的意思
1
2
// 可以实现WordCount(8/10)
val countByValue: collection.Map[Int, Long] = rdd.countByValue()

Save

  • 将数据保存到不同格式的文件中
1
2
3
4
5
// 保存为分区文本文件
rdd.saveAsTextFile("data/output/saveAsTextFile")
// 保存为对象文件(会序列化)
rdd.saveAsObjectFile("data/output/saveAsObjectFile")
rdd.map(("a", _)).saveAsSequenceFile("data/output/saveAsSequenceFile")

Foreach

  • 分布式遍历RDD中的每一个元素,调用指定函数
1
2
3
4
5
6
// 单点操作
// 按分区号进行采集
rdd.collect().foreach(println)
// 分布式操作,在Executor中遍历
// 分区内有序,分区间无序
rdd.foreach(println)

序列化

闭包检查

  • 从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,称之为闭包检测
  • Scala2.12版本后闭包编译方式发生了改变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("RddForeach"))
// Scala语法:闭包
// 在执行算子时,如果算子的内部使用了外部的变量(对象),那么意味着一定会出现闭包
// 在这种场景中,需要将Driver端的变量通过网络传递给Executor端执行,这个操作不用执行也能判断出来
// 可以在真正执行之前,对数据进行序列化校验
// Spark在执行作业前,需要先进行闭包检测功能
val rdd = sc.makeRDD(List[Int](), 2)
// Driver端
val user = new User
rdd.foreach(num => {
// Executor端
println(user.age + num)
})
sc.stop()
}

class User extends Serializable {
val age = 30
}
  • 序列化方法属性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("RddSerial"))
    val rdd = sc.makeRDD(List("Hello", "Hbase", "Spark", "Scala"))
    new Search("S").filterByQuery(rdd).foreach(println)
    sc.stop()
    }

    class Search(s: String) extends Serializable {
    def filterByQuery(rdd: RDD[String]): RDD[String] ={
    // 算子外Driver
    rdd.filter(
    // 算子内Executor
    _.startsWith(this.s)
    )
    }
    }

    class Search1(s: String) {
    def filterByQuery(rdd: RDD[String]): RDD[String] ={
    val str = this.s
    // 算子外Driver
    rdd.filter(
    // 算子内Executor
    _.startsWith(str)
    )
    }
    }

Kryo序列化框架

  • 参考地址: https://github.com/EsotericSoftware/kryo
  • Java的序列化能够序列化任何的类,但比较重(字节多)
  • 出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制,速度是Serializable的10倍,当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化
  • 即使使用Kryo序列化,也要继承Serializable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
object serializable_Kryo {

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu", "atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}

}

case class Searcher(val query: String) {

def isMatch(s: String) = {
s.contains(query)
}

def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}

def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}

}

依赖关系

血缘/依赖

graph LR
A -- 血缘关系 --> C
A -- 依赖 --> B -- 依赖 --> C
  • 依赖:相邻的两个RDD之间的关系
  • 血缘:多个连续的依赖关系

依赖关系

  • 窄依赖:上游RDD一个分区的数据被下游RDD一个分区独享,OneToOneDependency
  • 宽依赖:上游RDD一个分区的数据被下游RDD多个分区共享,ShuffleDependency

阶段划分

  • 阶段数量:Shuffe操作个数 + 1
  • 任务数量:所有阶段最后一个RDD的分区数之和
  • RDD任务切分
    • Application:初始化一个SparkContext即生成一个Application
    • Job:一个Action算子就会生成一个Job
    • Stage:Stage等于宽依赖(ShuffleDependency)的个数加1
    • Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数
    • Application → Job → Stage → Task每一层都是1对n的关系

持久化

持久化

  • 通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用
  • 持久化的文件只能自己用,且使用完毕后会删除
1
2
3
4
5
6
7
8
9
10
val wordToOne = words.map((_, 1))
// TODO 设定数据持久化
// 放入内存中
// cache方法可以将血缘关系进行修改,添加一个和缓存相关的依赖关系
// cache操作不安全
wordToOne.cache()
// 放入磁盘中
// StorageLevel.OFF_HEAP:堆外内存,不受JVM管理的内存,主动向OS申请的内存
wordToOne.persist(StorageLevel.DISK_ONLY_2)
val wordToCount = wordToOne.reduceByKey(_ + _)

检查点

  • 将中间的计算结果保存到检查点中,让其他的应用使用数据
  • 检查点可以切断血缘关系
  • 检查点为了数据的安全会重新再执行一遍作业,所以会执行2次。为了解决这个问题,可以将检查点和缓存联合使用
1
2
3
4
5
6
val wordToOne = words.map((_, 1))
wordToOne.cache()
// 不设置路径会报错:Checkpoint directory has not been set in the SparkContext
sc.setCheckpointDir("data/checkpoint")
wordToOne.checkpoint()
val wordToCount = wordToOne.reduceByKey(_ + _)

区别

  • Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖
  • Cache缓存的数据通常存储在磁盘/内存,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高
  • 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的Job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD

分区器

概念

  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

  • 每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的

  • Hash分区:对于给定的key,计算其hashCode并除以分区个数取余

  • Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀且分区间有序

自定义分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("Partitioner"))
val lines = sc.makeRDD(
List(
("nba", 1),
("cba", 1),
("nba", 1),
("wnba", 1)
), 2
)

}

// TODO 自定义分区器
class MyPartitioner extends Partitioner {

// TODO 分区数量
override def numPartitions: Int = 3

// TODO 根据数据Key返回数据所在的分区编号(从0开始)
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "cba" => 1
case _ => 2
}
}

// TODO 重复分区器判断
override def equals(other: Any): Boolean = other match {
case h: MyPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}

override def hashCode: Int = numPartitions

}

文件读取/保存

text文件

1
val textFile: RDD[String] = sc.textFile("data/output/saveAsTextFile")

sequence文件

1
2
// 需要使用泛型告诉Spark数据格式
val sequenceFile = sc.sequenceFile[String, Int]("data/output/saveAsSequenceFile")

object对象文件

1
2
// 需要使用泛型告诉Spark对象类型
val objectFile = sc.objectFile[(String, Int)]("data/output/saveAsObjectFile")

累加器

概念

  • 分布式共享只写变量:累加器之间无法互相读取
  • 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge

编程

系统累加器

1
2
3
4
5
6
7
8
// 创建累加器
val sum = sc.longAccumulator("sum")
rdd.foreach(num => {
// 使用累加器
sum.add(num)
})
// 获取结果
println(sum.value)

自定义累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
object WordCount {

def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("WordCount"))
val rdd = sc.makeRDD(List("scala", "scala", "scala", "scala", "scala", "scala", "spark", "spark", "spark", "spark"))
// 创建累加器
val acc = new WordCountAccumulator
// 向Spark注册
sc.register(acc, "wordCount")
// 将单词放入累加器
rdd.foreach(acc.add)
// 获取结果
println(acc.value)
sc.stop()
}

// TODO 自定义数据累加器
// 泛型为输入,输出类型
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

private val map = mutable.Map[String, Int]()

// 判断累加器是否为初始状态
// 复制流程 copy => reset => isZero
override def isZero = map.isEmpty

// 复制累加器
override def copy() = new WordCountAccumulator

// 重置累加器
override def reset(): Unit = map.clear()

// 从外部向累加器中添加数据
override def add(v: String): Unit = {
val count = map.getOrElse(v, 0)
map.update(v, count + 1)
}

// 合并两个累加器的结果
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
other.value.foreach { case (word, count1) =>
val count2 = map.getOrElse(word, 0)
map.update(word, count1 + count2)
}
}

// 将结果返回到外部
override def value = map

}

重复计算问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
// TODO 创建累加器
val sum = sc.longAccumulator("sum")
val rdd1 = rdd.map(num => {
// 使用累计器
sum.add(num)
num
})
// rdd第一遍执行
rdd1.collect
// rdd第二遍执行
rdd1.foreach(println)
println("***********************")
// TODO 累计器重复计算的问题
// 累加器会执行两遍
// 获取累加器的结果:20
println(sum.value)

广播变量

概念

  • 分布式共享只读变量
  • 传输数据时以Task数量来定
  • 用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送

编程

1
2
3
4
5
6
7
val rdd = sc.makeRDD(List(("a", 1), ("b", 2)))
val map = mutable.Map[String, Int](("a", 3), ("b", 4))
val bc = sc.broadcast(map)
rdd
.map { case (word, count) => (word, (count, bc.value.getOrElse(word, 0)))}
.collect()
.foreach(println)

SQL

概述

  • Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块
  • 底层将SQL转换为RDD
  • 为了简化RDD的开发提高开发效率,提供了2个编程抽象,类似Spark Core中的RDD
    • DataFrame:一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格
      • DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型
      • 懒执行,性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化

RDD与DataFrame

  • DataSet:分布式数据集合,是DataFrame的一个扩展

操作

概述

  • Spark Core中想要执行应用程序,需要首先构建上下文环境对象SparkContext
  • Spark SQL可理解为对Spark Core的封装,不仅在模型上进行了封装,上下文环境对象也进行了封装
  • SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合
  • SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的

DataFrame

概念

  • Spark SQL的DataFrame API 允许我们使用DataFrame而不用必须去注册临时表或者生成SQL表达式
  • DataFrame既有transformation操作也有action操作

创建

  • 在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回

  • 从Spark数据源进行创建

    • 查看Spark支持创建文件的数据源格式

      1
      spark.read.
    • 在spark的bin/data目录中创建user.json文件

      1
      {"username":"zhangsan","age":20}
    • 读取json文件创建DataFrame

      1
      2
      3
      scala> val df = spark.read.json("data/user.json")

      df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
    • 如果从内存中获取数据,spark可以知道数据类型具体是什么如果是数字,默认作为Int处理。但是从文件中读取的数字,不能确定是什么类型,所以用BigInt接收,可以和Long类型转换,但是和Int不能进行转换

SQL语法

  • 查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

  • 读取JSON文件创建DataFrame

    1
    2
    3
    scala> val df = spark.read.json("data/user.json")

    df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  • 对DataFrame创建一个临时表

    1
    df.createOrReplaceTempView("people")
  • 通过SQL语句实现查询全表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    scala> val sqlDF = spark.sql("SELECT * FROM people")

    sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

    // 结果展示
    scala> sqlDF.show

    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30| lisi|
    | 40| wangwu|
    +---+--------+
  • 普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

  • 对于DataFrame创建一个全局表

    1
    df.createGlobalTempView("people")
  • 通过SQL语句实现查询全表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    scala> spark.sql("SELECT * FROM global_temp.people").show()

    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30| lisi|
    | 40| wangwu|
    +---+--------+

    scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()

    +---+--------+
    |age|username|
    +---+--------+
    | 20|zhangsan|
    | 30| lisi|
    | 40| wangwu|
    +---+--------+

DSL语法

  • DataFrame提供一个特定领域语言(domain-specific language)去管理结构化的数据。可以在 Scala,Java,Python和R中使用DSL,使用DSL语法风格不必去创建临时视图

  • 创建一个DataFrame

    1
    2
    3
    scala> val df = spark.read.json("data/user.json")

    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  • 查看DataFrame的Schema信息

    1
    2
    3
    4
    5
    scala> df.printSchema

    root
    |-- age: Long (nullable = true)
    |-- username: string (nullable = true)
  • 只查看”username”列数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    scala> df.select("username").show()

    +--------+
    |username|
    +--------+
    |zhangsan|
    | lisi|
    | wangwu|
    +--------+
  • 查看”username”列数据以及”age+1”数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    scala> df.select($"username",$"age" + 1).show

    scala> df.select('username, 'age + 1).show()

    scala> df.select('username, 'age + 1 as "newage").show()

    +--------+---------+
    |username|(age + 1)|
    +--------+---------+
    |zhangsan| 21|
    | lisi| 31|
    | wangwu| 41|
    +--------+---------+
  • 查看”age”大于”30”的数据

    1
    2
    3
    4
    5
    6
    7
    scala> df.filter($"age">30).show

    +---+---------+
    |age| username|
    +---+---------+
    | 40| wangwu|
    +---+---------+
  • 按照”age”分组,查看数据条数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    scala> df.groupBy("age").count.show

    +---+-----+
    |age|count|
    +---+-----+
    | 20| 1|
    | 30| 1|
    | 40| 1|
    +---+-----+
  • 涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名

RDD转DataFrame

  • 在IDEA中开发程序时如需RDD与DF或者DS之间互相操作,那么需要引入import spark.implicits._

  • 这里的spark不是Scala中的包名,而是创建的sparkSession对象的变量名称,所以必须先创建SparkSession对象再导入。这里的spark对象不能使用var声明,因为Scala只支持val修饰的对象的引入

  • spark-shell中无需导入,自动完成此操作

    1
    2
    3
    val idRDD = sc.textFile("data/id.txt")
    idRDD.toDF("id").show

  • 实际开发中,一般通过样例类将RDD转换为DataFrame

    1
    2
    3
    case class User(name:String, age:Int)

    sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show

DataFrame转RDD

  • DataFrame其实就是对RDD的封装,可直接获取内部的RDD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF

    df: org.apache.spark.sql.DataFrame = [name: string, age: int]

    scala> val rdd = df.rdd

    rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

    scala> val array = rdd.collect

    array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])

    // 此时得到的RDD存储类型为Row
    scala> array(0)
    res28: org.apache.spark.sql.Row = [zhangsan,30]
    scala> array(0)(0)
    res29: Any = zhangsan
    scala> array(0).getAs[String]("name")
    res30: String = zhangsan

DataSet

概念

  • DataSet是具有强类型的数据集合,需要提供对应的类型信息

创建

  • 使用样例类序列创建DataSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    scala> case class Person(name: String, age: Long)

    defined class Person

    scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()

    caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

    scala> caseClassDS.show

    +---------+---+
    | name|age|
    +---------+---+
    | zhangsan| 2|
    +---------+---+
  • 使用基本类型的序列创建DataSet

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    scala> val ds = Seq(1,2,3,4,5).toDS

    ds: org.apache.spark.sql.Dataset[Int] = [value: int]

    scala> ds.show

    +-----+
    |value|
    +-----+
    | 1|
    | 2|
    | 3|
    | 4|
    | 5|
    +-----+
  • 在实际使用的时候很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

RDD转DataSet

  • SparkSQL能够自动将包含有case类的RDD转换成DataSet,case类定义了table的结构,case类属性通过反射变成了表的列名。Case类可以包含诸如Seq或者Array等复杂的结构

    1
    2
    3
    4
    5
    6
    7
    scala> case class User(name:String, age:Int)

    defined class User

    scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

    res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet转RDD

  • DataSet其实也是对RDD的封装,可以直接获取内部的RDD

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    scala> case class User(name:String, age:Int)

    defined class User

    scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, t._2)).toDS

    res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

    scala> val rdd = res11.rdd

    rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at <console>:25

    scala> rdd.collect

    res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

三者关系

共性

  • 都是spark平台下的分布式弹性数据集:为处理超大型数据提供便利
  • 都有惰性机制:在进行创建、转换,如map方法时,不会立即执行
  • 有许多共同的函数
  • 都会根据Spark的内存情况自动缓存运算
  • DataFrame&DataSet
    • 许多操作都需要import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
    • 可使用模式匹配获取各个字段的值和类型
    • 相同的成员函数

特性

  • RDD
    • 一般和spark mlib同时使用
    • 不支持sparksql操作
  • DataFrame
    • 每行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    • DataFrame是DataSet的一个特例type DataFrame = Dataset[Row]
  • DataSet
    • 每一行是什么类型不一定,自定义了case class之后可以很自由的获得每一行的信息

DataFrame转DataSet

1
2
3
4
5
6
7
8
9
10
11
scala> case class User(name:String, age:Int)

defined class User

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",49))).toDF("name","age")

df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[User]

ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

DataSet转DataFrame

1
2
3
4
5
6
7
scala> val ds = df.as[User]

ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> val df = ds.toDF

df: org.apache.spark.sql.DataFrame = [name: string, age: int]
graph TD
subgraph DataSet
a -. toDF&#40&#91列名&#93&#41 .-> b(id,name,age)
a -. 样例类 toDS .-> c(User)
b -. as&#91类型&#93 .-> c
c -. toDF .-> b
c -. rdd .-> a
subgraph DataFrame&#40DataSet&#91Row&#93&#41
b -. rdd .-> a
subgraph RDD
a(&quot1&quot,&quotZhangsan&quot,&quot30&quot)
end
end
end

IDE开发

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.2.2</version>
</dependency>

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// TODO 创建环境对象
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
val spark = SparkSession.builder().config(conf).getOrCreate()
// 导入隐式转换,这里的spark是环境对象的名称
// 该对象需要用val声明
import spark.implicits._
// TODO 逻辑操作
val jsonDF = spark.read.json("data/input/user.json")
// TODO SQL
// 将df转为临时视图
jsonDF.createOrReplaceTempView("user")
spark.sql("select * from user").show
// TODO DSL
jsonDF.select("username", "age").show
// 若查询列名采用单引号,需要引入隐式转换
jsonDF.select('username, 'age).show
// TODO 三者转换
val rdd = spark.sparkContext.makeRDD(List(
(1, "Tom", 10),
(2, "Jerry", 20)
))
// TODO RDD <=> DataFrame
val df: DataFrame = rdd.toDF("id", "name", "age")
val dfToRdd: RDD[Row] = df.rdd
// TODO RDD <=> DataSet
val userRDD: RDD[User] = rdd.map { case (id, name, age) => User(id, name, age) }
val ds: Dataset[User] = userRDD.toDS()
val dsToRdd = ds.rdd
// TODO DataSet <=> DataFrame
val dfToDs: Dataset[User] = df.as[User]
val dsToDf = ds.toDF()
// TODO 释放对象
spark.close()

自定义函数

概述

  • 用户可以通过spark.udf功能添加自定义函数,实现自定义功能

UDF

1
2
3
4
5
6
7
8
9
10
val userRDD: RDD[User] = rdd.map { case (id, name, age) => User(id, name, age) }
val ds: Dataset[User] = userRDD.toDS()
ds.map(user => User(user.id, "name: " + user.name, user.age)).show
// TODO 使用自定义函数在SQL中完成自定义操作
// 注自定义册函数
spark.udf.register("addName", (s: String) => "name: " + s)
// 将DataSet转换临时表
ds.createOrReplaceTempView("user")
// 应用UDF
spark.sql("select addName(name) from user").show

UDAF

  • 用户可以设定自己的自定义聚合函数。通过继承UserDefinedAggregateFunction来实现用户自定义聚合函数

  • 一个需求可以采用很多种不同的方法实现需求

  • 弱类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd = spark.sparkContext.makeRDD(List(
    (1, "Tom", 10L),
    (2, "Jerry", 20L)
    ))
    val userRDD: RDD[User] = rdd.map { case (id, name, age) => User(id, name, age) }
    val ds: Dataset[User] = userRDD.toDS()
    // 将DataSet转换为表
    ds.createOrReplaceTempView("user")
    // TODO 使用自定义函数在SQL中完成自定义聚合操作
    // 创建UDAF函数
    val udaf = new MyAveAgeUDAF
    // 注自定义册函数
    spark.udf.register("avgAge", udaf)
    // 在SQL中使用聚合函数
    spark.sql("select avgAge(age) from user").show
    spark.close()
    }

    case class User(id: Long, name: String, age: Long) {}

    // TODO 自定义聚合函数
    class MyAveAgeUDAF extends UserDefinedAggregateFunction {

    // TODO 输入数据的结构信息:年龄信息
    override def inputSchema = {
    StructType(Array(StructField("age", LongType)))
    }

    // TODO 缓冲区的数据结构信息:年龄总和,人数
    override def bufferSchema = {
    StructType(Array(
    StructField("totalAge", LongType),
    StructField("count", LongType)
    ))
    }

    // TODO 聚合函数返回的结果类型
    override def dataType = DoubleType

    // TODO 函数稳定性
    override def deterministic = true

    // TODO 函数缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
    // totalAge
    buffer(0) = 0L
    // count
    buffer(1) = 0L
    }

    // TODO 更新缓冲区数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
    }

    // TODO 合并缓冲区
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // TODO 函数计算
    override def evaluate(buffer: Row) = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
    }

    }
  • 强类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    val rdd = spark.sparkContext.makeRDD(List(
    (1, "Tom", 10L),
    (2, "Jerry", 20L)
    ))
    val userRDD: RDD[User] = rdd.map { case (id, name, age) => User(id, name, age) }
    val ds: Dataset[User] = userRDD.toDS()
    // 将DataSet转换为表
    ds.createOrReplaceTempView("user")
    // TODO 使用自定义函数在SQL中完成自定义聚合操作
    // TODO 创建UDAF函数
    val udaf = new MyAveAgeUDAF
    // 因为聚合函数是强类型,那么sql中没有类型的概念,所以无法使用
    // 可以采用DSL语法方法进行访问
    // 将聚合函数转换为查询的列让DataSet访问
    ds.select(udaf.toColumn).show()
    spark.close()
    }

    case class User(id: Long, name: String, age: Long) {}

    case class AvgBuffer(var totalAge: Long, var count: Long)

    // TODO 自定义强类型聚合函数
    // 继承Aggregator[输入, 缓冲区, 输出]
    // 输入数据的类型 User
    // 缓冲区的数据类型 AvgBuffer
    // 输出数据的类型 Long
    class MyAveAgeUDAF extends Aggregator[User, AvgBuffer, Double] {

    // TODO 缓冲区的初始值
    override def zero = {
    AvgBuffer(0L, 0L)
    }

    // TODO 聚合数据
    override def reduce(b: AvgBuffer, a: User) = {
    b.totalAge += a.age
    b.count += 1
    b
    }

    // TODO 合并缓冲区
    override def merge(b1: AvgBuffer, b2: AvgBuffer) = {
    b1.totalAge += b2.totalAge
    b1.count += b2.count
    b1
    }

    // TODO 计算函数的结果
    override def finish(reduction: AvgBuffer) = {
    reduction.totalAge.toDouble / reduction.count
    }

    // DataSet默认额编解码器,用于序列化,固定写法
    // 自定义类型就是product自带类型根据类型选择
    override def bufferEncoder = Encoders.product

    override def outputEncoder = Encoders.scalaDouble

    }

数据加载保存

通用方式

概念

  • 通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据

  • SparkSQL默认读取和保存的文件格式为parquet

  • 加载数据

    • spark.read.load是加载数据的通用方法

    • 如果读取不同格式的数据,可以对不同的数据格式进行设定

      1
      scala> spark.read.format("…")[.option("…")].load("…")
      • format("…"):指定加载的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"
      • load("…"):在"csv""jdbc""json""orc""parquet""textFile"格式下需要传入加载数据的路径
      • option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
      • Spark读取JSON文件时,要求文件中的每一行符合JSON的格式要求,如果文件格式不正确,那么不会发生错误,但是解析结果不正确
    • 可直接在文件上进行查询

      1
      2
      // 文件格式.`文件路径`
      scala>spark.sql("select * from json.`/opt/module/data/user.json`").show
  • 保存数据

    • df.write.save是保存数据的通用方法

    • 如果保存不同格式的数据,可以对不同的数据格式进行设定

      1
      scala>df.write.format("…")[.option("…")].save("…")
      • format("…"):指定保存的数据类型,包括"csv""jdbc""json""orc""parquet""textFile"
      • save ("…"):在"csv""orc""parquet""textFile"格式下需要传入保存数据的路径
      • option("…"):在"jdbc"格式下需要传入JDBC相应参数,url、user、password和dbtable
      • 如果路径已经存在执行保存操作会发生错误,如果想在路径已经存在的情况下保存数据,可以使用保存模式
    • 保存操作可以使用SaveMode来指明如何处理数据,使用mode()方法设置

      1
      df.write.mode("append").json("/opt/module/data/output")
      • 这些SaveMode都没有加锁,也不是原子操作
      • SaveMode是一个枚举类,其中的常量包括
    Scala/Java Any Language Meaning
    SaveMode.ErrorIfExists(default) "error"(default) 如果文件已经存在则抛出异常
    SaveMode.Append "append" 如果文件已经存在则追加
    SaveMode.Overwrite "overwrite" 如果文件已经存在则覆盖
    SaveMode.Ignore "ignore" 如果文件已经存在则忽略

Parquet

  • Spark SQL的默认数据源为Parquet格式

  • Parquet是一种能够有效存储嵌套数据的列式存储格式

  • 数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default可修改默认数据源格式

  • 加载数据

    1
    scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
  • 保存数据

    1
    2
    3
    scala> var df = spark.read.json("/opt/module/data/input/people.json")
    //保存为parquet格式
    scala> df.write.mode("append").save("/opt/module/data/output")

JSON

  • Spark SQL能自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]

  • 可通过SparkSession.read.json()加载JSON文件

  • Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串

    1
    2
    3
    {"name":"Michael"}
    {"name":"Andy","age":30}
    {"name":"Justin","age":19}
  • 导入隐式转换

    1
    import spark.implicits._
  • 加载JSON文件

    1
    2
    val path = "/opt/module/spark-local/people.json"
    val peopleDF = spark.read.json(path)
  • 创建临时表

    1
    peopleDF.createOrReplaceTempView("people")
  • 数据查询

    1
    2
    3
    4
    5
    6
    7
    8
    9
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

    teenagerNamesDF.show()

    +------+
    | name|
    +------+
    |Justin|
    +------+
CSV
  • Spark SQL可配置CSV文件的列表信息,读取CSV文件,CSV文件的第一行设置为数据列

    1
    2
    3
    4
    5
    6
    7
    8
    val df = spark.read.format("csv")
    // 分隔
    .option("sep", ";")
    // 包含字段名
    .option("inferSchema", "true")
    .option("header", "true")
    .load("data/input/user.csv")
    df.show()

MySQL

  • Spark SQL可通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中

  • 如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下

    1
    2
    3
    bin/spark-shell 

    --jars mysql-connector-java-8.0.30-bin.jar
  • 依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
    </dependency>
  • 读取数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // TODO 读取
    // 方式1:通用的load方法读取
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/spark-sql")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "root")
    .option("dbtable", "user")
    .load()
    .show

    // 方式2:通用的load方法读取 参数另一种形式
    spark.read.format("jdbc")
    .options(Map("url"->"jdbc:mysql://localhost:3306/spark-sql?user=root&password=root",
    "dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))
    .load()
    .show

    // 方式3:使用jdbc方法读取
    val props: Properties = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "root")
    val df = spark.read.jdbc("jdbc:mysql://localhost:3306/spark-sql", "user", props)
    df.show
  • 写入数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // TODO 写入
    df.write.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/spark-sql")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("user", "root")
    .option("password", "root")
    .option("dbtable", "user1")
    .mode(SaveMode.Append)
    .save()
    //...

Hive

  • Spark SQL编译时可包含Hive支持。包含Hive支持的Spark SQL可支持Hive表访问、UDF (用户自定义函数)及Hive查询语言(HiveQL/HQL)等。如果要在Spark SQL中包含Hive 的库,并不需要事先安装Hive。一般来说最好还是在编译Spark SQL时引入Hive支持

  • 若要把Spark SQL连接到一个部署好的Hive上,必须把hive-site.xml复制到Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL 也可以运行。 如果没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive元数据仓库

  • spark-shell默认支持Hive,代码默认不支持,需要手动指定(加一个参数)

  • Spark SQL CLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQL CLI,直接执行SQL语句,类似一Hive窗口

  • 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
    </dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.2</version>
    </dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
    </dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.2</version>
    </dependency>

    <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    </dependency>

    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.13.3</version>
    </dependency>

    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>3.3.2</version>
    </dependency>
  • 内置Hive

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // builder 构建,创建
    // TODO 默认情况下SparkSQL支持本地Hive操作的,执行前需要启用Hive的支持
    // 调用enableHiveSupport方法。
    val spark = SparkSession.builder()
    .enableHiveSupport()
    .config(sparkConf).getOrCreate()
    // 导入隐式转换,这里的spark其实是环境对象的名称

    // 可以使用基本的sql访问hive中的内容
    //spark.sql("create table aa(id int)")
    //spark.sql("show tables").show()
    spark.sql("load data local inpath 'data/input/id.txt' into table aa")
    spark.sql("select * from aa").show
  • 外置Hive

    • 将hive-site.xml文件拷贝到项目的resources目录中

    • 在开发工具中创建数据库默认是在本地仓库,通过参数修改数据库仓库的地址

      1
      config("spark.sql.warehouse.dir", "hdfs://bigdata1:8020/user/hive/warehouse")
    • 执行操作时出错误可以代码最前面增加如下代码解决

      1
      System.setProperty("HADOOP_USER_NAME", "root")

Streaming

概述

概念

  • 用于流式数据的处理,使得构建可扩展的容错流应用程序变得更加容易
    • Spark Streaming无法实现真正的流式数据处理,使用微批次数据处理
    • Spark Streaming是准实时数据处理引擎
  • 和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream
  • DStream是随时间推移而收到的数据的序列。在内部每个时间区间收到的数据都作为 RDD存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)
  • DStream是对RDD在实时数据处理场景的一种封装
  • 几种概念
    • 实时:数据处理的延迟在毫秒级进行响应
    • 离线:数据处理的延迟在小时往上进行响应
    • 批处理:数据处理的方式,多条数据一起做
    • 流式:数据处理的方式,一条数据一起做
  • 背压机制
    • Spark Streaming Backpressure,根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率
    • 通过属性spark.streaming.backpressure.enabled控制是否启用backpressure机制,默认false

SparkStreaming架构图

入门案例

需求

  • 使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.2</version>
</dependency>

编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Spark环境,至少要有两个线程
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
// 参数2: 采集周期
val ssc = new StreamingContext(conf, Seconds(3))
// 从socket一行行获取数据
val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
wordDS
.map((_, 1))
.reduceByKey(_ + _)
.print
// Driver程序执行streaming处理过程中不能结束
// 采集器在正常情况下启动后不应该停止,除非特殊的情况
// 启动采集器
ssc.start()
Thread.sleep(2000)
ssc.stop()
// 等待采集器的结束
ssc.awaitTermination()

创建

RDD队列

  • 测试过程中可通过使用ssc.queueStream(queueOfRDDs)创建DStream,每一个推送到这个队列中的RDD都会作为一个DStream处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
val ssc = new StreamingContext(
new SparkConf().setMaster("local[*]").setAppName("SparkStreaming"),
Seconds(3)
)
val queue = new mutable.Queue[RDD[String]]()
val ds = ssc.queueStream(queue)
ds.print()
ssc.start()
for (i <- 1 to 5) {
val rdd = ssc.sparkContext.makeRDD(List(i.toString))
queue.enqueue(rdd)
Thread.sleep(1000)
}
ssc.awaitTermination()

文件

  • 容易出现问题
1
2
3
4
5
6
7
8
val ssc = new StreamingContext(
new SparkConf().setMaster("local[*]").setAppName("SparkStreaming"),
Seconds(3)
)
val ds = ssc.textFileStream("data/input/word.txt")
ds.print()
ssc.start()
ssc.awaitTermination()

自定义数据源

  • 需要继承Receiver,并实现onStartonStop方法来自定义数据源采集
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def main(args: Array[String]): Unit = {
val ssc = new StreamingContext(
new SparkConf().setMaster("local[*]").setAppName("SparkStreaming"),
Seconds(3)
)
val ds = ssc.receiverStream(new MyReceiver("localhost", 9999))
ds.print()
ssc.awaitTermination()
}

// TODO 自定义数据采集器
// Receiver的构造方法有参数的,所以子类在继承时,应该传递这个参数
class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

private var socket: Socket = _

override def onStart(): Unit = {
socket = new Socket(host, port)
new Thread("Socket Receiver") {
setDaemon(true)

override def run() {
receive()
}
}.start()
}

def receive(): Unit = {
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream, StandardCharset.UTF_8)
)
var s: String = null
while (true) {
s = reader.readLine()
if (s != null) {
// TODO 将获取的数据保存到框架内部进行封装
store(s)
}
}
}

override def onStop(): Unit = {
socket.close()
socket = null
}

}

Kafka数据源

  • 由计算的Executor来主动消费Kafka的数据,速度由自身控制
1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
<version>3.2.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
val ssc = new StreamingContext(
new SparkConf().setMaster("local[*]").setAppName("SparkStreaming"),
Seconds(3)
)
// Kafka的配置信息
val kafkaPara: Map[String, Object] = Map[String, Object](
// 集群配置
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "bigdata1:9092,bigdata2:9092,bigdata3:9092",
// 消费者组名
ConsumerConfig.GROUP_ID_CONFIG -> "mousse",
// 消费者反序列化
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
// TODO 使用SparkStreaming读取Kafka的数据
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("thisIsATopicName"), kafkaPara)
)
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
ssc.start()
// 等待采集器的结束
ssc.awaitTermination()

转换

概念

  • DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()transform()及各种Window相关的原语

无状态转化操作

概念

  • 把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD
  • 针对键值对的DStream转化操作(如reduceByKey())要添加import StreamingContext._才能在Scala中使用
  • 尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的
    • 如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据

部分无状态转化操作

Transform

  • 允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API
  • 该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Driver(只执行1次)
val newDS: DStream[String] = ds.transform(rdd => {
// Driver(执行N次)
rdd.map(data => {
// Executor(执行N次)
data * 2
})
})

// Driver(只执行1次)
val newDS1 = ds.map(data => {
// Executor(执行N次)
data * 2
})

Join

  • 两个流之间的join需要两个流的批次大小一致才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同
1
2
3
4
5
6
7
8
9
10
// 从端口获取数据创建流
val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)

// 将两个流转换为KV类型
val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))

// 流的JOIN
val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)

有状态转化操作

UpdateStateByKey

  • 用于记录历史记录,提供了对一个状态变量的访问,用于键值对形式的DStream
  • 给定一个由(键, 事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的DStream,其内部数据为(键, 状态)对,且内部的RDD序列是由每个时间区间对应的(键, 状态)对组成的
  • updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步
    • 定义状态,状态可以是一个任意的数据类型
    • 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
  • 使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 设定检查点路径
ssc.sparkContext.setCheckpointDir("cp")
val ds = ssc.socketTextStream("localhost", 9999)
// TODO 转换
// 数据的有状态的保存
// 将spark每个采集周期数据的处理结果保存起来,然后和后续的数据进行聚合
// reduceByKey方法是无状态的,而我们需要的有转态的数据操作
// 有状态的目的其实就是将每一个采集周期数据的计算结果临时保存起来
// 然后再一次数据的处理中可以继续使用
ds
.flatMap(_.split(" "))
.map((_, 1L))
//.reduceByKey(_+_)
// Option : Some, None
// updateStateByKey是有状态计算方法
// 参数1: 相同key的value的集合
// 参数2: 想用key的缓冲区的数据, 有可能为空
// requirement failed: The checkpoint directory has not been set.
// 这里的计算的中间结果需要保存到检查点的位置中,所以需要设定检查点路径
.updateStateByKey[Long](
(seq: Seq[Long], buffer: Option[Long]) => {
val newBufferValue = buffer.getOrElse(0L) + seq.sum
Option(newBufferValue)
}
)
.print()

Window

Operations
  • Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态
  • 所有基于窗口的操作都需要两个参数
    • 窗口时长:计算内容的时间范围
    • 滑动步长:隔多久触发一次计算
1
2
3
4
5
6
7
8
9
10
11
val wordDS: DStream[String] = ds.flatMap(_.split(" "))
val wordToOneDS = wordDS.map((_, 1))
// TODO 将多个采集周期作为计算的整体
// 窗口的范围应该是采集周期的整数倍
// 默认滑动的幅度(步长)为一个采集周期
// val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(9))
// 窗口的计算的周期等同于窗口的滑动的步长
// 窗口的范围大小和滑动的步长应该都是采集周期的整数倍
val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(9), Seconds(6))
val result = windowDS.reduceByKey(_ + _)
result.print()
其他方法
  • window(windowLength, slideInterval):基于对源DStream窗化的批次进行计算返回一个新的Dstream

  • countByWindow(windowLength, slideInterval):返回一个滑动窗口计数流中的元素个数

  • reduceByWindow(func, windowLength, slideInterval):通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流

  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    ds
    .map(num => ("key", num.toInt))
    // TODO reduceByKeyAndWindow方法一般用于重复数据的范围比较大的场合,这样可以优化效率
    .reduceByKeyAndWindow(
    // 聚合数据
    (x, y) => {
    println(s"x = ${x}, y = ${y}")
    x + y
    },
    (a, b) => {
    println(s"a = ${a}, y = ${b}")
    a - b
    },
    Seconds(9)
    )
    .foreachRDD(rdd => rdd.foreach(println))
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作

输出

概念

  • 输出操作指定了对流数据经转化操作得到的数据所要执行的操作(如把结果推入外部数据库或输出到屏幕上)
  • 与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值
  • 如果StreamingContext中没有设定输出操作,整个context就都不会启动

输出操作

  • print()
    • 在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素
    • 用于开发和调试
  • saveAsTextFiles(prefix, [suffix])
    • 以text文件形式存储这个DStream的内容
    • 每一批次的存储文件名基于参数中的prefix和suffix为prefix-Time_IN_MS[.suffix]
  • saveAsObjectFiles(prefix, [suffix])
    • 以Java对象序列化的方式将Stream中的数据保存为SequenceFiles
    • 每一批次的存储文件名基于参数中的为prefix-TIME_IN_MS[.suffix]
  • saveAsHadoopFiles(prefix, [suffix])
    • 将Stream中的数据保存为Hadoop files
    • 每一批次的存储文件名基于参数中的为prefix-TIME_IN_MS[.suffix]
  • foreachRDD(func)
    • 通用的输出操作
    • 将函数func用于产生于stream的每一个RDD
    • 其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库
    • 用来对DStream中的RDD运行任意计算。和transform()有些类似
      • transform:rdd => rdd
      • foreachRDD:rdd => Unit
    • 可以重用在Spark中实现的所有行动操作。如把数据写到诸如MySQL的外部数据库
      • 连接不能写在driver层面(序列化)
      • 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失
      • 增加foreachPartition,在分区创建(获取)

关闭

  • 使用外部文件系统来控制内部程序关闭
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 当业务升级的场合,或逻辑发生变化
// Stop方法一般不会放置在main线程完成
// 需要将stop方法使用新的线程完成调用
new Thread(() => {
// Stop方法的调用不应该是线程启动后马上调用
// Stop方法调用的时机,这个时机不容易确定,需要周期性的判断时机出否出现
while (true) {
Thread.sleep(10000)
// 关闭时机的判断一般不会使用业务操作
// 一般采用第三方的程序或存储进行判断
val state: StreamingContextState = ssc.getState()
if (state == StreamingContextState.ACTIVE) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
// SparkStreaming如果停止后,当前的线程也应该同时停止
System.exit(0)
}
}
}).start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def main(args: Array[String]): Unit = {
val ssc = StreamingContext.getActiveOrCreate("cp", getStreamingContext)
ssc.start()
ssc.awaitTermination()
}
def getStreamingContext() : StreamingContext = {
// TODO Spark环境
// SparkStreaming使用核数最少是2个
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// SparkStreaming中的检查点不仅仅保存中间处理数据,还保存逻辑
ssc.checkpoint("cp")
val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
ds.print()
ssc
}

原理

Yarn模式提交

概念

  • 当使用bin/java执行Java程序时,会产生JVM,Java的进程
  • ApplicationMaster是一个进程
  • Driver是一个线程,一般将SparkContext称为Driver
  • Executor是一个计算对象,有时也将ExecutorBackend后台通信对象称为Executor

集群模式

  1. 客户端向RM提交Application(运行指令及相关资源)

  2. RM选择NM节点启动AM

  3. AM启动Driver线程,运行Application创建环境后阻塞

  4. AM向RM注册AM并申请资源

  5. RM返回资源列表

  6. AM根据首选位置策略(本地化级别)选择NM并启动Executor后台进程

    • 注册通信后会收到onStart信息,并调用通信对象的该方法
  7. Executor后台进程创建完Executor计算对象后向Driver注册该对象(反向注册)

  8. Driver向Executor返回注册完毕信息

  9. Executor接收消息后创建计算对象,等待执行任务

  10. Driver将Application中的RDD转换成Task并放入Task池

  11. Driver将Task从Task池中取出后发给Executor后台

  12. Executor后台将Task对象传给Executor计算对象(Executor计算对象有线程池,1个线程对应1个Task对象

    • 并行执行Task与核数有关

Yarn模式运行流程

客户端模式

  • Yarn客户端模式的Driver线程在客户端

通讯架构

通讯架构

  • RpcEndpoint:RPC通信终端(InBox)
    • 每个节点(Client/Master/Worker)都称为一个RPC终端,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher
  • RpcEndpointRef:远程RpcEndpoint的引用(OutBox)
    • 当需要向一个具体的RpcEndpoint发送消息时,需要获取到该RpcEndpoint的引用,然后通过该应用发送消息
  • RpcEnv:RPC上下文环境
    • 每个RPC终端运行时依赖的上下文环境称为RpcEnv
  • Dispatcher:消息调度(分发)器
    • 针对于RPC终端需要发送远程消息或者从远程RPC接收到的消息,分发至对应的指令收件箱(发件箱)
    • 如果指令接收方是自己则存入收件箱,如果指令接收方不是自己,则放入发件箱
  • InBox:指令消息收件箱
    • 一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中
    • Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费
  • OutBox:指令消息发件箱
    • 对于当前RpcEndpoint来说,一个目标RpcEndpoint对应一个发件箱,如果向多个目标RpcEndpoint发送信息,则有多个OutBox
    • 当消息放入Outbox后,紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行
  • RpcAddress:远程的RpcEndpointRef的地址,Host + Port
  • TransportClient:Netty通信客户端
    • 一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer
  • TransportServer:Netty通信服务端
    • 一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱

通讯架构

阶段/任务

作业

graph LR
subgraph Application
subgraph SparkContext
a[SparkConf]
subgraph SparkEnv
b[NettyRpcEnv]
end
c[heartbeatReceiver]
subgraph DAGScheduler: Job内部调度: 阶段/任务划分
subgraph EventLoop
d[EventQueue]
end
end
e[TaskScheduler: 任务调度]
f[SchedulerBackend: RPC后台信息交互对象]
end
end

划分

概念

  • 一个Spark应用程序包括Job、Stage以及Task三个概念

    • Job:以Action方法为界,遇到一个Action方法则触发一个Job
    • Stage:是Job的子集,以RDD宽依赖(Shuffle)为界,遇到Shuffle做一次划分
    • Task:是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少task
  • 任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度

    总体调度流程

Stage级调度

  • Job由最终的RDD和Action方法封装而成

  • 划分策略由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,以Shuffle为界划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算

    • 分区数量默认不变,Shuffle的算子一般都会有改变分区数量的参数
    • 默认分区的数量Local[*]
  • 划分Stages分两类

    • ResultStage:DAG最下游的Stage,由Action方法决定
    • ShuffleMapStage:为下游Stage准备数据

Task级调度

  • 支持两种调度策略

    • FIFO,默认
    • FAIR
  • 根据每个Task的优先位置,确定Task的Locality级别,Locality一共有五种,优先级由高到低顺序(面试)

名称 解析
PROCESS_LOCAL 进程本地化,task和数据在同一个Executor中,性能最好
NODE_LOCAL 节点本地化,task和数据在同一个节点中,但是task和数据不在同一个Executor中,数据需要在进程间进行传输
RACK_LOCAL 机架本地化,task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输
NO_PREF 对于task来说,从哪里获取都一样,没有好坏之分
ANY task和数据可以在集群的任何地方,而且不在一个机架中,性能最差
  • Spark调度总会尽量让task以最高的本地性级别来启动

  • 当task以X本地性级别启动,但该本地性级别对应的所有节点都没有空闲资源而启动失败时,不会马上降低本地性级别启动,而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推

    • 可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能
  • 会记录失败的Task的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败

    • 会记录它失败的Executor Id和Host,下次再调度这个Task时会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了
  • 任务和阶段的关系

    • 阶段数量:ResultStage + N * ShuffleDependencies
    • 阶段类型:ResultStage, ShuffleMapStage
  • 任务的数量:当前阶段的分区的数量

  • 任务的总数量: 所有阶段的任务总和

  • 任务的类型

    • ShuffleMapState => ShuffleMapTask
    • ResultStage => ResultTask

执行

  1. 内容
    • 阶段ID
    • RDD元数据,依赖
    • 分区
    • 首选位置
  2. 序列化(累加器,KRYO)
    • 默认序列化:Java序列化
    • Kryo序列化:shuffle场合下类型为值类型或字符串类型
  3. 调度
    • Driver
      • TaskSet(Stage)
      • TaskSetManager:Scheduler
      • 调度模式:FIFO,FAIR
      • 任务调度池:Pool(FIFO)
    • Executor
      • 本地化级别(调度)
      • 降级处理:默认等待3s
  4. 计算
    1. 任务的编码
    2. 向Executor发送Task
    3. Executor接收Task,进行解码
    4. 计算对象执行Task
    5. 线程池执行task.run方法, 调用具体Task对象的runTask方法
  5. Shuffle
    • Shuffle map(Write)
    • Shuffle reduce(Read)
    • 管理器:SortShuffleManager
    • ShuffleWriter
      • BypassMergeSortShuffleWriter
      • UnsafeShuffleWriter
      • SortShuffleWriter(面试)
        • 写磁盘文件时首先进行内存中的排序,如果内存(5M)不够会溢写磁盘生成临时文件,最终将所有的临时文件合并成数据文件和索引文件
        • 预聚合的原理:在排序时,构造了一种类似于hashtable的结构,所以相同key聚合在一起
        • 排序规则:首先会按照分区进行排序,然后按key.hashCode进行排序
    • ShuffleHandle(面试)
      • BypassMergeSortShuffleHandle:没有预聚和功能的算子 & reduce的分区数量 <= 阈值(200)
      • SerializedShuffleHandle:Spark的内存优化后的解决方案,对象序列化后不需要反序列化。(支持序列化重定位&支持预聚和&分区数量大于阈值)
      • BaseShuffleHandle:默认的shuffleHandle
  6. 内存(cache)
  7. 累加器

Shuffle

要点

  • 划分stage时,最后一个stage称为finalStage,本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage
  • ShuffleMapStage的结束伴随着shuffle文件的写磁盘
  • ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束

Shuffle落盘原理

HashShuffle

前提

  • 每个Executor只有1个CPU core,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程

未优化

  • 从Task开始那边各自把自己进行Hash计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个Task都分成3种类别的数据,把不同的数据汇聚然后计算出最终的结果,Reducer会在每个Task中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个Task输出3份本地文件,这里4个Mapper Tasks总共输出4个Tasks x 3个分类文件 = 12个本地小文件

未优化的HashShuffle

优化后

  • 启用合并机制以复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles(默认false),将其设置为true即可开启优化机制。通常建议开启这个选项
  • 在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每个Task所在的进程中,分别写入共同进程中的3份本地文件,4个Mapper Tasks总共输出 2个Cores x 3个分类文件 = 6个本地小文件

优化后的HashShuffle

SortShuffle(默认)

普通SortShuffle

  • 数据先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构
  • 在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,一个Task过程会产生多个临时文件
  • 在每个Task中将所有临时文件合并(就是merge过程),此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset

普通SortShuffle

bypass SortShuffle

  • 不进行排序,节省了这部分的性能开销
  • 触发条件
    • 不是预聚合的shuffle算子(如reduceByKey)
    • shuffle reduce task数量 <= spark.shuffle.sort.bypassMergeThreshold参数的值,默认200
  • 磁盘写机制其实跟未经优化的HashShuffleManager是一样的,在最后会做一个磁盘文件的合并。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好

bypass SortShuffle

Writer步骤

graph TD
SortShuffleManager --> Handle --> if1{没有预聚合<br>&&<br>下游RDD分区数量<br>小于等于指定配置参} -- true --> BypassMergeSortShuffleHandle --> BypassMergeSortShuffleWriter
if1 -- false --> if2{支持序列化重定位操作<br>&&<br>没有与聚合<br>&&<br>分区数不大于16777216}
if2 -- true --> SerializedShuffleHandle --> UnsafeShuffleWriter
if2 -- false --> BaseShuffleHandle --> SortShuffleWriter

内存管理

内存规划

堆内和堆外内存规划

堆内和堆外内存规划

堆内内存

  • Spark应用程序启动时的 –executor-memory spark.executor.memory参数配置
  • Executor内运行的并发任务共享 JVM堆内内存
  • 对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存
    • 申请内存流程
      1. Spark在代码中new一个对象实例
      2. JVM从堆内内存分配空间,创建对象并返回对象引用
      3. Spark保存该对象的引用,记录该对象占用的内存
    • 释放内存流程
      1. Spark记录该对象释放的内存,删除该对象的引用
      2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存

堆外内存

  • 可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据,进一步优化内存的使用以及提高Shuffle时排序的效率
  • 可通过配置park.memory.offHeap.enabled参数启用,并由 spark.memory.offHeap.size参数设定堆外空间的大小。除了没有other空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存

内存空间分配

静态内存管理

  • 早期版本使用

  • 堆内

    堆内静态内存管理

  • 堆外

    堆外静态内存管理

统一(动态)内存管理

  • 当前版本使用

  • 堆内

    堆内统一内存管理

  • 堆外

    堆外统一内存管理

  • 动态占用机制(面试)

    • 设定基本的存储内存和执行内存区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间的范围
    • 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间(存储空间不足是指不足以放下一个完整的Block)
    • 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
    • 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑Shuffle过程中的很多因素,实现起来较为复杂
    • cache()方法会增加血缘关系

    动态占用机制